Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
test_fixtures.py
1 import subprocess
2 import logging
3 import json
4 import tempfile
5 import threading
6 import os
7 import shutil
8 import signal
9 import socket
10 import time
11 
12 from os import path
13 from uuid import uuid4
14 from os import urandom
15 from functools import wraps
16 from multiprocessing import pool, cpu_count
17 from http.server import SimpleHTTPRequestHandler, HTTPServer
18 
19 from fake_http_server.fake_test_server import FakeTestServerBackground
20 from sota_tools.treehub_server import create_repo
21 
22 
23 logger = logging.getLogger(__name__)
24 
25 
26 class Aktualizr:
27 
28  def __init__(self, aktualizr_primary_exe, aktualizr_info_exe, id,
29  uptane_server, wait_port=9040, wait_timeout=60, log_level=1,
30  primary_port=None, secondaries=None, secondary_wait_sec=600, output_logs=True,
31  run_mode='once', director=None, image_repo=None,
32  sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
33  self.id = id
34  self._aktualizr_primary_exe = aktualizr_primary_exe
35  self._aktualizr_info_exe = aktualizr_info_exe
36  self._storage_dir = tempfile.TemporaryDirectory()
37  self._log_level = log_level
38  self._sentinel_file = 'need_reboot'
39  self.reboot_sentinel_file = os.path.join(self._storage_dir.name, self._sentinel_file)
40  self._import_dir = os.path.join(self._storage_dir.name, 'import')
41  KeyStore.copy_keys(self._import_dir)
42 
43  with open(path.join(self._storage_dir.name, 'secondary_config.json'), 'w+') as secondary_config_file:
44  secondary_cfg = json.loads(Aktualizr.SECONDARY_CONFIG_TEMPLATE.
45  format(port=primary_port if primary_port else wait_port,
46  timeout=wait_timeout))
47  json.dump(secondary_cfg, secondary_config_file)
48  self._secondary_config_file = secondary_config_file.name
49  self._secondary_wait_sec = secondary_wait_sec
50 
51  with open(path.join(self._storage_dir.name, 'config.toml'), 'w+') as config_file:
52  config_file.write(Aktualizr.CONFIG_TEMPLATE.format(server_url=uptane_server.base_url,
53  import_path=self._import_dir,
54  serial=id[1], hw_ID=id[0],
55  storage_dir=self._storage_dir.name,
56  db_path=path.join(self._storage_dir.name, 'sql.db'),
57  log_level=self._log_level,
58  secondary_cfg_file=self._secondary_config_file,
59  secondary_wait_sec=self._secondary_wait_sec,
60  director=director.base_url if director else '',
61  image_repo=image_repo.base_url if image_repo else '',
62  pacman_type='ostree' if treehub and sysroot else 'none',
63  ostree_sysroot=sysroot.path if sysroot else '',
64  treehub_server=treehub.base_url if treehub else '',
65  sentinel_dir=self._storage_dir.name,
66  sentinel_name=self._sentinel_file))
67  self._config_file = config_file.name
68 
69  if secondaries is not None:
70  for s in secondaries:
71  self.add_secondary(s)
72  self._output_logs = output_logs
73  self._run_mode = run_mode
74  self._run_env = {}
75  if sysroot and ostree_mock_path:
76  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
77  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
78 
79  CONFIG_TEMPLATE = '''
80  [tls]
81  server = "{server_url}"
82 
83  [import]
84  base_path = "{import_path}"
85  tls_cacert_path = "ca.pem"
86  tls_pkey_path = "pkey.pem"
87  tls_clientcert_path = "client.pem"
88 
89  [provision]
90  primary_ecu_serial = "{serial}"
91  primary_ecu_hardware_id = "{hw_ID}"
92 
93  [storage]
94  path = "{storage_dir}"
95  type = "sqlite"
96  sqldb_path = "{db_path}"
97 
98  [pacman]
99  type = "{pacman_type}"
100  sysroot = "{ostree_sysroot}"
101  ostree_server = "{treehub_server}"
102  os = "dummy-os"
103 
104  [uptane]
105  polling_sec = 0
106  secondary_config_file = "{secondary_cfg_file}"
107  secondary_preinstall_wait_sec = {secondary_wait_sec}
108  director_server = "{director}"
109  repo_server = "{image_repo}"
110 
111  [bootloader]
112  reboot_sentinel_dir = "{sentinel_dir}"
113  reboot_sentinel_name = "{sentinel_name}"
114  reboot_command = ""
115 
116  [logger]
117  loglevel = {log_level}
118 
119  '''
120 
121  SECONDARY_CONFIG_TEMPLATE = '''
122  {{
123  "IP": {{
124  "secondaries_wait_port": {port},
125  "secondaries_wait_timeout": {timeout},
126  "secondaries": []
127  }}
128  }}
129  '''
130 
131  def set_mode(self, mode):
132  self._run_mode = mode
133 
134  def add_secondary(self, secondary):
135  with open(self._secondary_config_file, "r+") as config_file:
136  sec_cfg = json.load(config_file)
137  sec_cfg["IP"]["secondaries"].append({"addr": "127.0.0.1:{}".format(secondary.port)})
138  config_file.seek(0)
139  json.dump(sec_cfg, config_file)
140 
141  def update_wait_timeout(self, timeout):
142  with open(self._secondary_config_file, "r+") as config_file:
143  sec_cfg = json.load(config_file)
144  sec_cfg["IP"]["secondaries_wait_timeout"] = timeout
145  config_file.seek(0)
146  json.dump(sec_cfg, config_file)
147 
148  def run(self, run_mode):
149  subprocess.run([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', run_mode],
150  check=True, env=self._run_env)
151 
152  # another ugly stuff that could be replaced with something more reliable if Aktualizr had exposed API
153  # to check status or aktializr-info had output status/info in a structured way (e.g. json)
154  def get_info(self, retry=30):
155  info_exe_res = None
156  for ii in range(0, retry):
157  info_exe_res = subprocess.run([self._aktualizr_info_exe, '-c', self._config_file],
158  timeout=60, stdout=subprocess.PIPE, env=self._run_env)
159  if info_exe_res.returncode == 0 and \
160  str(info_exe_res.stdout).find('Provisioned on server: yes') != -1 and \
161  str(info_exe_res.stdout).find('Current Primary ECU running version:') != -1:
162  break
163 
164  if info_exe_res and info_exe_res.returncode == 0:
165  return str(info_exe_res.stdout)
166  else:
167  logger.error('Failed to get an aktualizr\'s status info, stdout: {}, stderr: {}'.
168  format(str(info_exe_res.stdout), str(info_exe_res.stderr)))
169  return None
170 
171  # ugly stuff that could be removed if Aktualizr had exposed API to check status
172  # or aktualizr-info had output status/info in a structured way (e.g. json)
173  def is_ecu_registered(self, ecu_id):
174  device_status = self.get_info()
175  if not ((device_status.find(ecu_id[0]) != -1) and (device_status.find(ecu_id[1]) != -1)):
176  return False
177  not_registered_field = "Removed or not registered ECUs:"
178  not_reg_start = device_status.find(not_registered_field)
179  return not_reg_start == -1 or (device_status.find(ecu_id[1], not_reg_start) == -1)
180 
181  def get_current_image_info(self, ecu_id):
182  if self.id == ecu_id:
183  return self.get_current_primary_image_info()
184  else:
185  return self._get_current_image_info(ecu_id)
186 
187  def get_current_pending_image_info(self, ecu_id):
188  return self._get_current_image_info(ecu_id, secondary_image_hash_field='pending image hash: ')
189 
190  # applicable only to Secondary ECUs due to inconsistency in presenting information
191  # about Primary and Secondary ECUs
192  # ugly stuff that could be removed if aktualizr had exposed API to check status
193  # or aktializr-info had output status/info in a structured way (e.g. json)
194  def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash: '):
195  #secondary_image_filename_field = 'installed image filename: '
196  aktualizr_status = self.get_info()
197  ecu_serial = ecu_id[1]
198  ecu_info_position = aktualizr_status.find(ecu_serial)
199  if ecu_info_position == -1:
200  return None
201 
202  start = aktualizr_status.find(secondary_image_hash_field, ecu_info_position)
203  end = aktualizr_status.find('\\n', start)
204  hash_val = aktualizr_status[start + len(secondary_image_hash_field):end]
205 
206  # start = aktualizr_status.find(secondary_image_filename_field, ecu_info_position)
207  # end = aktualizr_status.find('\\n', start)
208  # filename_val = aktualizr_status[start + len(secondary_image_filename_field):end]
209 
210  return hash_val
211 
212  # ugly stuff that could be removed if Aktualizr had exposed API to check status
213  # or aktializr-info had output status/info in a structured way (e.g. json)
214  def get_current_primary_image_info(self):
215  primary_hash_field = 'Current Primary ECU running version: '
216  aktualizr_status = self.get_info()
217  if aktualizr_status:
218  start = aktualizr_status.find(primary_hash_field)
219  end = aktualizr_status.find('\\n', start)
220  return aktualizr_status[start + len(primary_hash_field):end]
221  else:
222  logger.error("Failed to get aktualizr info/status")
223  return ""
224 
225  # ugly stuff that could be removed if Aktualizr had exposed API to check status
226  # or aktializr-info had output status/info in a structured way (e.g. json)
227  def get_primary_pending_version(self):
228  primary_hash_field = 'Pending Primary ECU version: '
229  aktualizr_status = self.get_info()
230  start = aktualizr_status.find(primary_hash_field)
231  end = aktualizr_status.find('\\n', start)
232  return aktualizr_status[start + len(primary_hash_field):end]
233 
234  def __enter__(self):
235  self._process = subprocess.Popen([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', self._run_mode],
236  stdout=None if self._output_logs else subprocess.PIPE,
237  stderr=None if self._output_logs else subprocess.STDOUT,
238  close_fds=True,
239  env=self._run_env)
240  logger.debug("Aktualizr has been started")
241  return self
242 
243  def __exit__(self, exc_type, exc_val, exc_tb):
244  self._process.terminate()
245  self._process.wait(timeout=60)
246  logger.debug("Aktualizr has been stopped")
247 
248  def terminate(self, sig=signal.SIGTERM):
249  self._process.send_signal(sig)
250 
251  def output(self):
252  return self._process.stdout.read().decode(errors='replace')
253 
254  def wait_for_completion(self, timeout=120):
255  self._process.wait(timeout)
256 
257  def wait_for_provision(self, timeout=60):
258  deadline = time.time() + timeout
259  while timeout == 0 or time.time() < deadline:
260  info = self.get_info(retry=1)
261  if info is not None and 'Provisioned on server: yes' in info:
262  return
263  time.sleep(0.2)
264  raise TimeoutError
265 
266  def emulate_reboot(self):
267  os.remove(self.reboot_sentinel_file)
268 
269 
270 class KeyStore:
271  base_dir = "./"
272 
273  @staticmethod
274  def copy_keys(dest_path):
275  os.mkdir(dest_path)
276  shutil.copy(KeyStore.ca(), dest_path)
277  shutil.copy(KeyStore.pkey(), dest_path)
278  shutil.copy(KeyStore.cert(), dest_path)
279 
280  @staticmethod
281  def ca():
282  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/ca.pem')
283 
284  @staticmethod
285  def pkey():
286  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/pkey.pem')
287 
288  @staticmethod
289  def cert():
290  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/client.pem')
291 
292 
294 
295  def __init__(self, aktualizr_secondary_exe, id, port=None, primary_port=None,
296  sysroot=None, treehub=None, output_logs=True, force_reboot=False,
297  ostree_mock_path=None, **kwargs):
298  self.id = id
299 
300  self._aktualizr_secondary_exe = aktualizr_secondary_exe
301  self.storage_dir = tempfile.TemporaryDirectory()
302  self.port = self.get_free_port() if port is None else port
303  self.primary_port = self.get_free_port() if primary_port is None else primary_port
304  self._sentinel_file = 'need_reboot'
305  self._output_logs = output_logs
306  self.reboot_sentinel_file = os.path.join(self.storage_dir.name, self._sentinel_file)
307 
308  if force_reboot:
309  reboot_command = "rm {}".format(self.reboot_sentinel_file)
310  else:
311  reboot_command = ""
312 
313  with open(path.join(self.storage_dir.name, 'config.toml'), 'w+') as config_file:
314  config_file.write(IPSecondary.CONFIG_TEMPLATE.format(serial=id[1], hw_ID=id[0],
315  force_reboot=1 if force_reboot else 0,
316  reboot_command=reboot_command,
317  port=self.port, primary_port=self.primary_port,
318  storage_dir=self.storage_dir.name,
319  db_path=path.join(self.storage_dir.name, 'db.sql'),
320  pacman_type='ostree' if treehub and sysroot else 'none',
321  ostree_sysroot=sysroot.path if sysroot else '',
322  treehub_server=treehub.base_url if treehub else '',
323  sentinel_dir=self.storage_dir.name,
324  sentinel_name=self._sentinel_file
325  ))
326  self._config_file = config_file.name
327 
328  self._run_env = {}
329  if sysroot and ostree_mock_path:
330  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
331  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
332 
333 
334  CONFIG_TEMPLATE = '''
335  [uptane]
336  ecu_serial = "{serial}"
337  ecu_hardware_id = "{hw_ID}"
338  force_install_completion = {force_reboot}
339 
340  [network]
341  port = {port}
342  primary_ip = "127.0.0.1"
343  primary_port = {primary_port}
344 
345  [storage]
346  type = "sqlite"
347  path = "{storage_dir}"
348  sqldb_path = "{db_path}"
349 
350  [pacman]
351  type = "{pacman_type}"
352  sysroot = "{ostree_sysroot}"
353  ostree_server = "{treehub_server}"
354  os = "dummy-os"
355 
356  [bootloader]
357  reboot_sentinel_dir = "{sentinel_dir}"
358  reboot_sentinel_name = "{sentinel_name}"
359  reboot_command = "{reboot_command}"
360  '''
361 
362  def is_running(self):
363  return True if self._process.poll() is None else False
364 
365  @staticmethod
366  def get_free_port():
367  tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
368  tcp.bind(('', 0))
369  port = tcp.getsockname()[1]
370  tcp.close()
371  return port
372 
373  def __enter__(self):
374  self._process = subprocess.Popen([self._aktualizr_secondary_exe, '-c', self._config_file],
375  stdout=None if self._output_logs else subprocess.PIPE,
376  stderr=None if self._output_logs else subprocess.STDOUT,
377  close_fds=True,
378  env=self._run_env)
379  logger.debug("IP Secondary {} has been started: {}".format(self.id, self.port))
380  return self
381 
382  def __exit__(self, exc_type, exc_val, exc_tb):
383  self._process.terminate()
384  self._process.wait(timeout=60)
385  logger.debug("IP Secondary {} has been stopped".format(self.id))
386 
387  def wait_for_completion(self, timeout=120):
388  self._process.wait(timeout)
389 
390  def emulate_reboot(self):
391  os.remove(self.reboot_sentinel_file)
392 
393 
394 class UptaneRepo(HTTPServer):
395  def __init__(self, doc_root, ifc, port, client_handler_map={}):
396  super(UptaneRepo, self).__init__(server_address=(ifc, port), RequestHandlerClass=self.Handler)
397 
398  self.base_url = 'http://{}:{}'.format(self.server_address[0], self.server_address[1])
399  self.doc_root = doc_root
400  self._server_thread = None
401 
402  self.Handler.do_POST = \
403  lambda request: (self.Handler.handler_map.get('POST', {})).get(request.path,
404  self.Handler.default_handler)(request)
405 
406  self.Handler.do_PUT = \
407  lambda request: (self.Handler.handler_map.get('PUT', {})).get(request.path,
408  self.Handler.default_handler)(request)
409 
410  self.Handler.do_GET = \
411  lambda request: (self.Handler.handler_map.get('GET', {})).get(request.path,
412  self.Handler.default_get)(request)
413 
414  for method, method_handlers in client_handler_map.items():
415  for url, handler in method_handlers.items():
416  if self.Handler.handler_map.get(method, None) is None:
417  self.Handler.handler_map[method] = {}
418  self.Handler.handler_map[method][url] = handler
419 
420  class Handler(SimpleHTTPRequestHandler):
421  def __init__(self, request, client_address, server):
422  self.doc_root = server.doc_root
423  self.disable_nagle_algorithm = True
424  super(UptaneRepo.Handler, self).__init__(request, client_address, server)
425 
426  def default_handler(self):
427  self.send_response(200)
428  self.end_headers()
429 
430  def default_get(self):
431  if not os.path.exists(self.file_path):
432  self.send_response(404)
433  self.end_headers()
434  return
435  self.send_response(200)
436  self.end_headers()
437  with open(self.file_path, 'rb') as source:
438  self.copyfile(source, self.wfile)
439 
440  handler_map = {}
441 
442  @property
443  def file_path(self):
444  return os.path.join(self.doc_root, self.path[1:])
445 
446  def start(self):
447  self._server_thread = threading.Thread(target=self.serve_forever)
448  self._server_thread.start()
449  return self
450 
451  def stop(self):
452  self.shutdown()
453  self.server_close()
454  if self._server_thread:
455  self._server_thread.join(timeout=60)
456  self._server_thread = None
457 
458  def __enter__(self):
459  return self.start()
460 
461  def __exit__(self, exc_type, exc_val, exc_tb):
462  self.stop()
463 
464 
466  """
467  This server
468  - serves signed metadata about images
469  - receives device manifest which includes installation report if any installation has happened
470  """
471 
472  director_subdir = "repo/director"
473 
474  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
475  super(DirectorRepo, self).__init__(os.path.join(uptane_repo_root, self.director_subdir), ifc=ifc, port=port,
476  client_handler_map=client_handler_map)
477 
478  self._manifest = None
479  self._last_install_res = False
480  self._last_install_res_lock = threading.RLock()
481  self._installed_condition = threading.Condition()
482 
484  def handle_manifest(self):
485  self.send_response(200)
486  self.end_headers()
487  json_data = None
488  try:
489  data_size = int(self.headers['Content-Length'])
490  data_string = self.rfile.read(data_size)
491  json_data = json.loads(data_string)
492  except Exception as exc:
493  logger.error(exc)
494 
495  if json_data:
496  install_report = json_data['signed'].get('installation_report', "")
497  if install_report:
498  self.server.set_install_event(json_data)
499 
500  handler_map = {'PUT': {'/manifest': handle_manifest}}
501 
502  def set_install_event(self, manifest):
503  with self._installed_condition:
504  self._manifest = manifest
505  self._last_install_res = manifest['signed']['installation_report']['report']['result']['success']
506  self._installed_condition.notifyAll()
507 
508  def wait_for_install(self, timeout=180):
509  with self._installed_condition:
510  self._installed_condition.wait(timeout=timeout)
511  return self._last_install_res
512 
513  def get_install_result(self):
514  with self._installed_condition:
515  return self._last_install_res
516 
517  def get_manifest(self):
518  with self._installed_condition:
519  return self._manifest
520 
521  def get_ecu_manifest(self, ecu_serial):
522  return self.get_manifest()['signed']['ecu_version_manifests'][ecu_serial]
523 
524  def get_ecu_manifest_filepath(self, ecu_serial):
525  return self.get_ecu_manifest(ecu_serial)['signed']['installed_image']['filepath']
526 
527 
529  """
530  This server serves signed metadata about images
531  as well as images by default (it's possible to serve images from another server by using the 'custom URI' feature)
532  """
533 
534  image_subdir = "repo/repo"
535 
536  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
537  super(ImageRepo, self).__init__(os.path.join(uptane_repo_root, self.image_subdir), ifc=ifc, port=port,
538  client_handler_map=client_handler_map)
539 
540 
542  """
543  This server serves images
544  """
545  image_subdir = "repo/repo"
546 
547  def __init__(self, root, ifc, port, client_handler_map={}):
548  super(CustomRepo, self).__init__(os.path.join(root, self.image_subdir),
549  ifc=ifc, port=port, client_handler_map=client_handler_map)
550 
551 
553  """
554  This server serves requests from an OSTree client, i.e. emulates/mocks the treehub server
555  """
556  def __init__(self, ifc, port, client_handler_map={}):
557  self.root = tempfile.mkdtemp()
558  super(Treehub, self).__init__(self.root, ifc=ifc, port=port, client_handler_map=client_handler_map)
559 
560  def __enter__(self):
561  self.revision = create_repo(self.root)
562  return super(Treehub, self).__enter__()
563 
564  def __exit__(self, exc_type, exc_val, exc_tb):
565  super(Treehub, self).__exit__(exc_type, exc_val, exc_tb)
566  shutil.rmtree(self.root, ignore_errors=True)
567 
569  def default_get(self):
570  if not os.path.exists(self.file_path):
571  self.send_response(404)
572  self.end_headers()
573  return
574 
575  super(Treehub.Handler, self).default_get()
576 
577 
579  def __init__(self, number_of_failures=1, bytes_to_send_before_interruption=10, url=''):
580  self._bytes_to_send_before_interruption = bytes_to_send_before_interruption
581  self._number_of_failures = number_of_failures
582  self._failure_counter = 0
583  self._url = url
584 
585  def __call__(self, request_handler):
586  if self._failure_counter < self._number_of_failures:
587  request_handler.send_response(200)
588  file_size = os.path.getsize(request_handler.file_path)
589  request_handler.send_header('Content-Length', file_size)
590  request_handler.end_headers()
591 
592  with open(request_handler.file_path, 'rb') as source:
593  data = source.read(self._bytes_to_send_before_interruption)
594  request_handler.wfile.write(data)
595 
596  self._failure_counter += 1
597  else:
598  request_handler.default_get()
599 
600  def map(self, url=''):
601  return {'GET': {url if url else self._url: DownloadInterruptionHandler(self._number_of_failures,
603 
604 
606  dummy_filez = (b'\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06' +
607  b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xa4\x00\x00\x00\x00' +
608  b'\x00\x19\x33\x34\x32\x36\x31\xe5\x02\x00')
609 
610  def __init__(self, number_of_failures=1, url='', fake_filez=False):
611  self._number_of_failures = number_of_failures
612  self._failure_counter = 0
613  self._url = url
614  self._fake_filez = fake_filez
615 
616  def __call__(self, request_handler):
617  if self._number_of_failures == -1 or self._failure_counter < self._number_of_failures:
618  request_handler.send_response(200)
619  request_handler.end_headers()
620  if self._fake_filez:
621  request_handler.wfile.write(self.dummy_filez)
622  else:
623  request_handler.wfile.write(b'malformed image')
624 
625  self._failure_counter += 1
626  else:
627  request_handler.default_get()
628 
629  def map(self, url):
630  return {'GET': {url if url else self._url: MalformedImageHandler(self._number_of_failures,
631  fake_filez=self._fake_filez)}}
632 
633 
635  def __init__(self, number_of_failures=1, url=''):
636  self._number_of_failures = number_of_failures
637  self._failure_counter = 0
638  self._url = url
639 
640  def __call__(self, request_handler):
641  if self._failure_counter < self._number_of_failures:
642  request_handler.send_response(200)
643  file_size = os.path.getsize(request_handler.file_path)
644  request_handler.end_headers()
645 
646  with open(request_handler.file_path, 'rb') as source:
647  while True:
648  data = source.read(1)
649  if not data:
650  break
651  request_handler.wfile.write(data)
652  request_handler.wfile.flush()
653  import time
654  time.sleep(100)
655 
656  self._failure_counter += 1
657  else:
658  request_handler.default_get()
659 
660  def map(self, url):
661  return {'GET': {url if url else self._url: SlowRetrievalHandler(self._number_of_failures)}}
662 
663 
665  def __init__(self, number_of_redirects=1, url=''):
666  self._number_of_redirects = number_of_redirects
667  self._redirect_counter = 0
668  self._url = url
669 
670  def __call__(self, request_handler):
672  request_handler.send_response(301)
673  request_handler.send_header('Location', request_handler.server.base_url + request_handler.path)
674  request_handler.end_headers()
675  self._redirect_counter += 1
676  else:
677  request_handler.default_get()
678 
679  def map(self, url):
680  return {'GET': {url if url else self._url: RedirectHandler(self._number_of_redirects)}}
681 
682 
684  def __init__(self, number_of_failures=1):
685  self._number_of_failures = number_of_failures
686  self._failure_counter = 0
687 
688  def __call__(self, request_handler):
689  if self._failure_counter < self._number_of_failures:
690  request_handler.send_response(200)
691  request_handler.end_headers()
692  request_handler.wfile.write(b'{"non-uptane-json": "some-value"}')
693 
694  self._failure_counter += 1
695  else:
696  request_handler.default_get()
697 
698  def map(self, url):
699  return {'GET': {url: MalformedJsonHandler(self._number_of_failures)}}
700 
701 
703  def __init__(self, repo_manager_exe, server_port=0, director_port=0, image_repo_port=0, custom_repo_port=0):
704  self.image_rel_dir = 'repo/repo'
705  self.target_rel_dir = 'repo/repo/targets'
706 
707  self._repo_manager_exe = repo_manager_exe
708  self.root_dir = tempfile.mkdtemp()
709 
710  self.server_port = server_port
711  self.director_port = director_port
712  self.image_repo_port = image_repo_port
713  self.custom_repo_port = custom_repo_port
714 
715  def create_generic_server(self, **kwargs):
716  return FakeTestServerBackground(meta_path=self.root_dir, target_path=self.target_dir, port=self.server_port)
717 
718  def create_director_repo(self, handler_map={}):
719  return DirectorRepo(self.root_dir, 'localhost', self.director_port, client_handler_map=handler_map)
720 
721  def create_image_repo(self, handler_map={}):
722  return ImageRepo(self.root_dir, 'localhost', self.image_repo_port, client_handler_map=handler_map)
723 
724  def create_custom_repo(self, handler_map={}):
725  return CustomRepo(self.root_dir, 'localhost', self.custom_repo_port, client_handler_map=handler_map)
726 
727  @property
728  def image_dir(self):
729  return path.join(self.root_dir, self.image_rel_dir)
730 
731  @property
732  def target_dir(self):
733  return path.join(self.root_dir, self.target_rel_dir)
734 
735  @property
736  def target_file(self):
737  return path.join(self.image_dir, 'targets.json')
738 
739  def add_image(self, id, image_filename, target_name=None, image_size=1024, custom_url=''):
740 
741  targetname = target_name if target_name else image_filename
742 
743  with open(path.join(self.image_dir, image_filename), 'wb') as image_file:
744  image_file.write(urandom(image_size))
745 
746  image_creation_cmdline = [self._repo_manager_exe, '--path', self.root_dir,
747  '--command', 'image', '--filename', image_filename, '--targetname', targetname, '--hwid', id[0]]
748 
749  if custom_url:
750  image_creation_cmdline.append('--url')
751  image_creation_cmdline.append(custom_url)
752 
753  subprocess.run(image_creation_cmdline, cwd=self.image_dir, check=True)
754 
755  # update the director metadata
756  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
757  '--command', 'addtarget', '--targetname', targetname,
758  '--hwid', id[0], '--serial', id[1]], check=True)
759 
760  # sign so the image becomes available for an update for a client/device
761  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
762 
763  with open(self.target_file, "r") as target_file:
764  targets = json.load(target_file)
765  target_hash = targets["signed"]["targets"][targetname]["hashes"]["sha256"]
766 
767  return target_hash
768 
769  def add_ostree_target(self, id, rev_hash, target_name=None, expires_within_sec=(60 * 5)):
770  # emulate the backend behavior on defining a target name for OSTREE target format
771  target_name = rev_hash if target_name is None else "{}-{}".format(target_name, rev_hash)
772  image_creation_cmdline = [self._repo_manager_exe,
773  '--command', 'image',
774  '--path', self.root_dir,
775  '--targetname', target_name,
776  '--targetsha256', rev_hash,
777  '--targetlength', '0',
778  '--targetformat', 'OSTREE',
779  '--hwid', id[0]]
780  subprocess.run(image_creation_cmdline, check=True)
781 
782  expiration_time = time.time() + expires_within_sec
783  expiration_time_str = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(expiration_time))
784 
785  subprocess.run([self._repo_manager_exe,
786  '--command', 'addtarget',
787  '--path', self.root_dir,
788  '--targetname', target_name,
789  '--hwid', id[0],
790  '--serial', id[1],
791  '--expires', expiration_time_str],
792  check=True)
793 
794  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
795 
796  return target_name
797 
798  def __enter__(self):
799  self._generate_repo()
800  return self
801 
802  def __exit__(self, exc_type, exc_val, exc_tb):
803  shutil.rmtree(self.root_dir, ignore_errors=True)
804 
805  def _generate_repo(self):
806  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
807  '--command', 'generate', '--keytype', 'ED25519'], check=True)
808 
809 
810 def with_aktualizr(start=True, output_logs=False, id=('primary-hw-ID-001', str(uuid4())), wait_timeout=60,
811  secondary_wait_sec=600, log_level=1, aktualizr_primary_exe='src/aktualizr_primary/aktualizr',
812  aktualizr_info_exe='src/aktualizr_info/aktualizr-info',
813  run_mode='once'):
814  def decorator(test):
815  @wraps(test)
816  def wrapper(*args, ostree_mock_path=None, **kwargs):
817  aktualizr = Aktualizr(aktualizr_primary_exe=aktualizr_primary_exe,
818  aktualizr_info_exe=aktualizr_info_exe, id=id,
819  wait_timeout=wait_timeout,
820  secondary_wait_sec=secondary_wait_sec,
821  log_level=log_level, output_logs=output_logs,
822  run_mode=run_mode, ostree_mock_path=ostree_mock_path, **kwargs)
823  if start:
824  with aktualizr:
825  result = test(*args, **kwargs, aktualizr=aktualizr)
826  else:
827  result = test(*args, **kwargs, aktualizr=aktualizr)
828  return result
829  return wrapper
830  return decorator
831 
832 
833 # The following decorators can be eliminated if pytest framework (or similar) is used
834 # by using fixtures instead
835 def with_uptane_backend(start_generic_server=True, repo_manager_exe='src/uptane_generator/uptane-generator'):
836  def decorator(test):
837  @wraps(test)
838  def wrapper(*args, **kwargs):
839  repo_manager_exe_abs_path = path.abspath(repo_manager_exe)
840  with UptaneTestRepo(repo_manager_exe_abs_path) as repo:
841  if start_generic_server:
842  with repo.create_generic_server() as uptane_server:
843  result = test(*args, **kwargs, uptane_repo=repo, uptane_server=uptane_server)
844  else:
845  result = test(*args, **kwargs, uptane_repo=repo)
846  return result
847  return wrapper
848  return decorator
849 
850 
851 def with_director(start=True, handlers=[]):
852  def decorator(test):
853  @wraps(test)
854  def wrapper(*args, uptane_repo, **kwargs):
855  def func(handler_map={}):
856  director = uptane_repo.create_director_repo(handler_map=handler_map)
857  if start:
858  with director:
859  result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
860  else:
861  result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
862  return result
863 
864  if handlers and len(handlers) > 0:
865  for handler in handlers:
866  result = func(handler.map(kwargs.get('test_path', '')))
867  if not result:
868  break
869  else:
870  result = func()
871  return result
872  return wrapper
873  return decorator
874 
875 
876 def with_imagerepo(start=True, handlers=[]):
877  def decorator(test):
878  @wraps(test)
879  def wrapper(*args, uptane_repo, **kwargs):
880  def func(handler_map={}):
881  image_repo = uptane_repo.create_image_repo(handler_map=handler_map)
882  if start:
883  with image_repo:
884  result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
885  else:
886  result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
887  return result
888 
889  if handlers and len(handlers) > 0:
890  for handler in handlers:
891  result = func(handler.map(kwargs.get('test_path', '')))
892  if not result:
893  break
894  else:
895  result = func()
896  return result
897  return wrapper
898  return decorator
899 
900 
901 def with_secondary(start=True, output_logs=False, id=('secondary-hw-ID-001', None),
902  force_reboot=False, arg_name='secondary',
903  aktualizr_secondary_exe='src/aktualizr_secondary/aktualizr-secondary'):
904  def decorator(test):
905  @wraps(test)
906  def wrapper(*args, **kwargs):
907  id1 = id
908  if id1[1] is None:
909  id1 = (id1[0], str(uuid4()))
910  secondary = IPSecondary(aktualizr_secondary_exe=aktualizr_secondary_exe, output_logs=output_logs, id=id1, force_reboot=force_reboot, **kwargs)
911  sl = kwargs.get("secondaries", []) + [secondary]
912  kwargs.update({arg_name: secondary, "secondaries": sl})
913  if "primary_port" not in kwargs:
914  kwargs["primary_port"] = secondary.primary_port
915  if start:
916  with secondary:
917  result = test(*args, **kwargs)
918  else:
919  result = test(*args, **kwargs)
920  return result
921  return wrapper
922  return decorator
923 
924 
925 def with_path(paths):
926  def decorator(test):
927  @wraps(test)
928  def wrapper(*args, **kwargs):
929  for test_path in paths:
930  result = test(*args, **kwargs, test_path=test_path)
931  if not result:
932  break
933  return result
934  return wrapper
935  return decorator
936 
937 
939  def __init__(self, aktualizr, uptane_repo, images_to_install=[]):
940  self.aktualizr = aktualizr
941  self.images_to_install = []
942  for image in images_to_install:
943  self.images_to_install.append({
944  'ecu_id': image[0],
945  'filename': image[1],
946  'hash': uptane_repo.add_image(image[0], image[1], custom_url=image[2] if len(image) > 2 else '')
947  })
948 
949  def are_images_installed(self):
950  result = True
951  for image in self.images_to_install:
952  if not (image['hash'] == self.aktualizr.get_current_image_info(image['ecu_id'])):
953  result = False
954  break
955 
956  return result
957 
958 
959 def with_install_manager(default_images=True):
960  def decorator(test):
961  @wraps(test)
962  def wrapper(*args, aktualizr, uptane_repo, secondary=None, images_to_install=[], **kwargs):
963  if default_images and (not images_to_install or len(images_to_install) == 0):
964  images_to_install = [(aktualizr.id, 'primary-image.img')]
965  if secondary:
966  images_to_install.append((secondary.id, 'secondary-image.img'))
967  install_mngr = InstallManager(aktualizr, uptane_repo, images_to_install)
968  result = test(*args, **kwargs, aktualizr=aktualizr, secondary=secondary,
969  uptane_repo=uptane_repo, install_mngr=install_mngr)
970  return result
971  return wrapper
972  return decorator
973 
974 
975 def with_images(images_to_install):
976  def decorator(test):
977  @wraps(test)
978  def wrapper(*args, **kwargs):
979  return test(*args, **kwargs, images_to_install=images_to_install)
980  return wrapper
981  return decorator
982 
983 
984 def with_customrepo(start=True, handlers=[]):
985  def decorator(test):
986  @wraps(test)
987  def wrapper(*args, uptane_repo, **kwargs):
988  def func(handler_map={}):
989  custom_repo = uptane_repo.create_custom_repo(handler_map=handler_map)
990  if start:
991  with custom_repo:
992  result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
993  else:
994  result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
995  return result
996 
997  if handlers and len(handlers) > 0:
998  for handler in handlers:
999  result = func(handler.map(kwargs.get('test_path', '')))
1000  if not result:
1001  break
1002  else:
1003  result = func()
1004  return result
1005  return wrapper
1006  return decorator
1007 
1008 
1009 class Sysroot:
1010  repo_path = 'ostree_repo'
1011 
1012  def __enter__(self):
1013  self._root = tempfile.mkdtemp()
1014  self.path = os.path.join(self._root, self.repo_path)
1015  self.version_file = os.path.join(self.path, 'version')
1016 
1017  subprocess.run(['cp', '-r', self.repo_path, self._root], check=True)
1018 
1019  initial_revision = self.get_revision()
1020  with open(self.version_file, 'wt') as version_file:
1021  version_file.writelines(['{}\n'.format(initial_revision),
1022  '{}\n'.format('0'),
1023  '{}\n'.format(initial_revision),
1024  '{}\n'.format('0')])
1025 
1026  return self
1027 
1028  def __exit__(self, exc_type, exc_val, exc_tb):
1029  shutil.rmtree(self._root, ignore_errors=True)
1030 
1031  def get_revision(self):
1032  rev_cmd_res = subprocess.run(['ostree', 'rev-parse', '--repo', self.path + '/ostree/repo', 'generate-remote:generated'],
1033  timeout=60, check=True, stdout=subprocess.PIPE)
1034 
1035  return rev_cmd_res.stdout.decode('ascii').rstrip('\n')
1036 
1037  def update_revision(self, rev):
1038  with open(self.version_file, 'wt') as version_file:
1039  version_file.writelines(['{}\n'.format(rev),
1040  '{}\n'.format('1'),
1041  '{}\n'.format(rev),
1042  '{}\n'.format('1')])
1043 
1044 
1045 def with_sysroot(ostree_mock_path='tests/libostree_mock.so'):
1046  def decorator(test):
1047  @wraps(test)
1048  def wrapper(*args, **kwargs):
1049  with Sysroot() as sysroot:
1050  return test(*args, **kwargs, sysroot=sysroot, ostree_mock_path=ostree_mock_path)
1051  return wrapper
1052  return decorator
1053 
1054 
1055 def with_treehub(handlers=[], port=0):
1056  def decorator(test):
1057  @wraps(test)
1058  def wrapper(*args, **kwargs):
1059  def func(handler_map={}):
1060  with Treehub('localhost', port=port, client_handler_map=handler_map) as treehub:
1061  return test(*args, **kwargs, treehub=treehub)
1062 
1063  if handlers and len(handlers) > 0:
1064  for handler in handlers:
1065  result = func(handler.map(kwargs.get('test_path', '')))
1066  if not result:
1067  break
1068  else:
1069  result = func()
1070  return result
1071  return wrapper
1072  return decorator
1073 
1074 
1075 class NonDaemonPool(pool.Pool):
1076  def Process(self, *args, **kwds):
1077  proc = super(NonDaemonPool, self).Process(*args, **kwds)
1078 
1079  class NonDaemonProcess(proc.__class__):
1080  """Monkey-patch process to ensure it is never daemonized"""
1081 
1082  @property
1083  def daemon(self):
1084  return False
1085 
1086  @daemon.setter
1087  def daemon(self, val):
1088  pass
1089 
1090  proc.__class__ = NonDaemonProcess
1091 
1092  return proc
1093 
1094 
1096  def __init__(self, tests):
1097  self._tests = tests
1098  self._test_runner_pool = NonDaemonPool(min(len(self._tests), cpu_count()))
1099 
1100  def __enter__(self):
1101  return self
1102 
1103  def __exit__(self, exc_type, exc_value, traceback):
1104  # This must be called, see https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
1105  self._test_runner_pool.__exit__(exc_type, exc_value, traceback)
1106 
1107  @staticmethod
1108  def test_runner(test):
1109  logger.info('>>> Running {}...'.format(test.__name__))
1110  test_run_result = test()
1111  logger.info('>>> {}: {}\n'.format('OK' if test_run_result else 'FAILED', test.__name__))
1112  return test_run_result
1113 
1114  def run(self):
1115  results = self._test_runner_pool.map(TestRunner.test_runner, self._tests)
1116  total_result = True
1117  for result in results:
1118  total_result = total_result and result
1119  return total_result
def get_info(self, retry=30)
def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash:')
def add_secondary(self, secondary)
def get_current_primary_image_info(self)