Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
test_fixtures.py
1 import subprocess
2 import logging
3 import json
4 import tempfile
5 import threading
6 import time
7 import os
8 import shutil
9 import signal
10 import socket
11 
12 from os import devnull
13 from os import path
14 from uuid import uuid4
15 from os import urandom
16 from functools import wraps
17 from http.server import SimpleHTTPRequestHandler, HTTPServer
18 
19 from fake_http_server.fake_test_server import FakeTestServerBackground
20 from sota_tools.treehub_server import create_repo
21 
22 
23 logger = logging.getLogger(__name__)
24 
25 
26 class Aktualizr:
27 
28  def __init__(self, aktualizr_primary_exe, aktualizr_info_exe, id,
29  uptane_server, wait_port=9040, wait_timeout=60, log_level=1,
30  secondary=None, output_logs=True,
31  run_mode='once', director=None, image_repo=None,
32  sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
33  self.id = id
34  self._aktualizr_primary_exe = aktualizr_primary_exe
35  self._aktualizr_info_exe = aktualizr_info_exe
36  self._storage_dir = tempfile.TemporaryDirectory()
37  self._log_level = log_level
38  self._sentinel_file = 'need_reboot'
39  self.reboot_sentinel_file = os.path.join(self._storage_dir.name, self._sentinel_file)
40  self._import_dir = os.path.join(self._storage_dir.name, 'import')
41  KeyStore.copy_keys(self._import_dir)
42 
43  with open(path.join(self._storage_dir.name, 'secondary_config.json'), 'w+') as secondary_config_file:
44  secondary_cfg = json.loads(Aktualizr.SECONDARY_CONFIG_TEMPLATE.
45  format(port=secondary.primary_port if secondary else wait_port,
46  timeout=wait_timeout))
47  json.dump(secondary_cfg, secondary_config_file)
48  self._secondary_config_file = secondary_config_file.name
49 
50  with open(path.join(self._storage_dir.name, 'config.toml'), 'w+') as config_file:
51  config_file.write(Aktualizr.CONFIG_TEMPLATE.format(server_url=uptane_server.base_url,
52  import_path=self._import_dir,
53  serial=id[1], hw_ID=id[0],
54  storage_dir=self._storage_dir.name,
55  db_path=path.join(self._storage_dir.name, 'sql.db'),
56  log_level=self._log_level,
57  secondary_cfg_file=self._secondary_config_file,
58  director=director.base_url if director else '',
59  image_repo=image_repo.base_url if image_repo else '',
60  pacman_type='ostree' if treehub and sysroot else 'fake',
61  ostree_sysroot=sysroot.path if sysroot else '',
62  treehub_server=treehub.base_url if treehub else '',
63  sentinel_dir=self._storage_dir.name,
64  sentinel_name=self._sentinel_file))
65  self._config_file = config_file.name
66 
67  self.add_secondary(secondary) if secondary else None
68  self._output_logs = output_logs
69  self._run_mode = run_mode
70  self._run_env = {}
71  if sysroot and ostree_mock_path:
72  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
73  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
74 
75  CONFIG_TEMPLATE = '''
76  [tls]
77  server = "{server_url}"
78 
79  [import]
80  base_path = "{import_path}"
81  tls_cacert_path = "ca.pem"
82  tls_pkey_path = "pkey.pem"
83  tls_clientcert_path = "client.pem"
84 
85  [provision]
86  primary_ecu_serial = "{serial}"
87  primary_ecu_hardware_id = "{hw_ID}"
88 
89  [storage]
90  path = "{storage_dir}"
91  type = "sqlite"
92  sqldb_path = "{db_path}"
93 
94  [pacman]
95  type = "{pacman_type}"
96  sysroot = "{ostree_sysroot}"
97  ostree_server = "{treehub_server}"
98  os = "dummy-os"
99 
100  [uptane]
101  polling_sec = 0
102  secondary_config_file = "{secondary_cfg_file}"
103  director_server = "{director}"
104  repo_server = "{image_repo}"
105 
106  [bootloader]
107  reboot_sentinel_dir = "{sentinel_dir}"
108  reboot_sentinel_name = "{sentinel_name}"
109  reboot_command = ""
110 
111  [logger]
112  loglevel = {log_level}
113 
114  '''
115 
116  SECONDARY_CONFIG_TEMPLATE = '''
117  {{
118  "IP": {{
119  "secondaries_wait_port": {port},
120  "secondaries_wait_timeout": {timeout},
121  "secondaries": []
122  }}
123  }}
124  '''
125 
126  def add_secondary(self, secondary):
127  with open(self._secondary_config_file, "r+") as config_file:
128  sec_cfg = json.load(config_file)
129  sec_cfg["IP"]["secondaries"].append({"addr": "127.0.0.1:{}".format(secondary.port)})
130  config_file.seek(0)
131  json.dump(sec_cfg, config_file)
132 
133  def update_wait_timeout(self, timeout):
134  with open(self._secondary_config_file, "r+") as config_file:
135  sec_cfg = json.load(config_file)
136  sec_cfg["IP"]["secondaries_wait_timeout"] = timeout
137  config_file.seek(0)
138  json.dump(sec_cfg, config_file)
139 
140  def run(self, run_mode):
141  subprocess.run([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', run_mode],
142  check=True, env=self._run_env)
143 
144  def get_info(self, retry=15):
145  info_exe_res = None
146  for ii in range(0, retry):
147  info_exe_res = subprocess.run([self._aktualizr_info_exe, '-c', self._config_file],
148  timeout=60, stdout=subprocess.PIPE, env=self._run_env)
149  if info_exe_res.returncode == 0 and \
150  str(info_exe_res.stdout).find('no details about installed nor pending images') != -1:
151  break
152 
153  if info_exe_res and info_exe_res.returncode == 0:
154  return str(info_exe_res.stdout)
155  else:
156  logger.error(str(info_exe_res.stderr))
157  return None
158 
159  # ugly stuff that could be removed if Aktualizr had exposed API to check status
160  # or aktializr-info had output status/info in a structured way (e.g. json)
161  def is_ecu_registered(self, ecu_id):
162  device_status = self.get_info()
163  if not ((device_status.find(ecu_id[0]) != -1) and (device_status.find(ecu_id[1]) != -1)):
164  return False
165  not_registered_field = "Removed or not registered ecus:"
166  not_reg_start = device_status.find(not_registered_field)
167  return not_reg_start == -1 or (device_status.find(ecu_id[1], not_reg_start) == -1)
168 
169  def get_current_image_info(self, ecu_id):
170  if self.id == ecu_id:
171  return self.get_current_primary_image_info()
172  else:
173  return self._get_current_image_info(ecu_id)
174 
175  def get_current_pending_image_info(self, ecu_id):
176  return self._get_current_image_info(ecu_id, secondary_image_hash_field='pending image hash: ')
177 
178  # applicable only to secondary ECUs due to inconsistency in presenting information
179  # about primary and secondary ECUs
180  # ugly stuff that could be removed if Aktualizr had exposed API to check status
181  # or aktializr-info had output status/info in a structured way (e.g. json)
182  def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash: '):
183  #secondary_image_filename_field = 'installed image filename: '
184  aktualizr_status = self.get_info()
185  ecu_serial = ecu_id[1]
186  ecu_info_position = aktualizr_status.find(ecu_serial)
187  if ecu_info_position == -1:
188  return None
189 
190  start = aktualizr_status.find(secondary_image_hash_field, ecu_info_position)
191  end = aktualizr_status.find('\\n', start)
192  hash_val = aktualizr_status[start + len(secondary_image_hash_field):end]
193 
194  # start = aktualizr_status.find(secondary_image_filename_field, ecu_info_position)
195  # end = aktualizr_status.find('\\n', start)
196  # filename_val = aktualizr_status[start + len(secondary_image_filename_field):end]
197 
198  return hash_val
199 
200  # ugly stuff that could be removed if Aktualizr had exposed API to check status
201  # or aktializr-info had output status/info in a structured way (e.g. json)
202  def get_current_primary_image_info(self):
203  primary_hash_field = 'Current primary ecu running version: '
204  aktualizr_status = self.get_info()
205  if aktualizr_status:
206  start = aktualizr_status.find(primary_hash_field)
207  end = aktualizr_status.find('\\n', start)
208  return aktualizr_status[start + len(primary_hash_field):end]
209  else:
210  logger.error("Failed to get aktualizr info/status")
211  return ""
212 
213  # ugly stuff that could be removed if Aktualizr had exposed API to check status
214  # or aktializr-info had output status/info in a structured way (e.g. json)
215  def get_primary_pending_version(self):
216  primary_hash_field = 'Pending primary ecu version: '
217  aktualizr_status = self.get_info()
218  start = aktualizr_status.find(primary_hash_field)
219  end = aktualizr_status.find('\\n', start)
220  return aktualizr_status[start + len(primary_hash_field):end]
221 
222  def __enter__(self):
223  self._process = subprocess.Popen([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', self._run_mode],
224  stdout=None if self._output_logs else subprocess.PIPE,
225  stderr=None if self._output_logs else subprocess.STDOUT,
226  close_fds=True,
227  env=self._run_env)
228  logger.debug("Aktualizr has been started")
229  return self
230 
231  def __exit__(self, exc_type, exc_val, exc_tb):
232  self._process.terminate()
233  self._process.wait(timeout=60)
234  logger.debug("Aktualizr has been stopped")
235 
236  def terminate(self, sig=signal.SIGTERM):
237  self._process.send_signal(sig)
238 
239  def output(self):
240  return self._process.stdout.read().decode(errors='replace')
241 
242  def wait_for_completion(self, timeout=120):
243  self._process.wait(timeout)
244 
245  def wait_for_provision(self, timeout=60):
246  deadline = time.time() + timeout
247  while timeout == 0 or time.time() < deadline:
248  info = self.get_info(retry=1)
249  if info is not None and 'Provisioned on server: yes' in info:
250  return
251  time.sleep(0.2)
252  raise TimeoutError
253 
254  def emulate_reboot(self):
255  os.remove(self.reboot_sentinel_file)
256 
257 
258 class KeyStore:
259  base_dir = "./"
260 
261  @staticmethod
262  def copy_keys(dest_path):
263  os.mkdir(dest_path)
264  shutil.copy(KeyStore.ca(), dest_path)
265  shutil.copy(KeyStore.pkey(), dest_path)
266  shutil.copy(KeyStore.cert(), dest_path)
267 
268  @staticmethod
269  def ca():
270  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/ca.pem')
271 
272  @staticmethod
273  def pkey():
274  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/pkey.pem')
275 
276  @staticmethod
277  def cert():
278  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/client.pem')
279 
280 
282 
283  def __init__(self, aktualizr_secondary_exe, id, port=9050, primary_port=9040,
284  sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
285  self.id = id
286  self.port = port
287 
288  self._aktualizr_secondary_exe = aktualizr_secondary_exe
289  self.storage_dir = tempfile.TemporaryDirectory()
290  self.port = self.get_free_port()
291  self.primary_port = self.get_free_port()
292  self._sentinel_file = 'need_reboot'
293  self.reboot_sentinel_file = os.path.join(self.storage_dir.name, self._sentinel_file)
294 
295  with open(path.join(self.storage_dir.name, 'config.toml'), 'w+') as config_file:
296  config_file.write(IPSecondary.CONFIG_TEMPLATE.format(serial=id[1], hw_ID=id[0],
297  port=self.port, primary_port=self.primary_port,
298  storage_dir=self.storage_dir.name,
299  db_path=path.join(self.storage_dir.name, 'db.sql'),
300  pacman_type='ostree' if treehub and sysroot else 'fake',
301  ostree_sysroot=sysroot.path if sysroot else '',
302  treehub_server=treehub.base_url if treehub else '',
303  sentinel_dir=self.storage_dir.name,
304  sentinel_name=self._sentinel_file
305  ))
306  self._config_file = config_file.name
307 
308  self._run_env = {}
309  if sysroot and ostree_mock_path:
310  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
311  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
312 
313 
314  CONFIG_TEMPLATE = '''
315  [uptane]
316  ecu_serial = "{serial}"
317  ecu_hardware_id = "{hw_ID}"
318 
319  [network]
320  port = {port}
321  primary_ip = "127.0.0.1"
322  primary_port = {primary_port}
323 
324  [storage]
325  type = "sqlite"
326  path = "{storage_dir}"
327  sqldb_path = "{db_path}"
328 
329  [pacman]
330  type = "{pacman_type}"
331  sysroot = "{ostree_sysroot}"
332  ostree_server = "{treehub_server}"
333  os = "dummy-os"
334 
335  [bootloader]
336  reboot_sentinel_dir = "{sentinel_dir}"
337  reboot_sentinel_name = "{sentinel_name}"
338  reboot_command = ""
339  '''
340 
341  def is_running(self):
342  return True if self._process.poll() is None else False
343 
344  @staticmethod
345  def get_free_port():
346  tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
347  tcp.bind(('', 0))
348  port = tcp.getsockname()[1]
349  tcp.close()
350  return port
351 
352  def __enter__(self):
353  self._process = subprocess.Popen([self._aktualizr_secondary_exe, '-c', self._config_file],
354  stdout=None,
355  stderr=None,
356  close_fds=True,
357  env=self._run_env)
358  logger.debug("IP Secondary {} has been started: {}".format(self.id, self.port))
359  return self
360 
361  def __exit__(self, exc_type, exc_val, exc_tb):
362  self._process.terminate()
363  self._process.wait(timeout=60)
364  logger.debug("IP Secondary {} has been stopped".format(self.id))
365 
366  def emulate_reboot(self):
367  os.remove(self.reboot_sentinel_file)
368 
369 
370 class UptaneRepo(HTTPServer):
371  def __init__(self, doc_root, ifc, port, client_handler_map={}):
372  super(UptaneRepo, self).__init__(server_address=(ifc, port), RequestHandlerClass=self.Handler)
373 
374  self.base_url = 'http://{}:{}'.format(self.server_address[0], self.server_address[1])
375  self.doc_root = doc_root
376  self._server_thread = None
377 
378  self.Handler.do_POST = \
379  lambda request: (self.Handler.handler_map.get('POST', {})).get(request.path,
380  self.Handler.default_handler)(request)
381 
382  self.Handler.do_PUT = \
383  lambda request: (self.Handler.handler_map.get('PUT', {})).get(request.path,
384  self.Handler.default_handler)(request)
385 
386  self.Handler.do_GET = \
387  lambda request: (self.Handler.handler_map.get('GET', {})).get(request.path,
388  self.Handler.default_get)(request)
389 
390  for method, method_handlers in client_handler_map.items():
391  for url, handler in method_handlers.items():
392  if self.Handler.handler_map.get(method, None) is None:
393  self.Handler.handler_map[method] = {}
394  self.Handler.handler_map[method][url] = handler
395 
396  class Handler(SimpleHTTPRequestHandler):
397  def __init__(self, request, client_address, server):
398  self.doc_root = server.doc_root
399  self.disable_nagle_algorithm = True
400  super(UptaneRepo.Handler, self).__init__(request, client_address, server)
401 
402  def default_handler(self):
403  self.send_response(200)
404  self.end_headers()
405 
406  def default_get(self):
407  if not os.path.exists(self.file_path):
408  self.send_response(404)
409  self.end_headers()
410  return
411  self.send_response(200)
412  self.end_headers()
413  with open(self.file_path, 'rb') as source:
414  self.copyfile(source, self.wfile)
415 
416  handler_map = {}
417 
418  @property
419  def file_path(self):
420  return os.path.join(self.doc_root, self.path[1:])
421 
422  def start(self):
423  self._server_thread = threading.Thread(target=self.serve_forever)
424  self._server_thread.start()
425  return self
426 
427  def stop(self):
428  self.shutdown()
429  self.server_close()
430  if self._server_thread:
431  self._server_thread.join(timeout=60)
432  self._server_thread = None
433 
434  def __enter__(self):
435  return self.start()
436 
437  def __exit__(self, exc_type, exc_val, exc_tb):
438  self.stop()
439 
440 
442  """
443  This server
444  - serves signed metadata about images
445  - receives device manifest which includes installation report if any installation has happened
446  """
447 
448  director_subdir = "repo/director"
449 
450  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
451  super(DirectorRepo, self).__init__(os.path.join(uptane_repo_root, self.director_subdir), ifc=ifc, port=port,
452  client_handler_map=client_handler_map)
453 
454  self._manifest = None
455  self._last_install_res = False
456  self._last_install_res_lock = threading.RLock()
457  self._installed_condition = threading.Condition()
458 
460  def handle_manifest(self):
461  self.send_response(200)
462  self.end_headers()
463  json_data = None
464  try:
465  data_size = int(self.headers['Content-Length'])
466  data_string = self.rfile.read(data_size)
467  json_data = json.loads(data_string)
468  except Exception as exc:
469  logger.error(exc)
470 
471  if json_data:
472  install_report = json_data['signed'].get('installation_report', "")
473  if install_report:
474  self.server.set_install_event(json_data)
475 
476  handler_map = {'PUT': {'/manifest': handle_manifest}}
477 
478  def set_install_event(self, manifest):
479  with self._installed_condition:
480  self._manifest = manifest
481  self._last_install_res = manifest['signed']['installation_report']['report']['result']['success']
482  self._installed_condition.notifyAll()
483 
484  def wait_for_install(self, timeout=180):
485  with self._installed_condition:
486  self._installed_condition.wait(timeout=timeout)
487  return self._last_install_res
488 
489  def get_install_result(self):
490  with self._installed_condition:
491  return self._last_install_res
492 
493  def get_manifest(self):
494  with self._installed_condition:
495  return self._manifest
496 
497  def get_ecu_manifest(self, ecu_serial):
498  return self.get_manifest()['signed']['ecu_version_manifests'][ecu_serial]
499 
500  def get_ecu_manifest_filepath(self, ecu_serial):
501  return self.get_ecu_manifest(ecu_serial)['signed']['installed_image']['filepath']
502 
504  """
505  This server serves signed metadata about images
506  as well as images by default (it's possible to serve images from another server by using the 'custom URI' feature)
507  """
508 
509  image_subdir = "repo/repo"
510 
511  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
512  super(ImageRepo, self).__init__(os.path.join(uptane_repo_root, self.image_subdir), ifc=ifc, port=port,
513  client_handler_map=client_handler_map)
514 
515 
517  """
518  This server serves images
519  """
520  image_subdir = "repo/repo"
521 
522  def __init__(self, root, ifc, port, client_handler_map={}):
523  super(CustomRepo, self).__init__(os.path.join(root, self.image_subdir),
524  ifc=ifc, port=port, client_handler_map=client_handler_map)
525 
526 
528  """
529  This server serves requests from an ostree client, i.e. emulates/mocks the treehub server
530  """
531  def __init__(self, ifc, port, client_handler_map={}):
532  self.root = tempfile.mkdtemp()
533  super(Treehub, self).__init__(self.root, ifc=ifc, port=port, client_handler_map=client_handler_map)
534 
535  def __enter__(self):
536  self.revision = create_repo(self.root)
537  return super(Treehub, self).__enter__()
538 
539  def __exit__(self, exc_type, exc_val, exc_tb):
540  super(Treehub, self).__exit__(exc_type, exc_val, exc_tb)
541  shutil.rmtree(self.root, ignore_errors=True)
542 
544  def default_get(self):
545  if not os.path.exists(self.file_path):
546  self.send_response(404)
547  self.end_headers()
548  return
549 
550  super(Treehub.Handler, self).default_get()
551 
552 
554  def __init__(self, number_of_failures=1, bytes_to_send_before_interruption=10, url=''):
555  self._bytes_to_send_before_interruption = bytes_to_send_before_interruption
556  self._number_of_failures = number_of_failures
557  self._failure_counter = 0
558  self._url = url
559 
560  def __call__(self, request_handler):
561  if self._failure_counter < self._number_of_failures:
562  request_handler.send_response(200)
563  file_size = os.path.getsize(request_handler.file_path)
564  request_handler.send_header('Content-Length', file_size)
565  request_handler.end_headers()
566 
567  with open(request_handler.file_path, 'rb') as source:
568  data = source.read(self._bytes_to_send_before_interruption)
569  request_handler.wfile.write(data)
570 
571  self._failure_counter += 1
572  else:
573  request_handler.default_get()
574 
575  def map(self, url=''):
576  return {'GET': {url if url else self._url: DownloadInterruptionHandler(self._number_of_failures,
578 
579 
581  dummy_filez = (b'\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06' +
582  b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xa4\x00\x00\x00\x00' +
583  b'\x00\x19\x33\x34\x32\x36\x31\xe5\x02\x00')
584 
585  def __init__(self, number_of_failures=1, url='', fake_filez=False):
586  self._number_of_failures = number_of_failures
587  self._failure_counter = 0
588  self._url = url
589  self._fake_filez = fake_filez
590 
591  def __call__(self, request_handler):
592  if self._number_of_failures == -1 or self._failure_counter < self._number_of_failures:
593  request_handler.send_response(200)
594  request_handler.end_headers()
595  if self._fake_filez:
596  request_handler.wfile.write(self.dummy_filez)
597  else:
598  request_handler.wfile.write(b'malformed image')
599 
600  self._failure_counter += 1
601  else:
602  request_handler.default_get()
603 
604  def map(self, url):
605  return {'GET': {url if url else self._url: MalformedImageHandler(self._number_of_failures,
606  fake_filez=self._fake_filez)}}
607 
608 
610  def __init__(self, number_of_failures=1, url=''):
611  self._number_of_failures = number_of_failures
612  self._failure_counter = 0
613  self._url = url
614 
615  def __call__(self, request_handler):
616  if self._failure_counter < self._number_of_failures:
617  request_handler.send_response(200)
618  file_size = os.path.getsize(request_handler.file_path)
619  request_handler.end_headers()
620 
621  with open(request_handler.file_path, 'rb') as source:
622  while True:
623  data = source.read(1)
624  if not data:
625  break
626  request_handler.wfile.write(data)
627  request_handler.wfile.flush()
628  import time
629  time.sleep(100)
630 
631  self._failure_counter += 1
632  else:
633  request_handler.default_get()
634 
635  def map(self, url):
636  return {'GET': {url if url else self._url: SlowRetrievalHandler(self._number_of_failures)}}
637 
638 
640  def __init__(self, number_of_redirects=1, url=''):
641  self._number_of_redirects = number_of_redirects
642  self._redirect_counter = 0
643  self._url = url
644 
645  def __call__(self, request_handler):
647  request_handler.send_response(301)
648  request_handler.send_header('Location', request_handler.server.base_url + request_handler.path)
649  request_handler.end_headers()
650  self._redirect_counter += 1
651  else:
652  request_handler.default_get()
653 
654  def map(self, url):
655  return {'GET': {url if url else self._url: RedirectHandler(self._number_of_redirects)}}
656 
657 
659  def __init__(self, number_of_failures=1):
660  self._number_of_failures = number_of_failures
661  self._failure_counter = 0
662 
663  def __call__(self, request_handler):
664  if self._failure_counter < self._number_of_failures:
665  request_handler.send_response(200)
666  request_handler.end_headers()
667  request_handler.wfile.write(b'{"non-uptane-json": "some-value"}')
668 
669  self._failure_counter += 1
670  else:
671  request_handler.default_get()
672 
673  def map(self, url):
674  return {'GET': {url: MalformedJsonHandler(self._number_of_failures)}}
675 
676 
678  def __init__(self, repo_manager_exe, server_port=0, director_port=0, image_repo_port=0, custom_repo_port=0):
679  self.image_rel_dir = 'repo/repo'
680  self.target_rel_dir = 'repo/repo/targets'
681 
682  self._repo_manager_exe = repo_manager_exe
683  self.root_dir = tempfile.mkdtemp()
684 
685  self.server_port = server_port
686  self.director_port = director_port
687  self.image_repo_port = image_repo_port
688  self.custom_repo_port = custom_repo_port
689 
690  def create_generic_server(self, **kwargs):
691  return FakeTestServerBackground(meta_path=self.root_dir, target_path=self.target_dir, port=self.server_port)
692 
693  def create_director_repo(self, handler_map={}):
694  return DirectorRepo(self.root_dir, 'localhost', self.director_port, client_handler_map=handler_map)
695 
696  def create_image_repo(self, handler_map={}):
697  return ImageRepo(self.root_dir, 'localhost', self.image_repo_port, client_handler_map=handler_map)
698 
699  def create_custom_repo(self, handler_map={}):
700  return CustomRepo(self.root_dir, 'localhost', self.custom_repo_port, client_handler_map=handler_map)
701 
702  @property
703  def image_dir(self):
704  return path.join(self.root_dir, self.image_rel_dir)
705 
706  @property
707  def target_dir(self):
708  return path.join(self.root_dir, self.target_rel_dir)
709 
710  @property
711  def target_file(self):
712  return path.join(self.image_dir, 'targets.json')
713 
714  def add_image(self, id, image_filename, target_name=None, image_size=1024, custom_url=''):
715 
716  targetname = target_name if target_name else image_filename
717 
718  with open(path.join(self.image_dir, image_filename), 'wb') as image_file:
719  image_file.write(urandom(image_size))
720 
721  image_creation_cmdline = [self._repo_manager_exe, '--path', self.root_dir,
722  '--command', 'image', '--filename', image_filename, '--targetname', targetname, '--hwid', id[0]]
723 
724  if custom_url:
725  image_creation_cmdline.append('--url')
726  image_creation_cmdline.append(custom_url)
727 
728  subprocess.run(image_creation_cmdline, cwd=self.image_dir, check=True)
729 
730  # update the director metadata
731  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
732  '--command', 'addtarget', '--targetname', targetname,
733  '--hwid', id[0], '--serial', id[1]], check=True)
734 
735  # sign so the image becomes available for an update for a client/device
736  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
737 
738  with open(self.target_file, "r") as target_file:
739  targets = json.load(target_file)
740  target_hash = targets["signed"]["targets"][targetname]["hashes"]["sha256"]
741 
742  return target_hash
743 
744  def add_ostree_target(self, id, rev_hash, target_name=None):
745  # emulate the backend behavior on defining a target name for OSTREE target format
746  target_name = rev_hash if target_name is None else "{}-{}".format(target_name, rev_hash)
747  image_creation_cmdline = [self._repo_manager_exe,
748  '--command', 'image',
749  '--path', self.root_dir,
750  '--targetname', target_name,
751  '--targetsha256', rev_hash,
752  '--targetlength', '0',
753  '--targetformat', 'OSTREE',
754  '--hwid', id[0]]
755  subprocess.run(image_creation_cmdline, check=True)
756 
757  subprocess.run([self._repo_manager_exe,
758  '--command', 'addtarget',
759  '--path', self.root_dir,
760  '--targetname', target_name,
761  '--hwid', id[0],
762  '--serial', id[1]],
763  check=True)
764 
765  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
766 
767  return target_name
768 
769  def __enter__(self):
770  self._generate_repo()
771  return self
772 
773  def __exit__(self, exc_type, exc_val, exc_tb):
774  shutil.rmtree(self.root_dir, ignore_errors=True)
775 
776  def _generate_repo(self):
777  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
778  '--command', 'generate', '--keytype', 'ED25519'], check=True)
779 
780 
781 def with_aktualizr(start=True, output_logs=False, id=('primary-hw-ID-001', str(uuid4())), wait_timeout=60,
782  log_level=1, aktualizr_primary_exe='src/aktualizr_primary/aktualizr',
783  aktualizr_info_exe='src/aktualizr_info/aktualizr-info',
784  run_mode='once'):
785  def decorator(test):
786  @wraps(test)
787  def wrapper(*args, ostree_mock_path=None, **kwargs):
788  aktualizr = Aktualizr(aktualizr_primary_exe=aktualizr_primary_exe,
789  aktualizr_info_exe=aktualizr_info_exe, id=id,
790  wait_timeout=wait_timeout, log_level=log_level, output_logs=output_logs,
791  run_mode=run_mode, ostree_mock_path=ostree_mock_path, **kwargs)
792  if start:
793  with aktualizr:
794  result = test(*args, **kwargs, aktualizr=aktualizr)
795  else:
796  result = test(*args, **kwargs, aktualizr=aktualizr)
797  return result
798  return wrapper
799  return decorator
800 
801 
802 # The following decorators can be eliminated if pytest framework (or similar) is used
803 # by using fixtures instead
804 def with_uptane_backend(start_generic_server=True, repo_manager_exe='src/uptane_generator/uptane-generator'):
805  def decorator(test):
806  @wraps(test)
807  def wrapper(*args, **kwargs):
808  repo_manager_exe_abs_path = path.abspath(repo_manager_exe)
809  with UptaneTestRepo(repo_manager_exe_abs_path) as repo:
810  if start_generic_server:
811  with repo.create_generic_server() as uptane_server:
812  result = test(*args, **kwargs, uptane_repo=repo, uptane_server=uptane_server)
813  else:
814  result = test(*args, **kwargs, uptane_repo=repo)
815  return result
816  return wrapper
817  return decorator
818 
819 
820 def with_director(start=True, handlers=[]):
821  def decorator(test):
822  @wraps(test)
823  def wrapper(*args, uptane_repo, **kwargs):
824  def func(handler_map={}):
825  director = uptane_repo.create_director_repo(handler_map=handler_map)
826  if start:
827  with director:
828  result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
829  else:
830  result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
831  return result
832 
833  if handlers and len(handlers) > 0:
834  for handler in handlers:
835  result = func(handler.map(kwargs.get('test_path', '')))
836  if not result:
837  break
838  else:
839  result = func()
840  return result
841  return wrapper
842  return decorator
843 
844 
845 def with_imagerepo(start=True, handlers=[]):
846  def decorator(test):
847  @wraps(test)
848  def wrapper(*args, uptane_repo, **kwargs):
849  def func(handler_map={}):
850  image_repo = uptane_repo.create_image_repo(handler_map=handler_map)
851  if start:
852  with image_repo:
853  result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
854  else:
855  result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
856  return result
857 
858  if handlers and len(handlers) > 0:
859  for handler in handlers:
860  result = func(handler.map(kwargs.get('test_path', '')))
861  if not result:
862  break
863  else:
864  result = func()
865  return result
866  return wrapper
867  return decorator
868 
869 
870 def with_secondary(start=True, id=('secondary-hw-ID-001', str(uuid4())),
871  aktualizr_secondary_exe='src/aktualizr_secondary/aktualizr-secondary'):
872  def decorator(test):
873  @wraps(test)
874  def wrapper(*args, **kwargs):
875  secondary = IPSecondary(aktualizr_secondary_exe=aktualizr_secondary_exe, id=id, **kwargs)
876  if start:
877  with secondary:
878  result = test(*args, **kwargs, secondary=secondary)
879  else:
880  result = test(*args, **kwargs, secondary=secondary)
881  return result
882  return wrapper
883  return decorator
884 
885 
886 def with_path(paths):
887  def decorator(test):
888  @wraps(test)
889  def wrapper(*args, **kwargs):
890  for test_path in paths:
891  result = test(*args, **kwargs, test_path=test_path)
892  if not result:
893  break
894  return result
895  return wrapper
896  return decorator
897 
898 
900  def __init__(self, aktualizr, uptane_repo, images_to_install=[]):
901  self.aktualizr = aktualizr
902  self.images_to_install = []
903  for image in images_to_install:
904  self.images_to_install.append({
905  'ecu_id': image[0],
906  'filename': image[1],
907  'hash': uptane_repo.add_image(image[0], image[1], custom_url=image[2] if len(image) > 2 else '')
908  })
909 
910  def are_images_installed(self):
911  result = True
912  for image in self.images_to_install:
913  if not (image['hash'] == self.aktualizr.get_current_image_info(image['ecu_id'])):
914  result = False
915  break
916 
917  return result
918 
919 
920 def with_install_manager(default_images=True):
921  def decorator(test):
922  @wraps(test)
923  def wrapper(*args, aktualizr, uptane_repo, secondary=None, images_to_install=[], **kwargs):
924  if default_images and (not images_to_install or len(images_to_install) == 0):
925  images_to_install = [(aktualizr.id, 'primary-image.img')]
926  if secondary:
927  images_to_install.append((secondary.id, 'secondary-image.img'))
928  install_mngr = InstallManager(aktualizr, uptane_repo, images_to_install)
929  result = test(*args, **kwargs, aktualizr=aktualizr, secondary=secondary,
930  uptane_repo=uptane_repo, install_mngr=install_mngr)
931  return result
932  return wrapper
933  return decorator
934 
935 
936 def with_images(images_to_install):
937  def decorator(test):
938  @wraps(test)
939  def wrapper(*args, **kwargs):
940  return test(*args, **kwargs, images_to_install=images_to_install)
941  return wrapper
942  return decorator
943 
944 
945 def with_customrepo(start=True, handlers=[]):
946  def decorator(test):
947  @wraps(test)
948  def wrapper(*args, uptane_repo, **kwargs):
949  def func(handler_map={}):
950  custom_repo = uptane_repo.create_custom_repo(handler_map=handler_map)
951  if start:
952  with custom_repo:
953  result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
954  else:
955  result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
956  return result
957 
958  if handlers and len(handlers) > 0:
959  for handler in handlers:
960  result = func(handler.map(kwargs.get('test_path', '')))
961  if not result:
962  break
963  else:
964  result = func()
965  return result
966  return wrapper
967  return decorator
968 
969 
970 class Sysroot:
971  repo_path = 'ostree_repo'
972 
973  def __enter__(self):
974  self._root = tempfile.mkdtemp()
975  self.path = os.path.join(self._root, self.repo_path)
976  self.version_file = os.path.join(self.path, 'version')
977 
978  subprocess.run(['cp', '-r', self.repo_path, self._root], check=True)
979 
980  initial_revision = self.get_revision()
981  with open(self.version_file, 'wt') as version_file:
982  version_file.writelines(['{}\n'.format(initial_revision),
983  '{}\n'.format('0'),
984  '{}\n'.format(initial_revision),
985  '{}\n'.format('0')])
986 
987  return self
988 
989  def __exit__(self, exc_type, exc_val, exc_tb):
990  shutil.rmtree(self._root, ignore_errors=True)
991 
992  def get_revision(self):
993  rev_cmd_res = subprocess.run(['ostree', 'rev-parse', '--repo', self.path + '/ostree/repo', 'generate-remote:generated'],
994  timeout=60, check=True, stdout=subprocess.PIPE)
995 
996  return rev_cmd_res.stdout.decode('ascii').rstrip('\n')
997 
998  def update_revision(self, rev):
999  with open(self.version_file, 'wt') as version_file:
1000  version_file.writelines(['{}\n'.format(rev),
1001  '{}\n'.format('1'),
1002  '{}\n'.format(rev),
1003  '{}\n'.format('1')])
1004 
1005 
1006 def with_sysroot(ostree_mock_path='tests/libostree_mock.so'):
1007  def decorator(test):
1008  @wraps(test)
1009  def wrapper(*args, **kwargs):
1010  with Sysroot() as sysroot:
1011  return test(*args, **kwargs, sysroot=sysroot, ostree_mock_path=ostree_mock_path)
1012  return wrapper
1013  return decorator
1014 
1015 
1016 def with_treehub(handlers=[], port=0):
1017  def decorator(test):
1018  @wraps(test)
1019  def wrapper(*args, **kwargs):
1020  def func(handler_map={}):
1021  with Treehub('localhost', port=port, client_handler_map=handler_map) as treehub:
1022  return test(*args, **kwargs, treehub=treehub)
1023 
1024  if handlers and len(handlers) > 0:
1025  for handler in handlers:
1026  result = func(handler.map(kwargs.get('test_path', '')))
1027  if not result:
1028  break
1029  else:
1030  result = func()
1031  return result
1032  return wrapper
1033  return decorator
1034 
test_fixtures.UptaneTestRepo.target_file
def target_file(self)
Definition: test_fixtures.py:711
test_fixtures.IPSecondary._process
_process
Definition: test_fixtures.py:353
test_fixtures.SlowRetrievalHandler._url
_url
Definition: test_fixtures.py:613
test_fixtures.CustomRepo
Definition: test_fixtures.py:516
test_fixtures.MalformedImageHandler.dummy_filez
tuple dummy_filez
Definition: test_fixtures.py:581
test_fixtures.DirectorRepo._last_install_res_lock
_last_install_res_lock
Definition: test_fixtures.py:456
test_fixtures.CustomRepo.image_subdir
string image_subdir
Definition: test_fixtures.py:520
test_fixtures.IPSecondary.id
id
Definition: test_fixtures.py:284
test_fixtures.UptaneRepo.base_url
base_url
Definition: test_fixtures.py:374
test_fixtures.MalformedImageHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:586
test_fixtures.Aktualizr._storage_dir
_storage_dir
Definition: test_fixtures.py:32
test_fixtures.MalformedJsonHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:660
test_fixtures.Treehub.root
root
Definition: test_fixtures.py:532
test_fixtures.UptaneRepo.doc_root
doc_root
Definition: test_fixtures.py:375
test_fixtures.Sysroot.path
path
Definition: test_fixtures.py:975
test_fixtures.Aktualizr._output_logs
_output_logs
Definition: test_fixtures.py:64
test_fixtures.MalformedImageHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:587
test_fixtures.DirectorRepo
Definition: test_fixtures.py:441
test_fixtures.IPSecondary.get_free_port
def get_free_port()
Definition: test_fixtures.py:345
test_fixtures.UptaneRepo._server_thread
_server_thread
Definition: test_fixtures.py:376
test_fixtures.Sysroot._root
_root
Definition: test_fixtures.py:974
test_fixtures.InstallManager.images_to_install
images_to_install
Definition: test_fixtures.py:902
test_fixtures.UptaneTestRepo.root_dir
root_dir
Definition: test_fixtures.py:683
test_fixtures.UptaneRepo.Handler.disable_nagle_algorithm
disable_nagle_algorithm
Definition: test_fixtures.py:399
test_fixtures.Aktualizr.get_current_primary_image_info
def get_current_primary_image_info(self)
Definition: test_fixtures.py:202
test_fixtures.Aktualizr._log_level
_log_level
Definition: test_fixtures.py:33
test_fixtures.Sysroot.repo_path
string repo_path
Definition: test_fixtures.py:971
test_fixtures.UptaneRepo.Handler
Definition: test_fixtures.py:396
test_fixtures.IPSecondary._config_file
_config_file
Definition: test_fixtures.py:305
test_fixtures.UptaneRepo.Handler.file_path
def file_path(self)
Definition: test_fixtures.py:419
test_fixtures.RedirectHandler._url
_url
Definition: test_fixtures.py:643
test_fixtures.UptaneTestRepo
Definition: test_fixtures.py:677
test_fixtures.UptaneTestRepo.image_repo_port
image_repo_port
Definition: test_fixtures.py:687
test_fixtures.RedirectHandler._number_of_redirects
_number_of_redirects
Definition: test_fixtures.py:641
test_fixtures.UptaneTestRepo._repo_manager_exe
_repo_manager_exe
Definition: test_fixtures.py:682
test_fixtures.ImageRepo
Definition: test_fixtures.py:503
test_fixtures.IPSecondary.port
port
Definition: test_fixtures.py:285
test_fixtures.Aktualizr._sentinel_file
_sentinel_file
Definition: test_fixtures.py:34
test_fixtures.ImageRepo.image_subdir
string image_subdir
Definition: test_fixtures.py:509
test_fixtures.IPSecondary.primary_port
primary_port
Definition: test_fixtures.py:290
test_fixtures.InstallManager
Definition: test_fixtures.py:899
test_fixtures.Aktualizr._run_env
_run_env
Definition: test_fixtures.py:66
test_fixtures.IPSecondary
Definition: test_fixtures.py:281
test_fixtures.Treehub
Definition: test_fixtures.py:527
test_fixtures.DirectorRepo._manifest
_manifest
Definition: test_fixtures.py:454
test_fixtures.UptaneTestRepo.target_dir
def target_dir(self)
Definition: test_fixtures.py:707
test_fixtures.Aktualizr.id
id
Definition: test_fixtures.py:29
test_fixtures.DownloadInterruptionHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:557
test_fixtures.Aktualizr._process
_process
Definition: test_fixtures.py:223
test_fixtures.UptaneTestRepo.server_port
server_port
Definition: test_fixtures.py:685
test_fixtures.DirectorRepo._last_install_res
_last_install_res
Definition: test_fixtures.py:455
test_fixtures.UptaneTestRepo.target_rel_dir
target_rel_dir
Definition: test_fixtures.py:680
test_fixtures.IPSecondary._sentinel_file
_sentinel_file
Definition: test_fixtures.py:291
test_fixtures.KeyStore
Definition: test_fixtures.py:258
test_fixtures.UptaneTestRepo.image_rel_dir
image_rel_dir
Definition: test_fixtures.py:679
test_fixtures.Aktualizr.get_info
def get_info(self, retry=15)
Definition: test_fixtures.py:144
test_fixtures.Aktualizr._import_dir
_import_dir
Definition: test_fixtures.py:36
test_fixtures.UptaneRepo.Handler.doc_root
doc_root
Definition: test_fixtures.py:398
test_fixtures.DownloadInterruptionHandler
Definition: test_fixtures.py:553
test_fixtures.RedirectHandler
Definition: test_fixtures.py:639
test_fixtures.UptaneRepo
Definition: test_fixtures.py:370
test_fixtures.UptaneTestRepo._generate_repo
def _generate_repo(self)
Definition: test_fixtures.py:776
test_fixtures.UptaneTestRepo.custom_repo_port
custom_repo_port
Definition: test_fixtures.py:688
test_fixtures.Aktualizr._aktualizr_primary_exe
_aktualizr_primary_exe
Definition: test_fixtures.py:30
test_fixtures.SlowRetrievalHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:611
test_fixtures.SlowRetrievalHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:612
test_fixtures.IPSecondary.reboot_sentinel_file
reboot_sentinel_file
Definition: test_fixtures.py:292
test_fixtures.DirectorRepo._installed_condition
_installed_condition
Definition: test_fixtures.py:457
test_fixtures.Treehub.Handler
Definition: test_fixtures.py:543
test_fixtures.UptaneTestRepo.director_port
director_port
Definition: test_fixtures.py:686
test_fixtures.MalformedJsonHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:661
test_fixtures.InstallManager.aktualizr
aktualizr
Definition: test_fixtures.py:901
test_fixtures.MalformedImageHandler._fake_filez
_fake_filez
Definition: test_fixtures.py:589
test_fixtures.DownloadInterruptionHandler._bytes_to_send_before_interruption
_bytes_to_send_before_interruption
Definition: test_fixtures.py:555
test_fixtures.MalformedJsonHandler
Definition: test_fixtures.py:658
test_fixtures.MalformedImageHandler
Definition: test_fixtures.py:580
test_fixtures.DirectorRepo.Handler
Definition: test_fixtures.py:459
test_fixtures.Aktualizr
Definition: test_fixtures.py:26
test_fixtures.IPSecondary._aktualizr_secondary_exe
_aktualizr_secondary_exe
Definition: test_fixtures.py:287
test_fixtures.SlowRetrievalHandler
Definition: test_fixtures.py:609
test_fixtures.MalformedImageHandler._url
_url
Definition: test_fixtures.py:588
test_fixtures.Aktualizr._run_mode
_run_mode
Definition: test_fixtures.py:65
test_fixtures.DirectorRepo.director_subdir
string director_subdir
Definition: test_fixtures.py:448
test_fixtures.DownloadInterruptionHandler._url
_url
Definition: test_fixtures.py:558
test_fixtures.DownloadInterruptionHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:556
test_fixtures.IPSecondary.storage_dir
storage_dir
Definition: test_fixtures.py:288
test_fixtures.Aktualizr._secondary_config_file
_secondary_config_file
Definition: test_fixtures.py:44
test_fixtures.Aktualizr._aktualizr_info_exe
_aktualizr_info_exe
Definition: test_fixtures.py:31
test_fixtures.Aktualizr.add_secondary
def add_secondary(self, secondary)
Definition: test_fixtures.py:126
test_fixtures.Treehub.revision
revision
Definition: test_fixtures.py:536
test_fixtures.Aktualizr._get_current_image_info
def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash:')
Definition: test_fixtures.py:182
test_fixtures.RedirectHandler._redirect_counter
_redirect_counter
Definition: test_fixtures.py:642
test_fixtures.Aktualizr._config_file
_config_file
Definition: test_fixtures.py:61
test_fixtures.Sysroot.get_revision
def get_revision(self)
Definition: test_fixtures.py:992
test_fixtures.IPSecondary._run_env
_run_env
Definition: test_fixtures.py:307
test_fixtures.Aktualizr.reboot_sentinel_file
reboot_sentinel_file
Definition: test_fixtures.py:35
test_fixtures.Sysroot.version_file
version_file
Definition: test_fixtures.py:976
test_fixtures.UptaneTestRepo.image_dir
def image_dir(self)
Definition: test_fixtures.py:703
test_fixtures.Sysroot
Definition: test_fixtures.py:970