Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
test_fixtures.py
1 import subprocess
2 import logging
3 import json
4 import tempfile
5 import threading
6 import os
7 import shutil
8 import signal
9 import socket
10 import time
11 
12 from os import path
13 from uuid import uuid4
14 from os import urandom
15 from functools import wraps
16 from multiprocessing import pool, cpu_count
17 from http.server import SimpleHTTPRequestHandler, HTTPServer
18 
19 from fake_http_server.fake_test_server import FakeTestServerBackground
20 from sota_tools.treehub_server import create_repo
21 
22 
23 logger = logging.getLogger(__name__)
24 
25 
26 class Aktualizr:
27 
28  def __init__(self, aktualizr_primary_exe, aktualizr_info_exe, id,
29  uptane_server, wait_port=9040, wait_timeout=60, log_level=1,
30  primary_port=None, secondaries=None, secondary_wait_sec=600, output_logs=True,
31  run_mode='once', director=None, image_repo=None,
32  sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
33  self.id = id
34  self._aktualizr_primary_exe = aktualizr_primary_exe
35  self._aktualizr_info_exe = aktualizr_info_exe
36  self._storage_dir = tempfile.TemporaryDirectory()
37  self._log_level = log_level
38  self._sentinel_file = 'need_reboot'
39  self.reboot_sentinel_file = os.path.join(self._storage_dir.name, self._sentinel_file)
40  self._import_dir = os.path.join(self._storage_dir.name, 'import')
41  KeyStore.copy_keys(self._import_dir)
42 
43  with open(path.join(self._storage_dir.name, 'secondary_config.json'), 'w+') as secondary_config_file:
44  secondary_cfg = json.loads(Aktualizr.SECONDARY_CONFIG_TEMPLATE.
45  format(port=primary_port if primary_port else wait_port,
46  timeout=wait_timeout))
47  json.dump(secondary_cfg, secondary_config_file)
48  self._secondary_config_file = secondary_config_file.name
49  self._secondary_wait_sec = secondary_wait_sec
50 
51  with open(path.join(self._storage_dir.name, 'config.toml'), 'w+') as config_file:
52  config_file.write(Aktualizr.CONFIG_TEMPLATE.format(server_url=uptane_server.base_url,
53  import_path=self._import_dir,
54  serial=id[1], hw_ID=id[0],
55  storage_dir=self._storage_dir.name,
56  db_path=path.join(self._storage_dir.name, 'sql.db'),
57  log_level=self._log_level,
58  secondary_cfg_file=self._secondary_config_file,
59  secondary_wait_sec=self._secondary_wait_sec,
60  director=director.base_url if director else '',
61  image_repo=image_repo.base_url if image_repo else '',
62  pacman_type='ostree' if treehub and sysroot else 'none',
63  ostree_sysroot=sysroot.path if sysroot else '',
64  treehub_server=treehub.base_url if treehub else '',
65  sentinel_dir=self._storage_dir.name,
66  sentinel_name=self._sentinel_file))
67  self._config_file = config_file.name
68 
69  if secondaries is not None:
70  for s in secondaries:
71  self.add_secondary(s)
72  self._output_logs = output_logs
73  self._run_mode = run_mode
74  self._run_env = {}
75  if sysroot and ostree_mock_path:
76  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
77  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
78 
79  CONFIG_TEMPLATE = '''
80  [tls]
81  server = "{server_url}"
82 
83  [import]
84  base_path = "{import_path}"
85  tls_cacert_path = "ca.pem"
86  tls_pkey_path = "pkey.pem"
87  tls_clientcert_path = "client.pem"
88 
89  [provision]
90  primary_ecu_serial = "{serial}"
91  primary_ecu_hardware_id = "{hw_ID}"
92 
93  [storage]
94  path = "{storage_dir}"
95  type = "sqlite"
96  sqldb_path = "{db_path}"
97 
98  [pacman]
99  type = "{pacman_type}"
100  sysroot = "{ostree_sysroot}"
101  ostree_server = "{treehub_server}"
102  os = "dummy-os"
103 
104  [uptane]
105  polling_sec = 0
106  secondary_config_file = "{secondary_cfg_file}"
107  secondary_preinstall_wait_sec = {secondary_wait_sec}
108  director_server = "{director}"
109  repo_server = "{image_repo}"
110 
111  [bootloader]
112  reboot_sentinel_dir = "{sentinel_dir}"
113  reboot_sentinel_name = "{sentinel_name}"
114  reboot_command = ""
115 
116  [logger]
117  loglevel = {log_level}
118 
119  '''
120 
121  SECONDARY_CONFIG_TEMPLATE = '''
122  {{
123  "IP": {{
124  "secondaries_wait_port": {port},
125  "secondaries_wait_timeout": {timeout},
126  "secondaries": []
127  }}
128  }}
129  '''
130 
131  def add_secondary(self, secondary):
132  with open(self._secondary_config_file, "r+") as config_file:
133  sec_cfg = json.load(config_file)
134  sec_cfg["IP"]["secondaries"].append({"addr": "127.0.0.1:{}".format(secondary.port)})
135  config_file.seek(0)
136  json.dump(sec_cfg, config_file)
137 
138  def update_wait_timeout(self, timeout):
139  with open(self._secondary_config_file, "r+") as config_file:
140  sec_cfg = json.load(config_file)
141  sec_cfg["IP"]["secondaries_wait_timeout"] = timeout
142  config_file.seek(0)
143  json.dump(sec_cfg, config_file)
144 
145  def run(self, run_mode):
146  subprocess.run([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', run_mode],
147  check=True, env=self._run_env)
148 
149  def get_info(self, retry=15):
150  info_exe_res = None
151  for ii in range(0, retry):
152  info_exe_res = subprocess.run([self._aktualizr_info_exe, '-c', self._config_file],
153  timeout=60, stdout=subprocess.PIPE, env=self._run_env)
154  if info_exe_res.returncode == 0 and \
155  str(info_exe_res.stdout).find('no details about installed nor pending images') != -1:
156  break
157 
158  if info_exe_res and info_exe_res.returncode == 0:
159  return str(info_exe_res.stdout)
160  else:
161  logger.error(str(info_exe_res.stderr))
162  return None
163 
164  # ugly stuff that could be removed if Aktualizr had exposed API to check status
165  # or aktualizr-info had output status/info in a structured way (e.g. json)
166  def is_ecu_registered(self, ecu_id):
167  device_status = self.get_info()
168  if not ((device_status.find(ecu_id[0]) != -1) and (device_status.find(ecu_id[1]) != -1)):
169  return False
170  not_registered_field = "Removed or not registered ecus:"
171  not_reg_start = device_status.find(not_registered_field)
172  return not_reg_start == -1 or (device_status.find(ecu_id[1], not_reg_start) == -1)
173 
174  def get_current_image_info(self, ecu_id):
175  if self.id == ecu_id:
176  return self.get_current_primary_image_info()
177  else:
178  return self._get_current_image_info(ecu_id)
179 
180  def get_current_pending_image_info(self, ecu_id):
181  return self._get_current_image_info(ecu_id, secondary_image_hash_field='pending image hash: ')
182 
183  # applicable only to secondary ECUs due to inconsistency in presenting information
184  # about primary and secondary ECUs
185  # ugly stuff that could be removed if Aktualizr had exposed API to check status
186  # or aktializr-info had output status/info in a structured way (e.g. json)
187  def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash: '):
188  #secondary_image_filename_field = 'installed image filename: '
189  aktualizr_status = self.get_info()
190  ecu_serial = ecu_id[1]
191  ecu_info_position = aktualizr_status.find(ecu_serial)
192  if ecu_info_position == -1:
193  return None
194 
195  start = aktualizr_status.find(secondary_image_hash_field, ecu_info_position)
196  end = aktualizr_status.find('\\n', start)
197  hash_val = aktualizr_status[start + len(secondary_image_hash_field):end]
198 
199  # start = aktualizr_status.find(secondary_image_filename_field, ecu_info_position)
200  # end = aktualizr_status.find('\\n', start)
201  # filename_val = aktualizr_status[start + len(secondary_image_filename_field):end]
202 
203  return hash_val
204 
205  # ugly stuff that could be removed if Aktualizr had exposed API to check status
206  # or aktializr-info had output status/info in a structured way (e.g. json)
207  def get_current_primary_image_info(self):
208  primary_hash_field = 'Current primary ecu running version: '
209  aktualizr_status = self.get_info()
210  if aktualizr_status:
211  start = aktualizr_status.find(primary_hash_field)
212  end = aktualizr_status.find('\\n', start)
213  return aktualizr_status[start + len(primary_hash_field):end]
214  else:
215  logger.error("Failed to get aktualizr info/status")
216  return ""
217 
218  # ugly stuff that could be removed if Aktualizr had exposed API to check status
219  # or aktializr-info had output status/info in a structured way (e.g. json)
220  def get_primary_pending_version(self):
221  primary_hash_field = 'Pending primary ecu version: '
222  aktualizr_status = self.get_info()
223  start = aktualizr_status.find(primary_hash_field)
224  end = aktualizr_status.find('\\n', start)
225  return aktualizr_status[start + len(primary_hash_field):end]
226 
227  def __enter__(self):
228  self._process = subprocess.Popen([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', self._run_mode],
229  stdout=None if self._output_logs else subprocess.PIPE,
230  stderr=None if self._output_logs else subprocess.STDOUT,
231  close_fds=True,
232  env=self._run_env)
233  logger.debug("Aktualizr has been started")
234  return self
235 
236  def __exit__(self, exc_type, exc_val, exc_tb):
237  self._process.terminate()
238  self._process.wait(timeout=60)
239  logger.debug("Aktualizr has been stopped")
240 
241  def terminate(self, sig=signal.SIGTERM):
242  self._process.send_signal(sig)
243 
244  def output(self):
245  return self._process.stdout.read().decode(errors='replace')
246 
247  def wait_for_completion(self, timeout=120):
248  self._process.wait(timeout)
249 
250  def wait_for_provision(self, timeout=60):
251  deadline = time.time() + timeout
252  while timeout == 0 or time.time() < deadline:
253  info = self.get_info(retry=1)
254  if info is not None and 'Provisioned on server: yes' in info:
255  return
256  time.sleep(0.2)
257  raise TimeoutError
258 
259  def emulate_reboot(self):
260  os.remove(self.reboot_sentinel_file)
261 
262 
263 class KeyStore:
264  base_dir = "./"
265 
266  @staticmethod
267  def copy_keys(dest_path):
268  os.mkdir(dest_path)
269  shutil.copy(KeyStore.ca(), dest_path)
270  shutil.copy(KeyStore.pkey(), dest_path)
271  shutil.copy(KeyStore.cert(), dest_path)
272 
273  @staticmethod
274  def ca():
275  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/ca.pem')
276 
277  @staticmethod
278  def pkey():
279  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/pkey.pem')
280 
281  @staticmethod
282  def cert():
283  return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/client.pem')
284 
285 
287 
288  def __init__(self, aktualizr_secondary_exe, id, port=None, primary_port=None,
289  sysroot=None, treehub=None, output_logs=True, ostree_mock_path=None, **kwargs):
290  self.id = id
291 
292  self._aktualizr_secondary_exe = aktualizr_secondary_exe
293  self.storage_dir = tempfile.TemporaryDirectory()
294  self.port = self.get_free_port() if port is None else port
295  self.primary_port = self.get_free_port() if primary_port is None else primary_port
296  self._sentinel_file = 'need_reboot'
297  self._output_logs = output_logs
298  self.reboot_sentinel_file = os.path.join(self.storage_dir.name, self._sentinel_file)
299 
300  with open(path.join(self.storage_dir.name, 'config.toml'), 'w+') as config_file:
301  config_file.write(IPSecondary.CONFIG_TEMPLATE.format(serial=id[1], hw_ID=id[0],
302  port=self.port, primary_port=self.primary_port,
303  storage_dir=self.storage_dir.name,
304  db_path=path.join(self.storage_dir.name, 'db.sql'),
305  pacman_type='ostree' if treehub and sysroot else 'none',
306  ostree_sysroot=sysroot.path if sysroot else '',
307  treehub_server=treehub.base_url if treehub else '',
308  sentinel_dir=self.storage_dir.name,
309  sentinel_name=self._sentinel_file
310  ))
311  self._config_file = config_file.name
312 
313  self._run_env = {}
314  if sysroot and ostree_mock_path:
315  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
316  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
317 
318 
319  CONFIG_TEMPLATE = '''
320  [uptane]
321  ecu_serial = "{serial}"
322  ecu_hardware_id = "{hw_ID}"
323 
324  [network]
325  port = {port}
326  primary_ip = "127.0.0.1"
327  primary_port = {primary_port}
328 
329  [storage]
330  type = "sqlite"
331  path = "{storage_dir}"
332  sqldb_path = "{db_path}"
333 
334  [pacman]
335  type = "{pacman_type}"
336  sysroot = "{ostree_sysroot}"
337  ostree_server = "{treehub_server}"
338  os = "dummy-os"
339 
340  [bootloader]
341  reboot_sentinel_dir = "{sentinel_dir}"
342  reboot_sentinel_name = "{sentinel_name}"
343  reboot_command = ""
344  '''
345 
346  def is_running(self):
347  return True if self._process.poll() is None else False
348 
349  @staticmethod
350  def get_free_port():
351  tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
352  tcp.bind(('', 0))
353  port = tcp.getsockname()[1]
354  tcp.close()
355  return port
356 
357  def __enter__(self):
358  self._process = subprocess.Popen([self._aktualizr_secondary_exe, '-c', self._config_file],
359  stdout=None if self._output_logs else subprocess.PIPE,
360  stderr=None if self._output_logs else subprocess.STDOUT,
361  close_fds=True,
362  env=self._run_env)
363  logger.debug("IP Secondary {} has been started: {}".format(self.id, self.port))
364  return self
365 
366  def __exit__(self, exc_type, exc_val, exc_tb):
367  self._process.terminate()
368  self._process.wait(timeout=60)
369  logger.debug("IP Secondary {} has been stopped".format(self.id))
370 
371  def emulate_reboot(self):
372  os.remove(self.reboot_sentinel_file)
373 
374 
375 class UptaneRepo(HTTPServer):
376  def __init__(self, doc_root, ifc, port, client_handler_map={}):
377  super(UptaneRepo, self).__init__(server_address=(ifc, port), RequestHandlerClass=self.Handler)
378 
379  self.base_url = 'http://{}:{}'.format(self.server_address[0], self.server_address[1])
380  self.doc_root = doc_root
381  self._server_thread = None
382 
383  self.Handler.do_POST = \
384  lambda request: (self.Handler.handler_map.get('POST', {})).get(request.path,
385  self.Handler.default_handler)(request)
386 
387  self.Handler.do_PUT = \
388  lambda request: (self.Handler.handler_map.get('PUT', {})).get(request.path,
389  self.Handler.default_handler)(request)
390 
391  self.Handler.do_GET = \
392  lambda request: (self.Handler.handler_map.get('GET', {})).get(request.path,
393  self.Handler.default_get)(request)
394 
395  for method, method_handlers in client_handler_map.items():
396  for url, handler in method_handlers.items():
397  if self.Handler.handler_map.get(method, None) is None:
398  self.Handler.handler_map[method] = {}
399  self.Handler.handler_map[method][url] = handler
400 
401  class Handler(SimpleHTTPRequestHandler):
402  def __init__(self, request, client_address, server):
403  self.doc_root = server.doc_root
404  self.disable_nagle_algorithm = True
405  super(UptaneRepo.Handler, self).__init__(request, client_address, server)
406 
407  def default_handler(self):
408  self.send_response(200)
409  self.end_headers()
410 
411  def default_get(self):
412  if not os.path.exists(self.file_path):
413  self.send_response(404)
414  self.end_headers()
415  return
416  self.send_response(200)
417  self.end_headers()
418  with open(self.file_path, 'rb') as source:
419  self.copyfile(source, self.wfile)
420 
421  handler_map = {}
422 
423  @property
424  def file_path(self):
425  return os.path.join(self.doc_root, self.path[1:])
426 
427  def start(self):
428  self._server_thread = threading.Thread(target=self.serve_forever)
429  self._server_thread.start()
430  return self
431 
432  def stop(self):
433  self.shutdown()
434  self.server_close()
435  if self._server_thread:
436  self._server_thread.join(timeout=60)
437  self._server_thread = None
438 
439  def __enter__(self):
440  return self.start()
441 
442  def __exit__(self, exc_type, exc_val, exc_tb):
443  self.stop()
444 
445 
447  """
448  This server
449  - serves signed metadata about images
450  - receives device manifest which includes installation report if any installation has happened
451  """
452 
453  director_subdir = "repo/director"
454 
455  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
456  super(DirectorRepo, self).__init__(os.path.join(uptane_repo_root, self.director_subdir), ifc=ifc, port=port,
457  client_handler_map=client_handler_map)
458 
459  self._manifest = None
460  self._last_install_res = False
461  self._last_install_res_lock = threading.RLock()
462  self._installed_condition = threading.Condition()
463 
465  def handle_manifest(self):
466  self.send_response(200)
467  self.end_headers()
468  json_data = None
469  try:
470  data_size = int(self.headers['Content-Length'])
471  data_string = self.rfile.read(data_size)
472  json_data = json.loads(data_string)
473  except Exception as exc:
474  logger.error(exc)
475 
476  if json_data:
477  install_report = json_data['signed'].get('installation_report', "")
478  if install_report:
479  self.server.set_install_event(json_data)
480 
481  handler_map = {'PUT': {'/manifest': handle_manifest}}
482 
483  def set_install_event(self, manifest):
484  with self._installed_condition:
485  self._manifest = manifest
486  self._last_install_res = manifest['signed']['installation_report']['report']['result']['success']
487  self._installed_condition.notifyAll()
488 
489  def wait_for_install(self, timeout=180):
490  with self._installed_condition:
491  self._installed_condition.wait(timeout=timeout)
492  return self._last_install_res
493 
494  def get_install_result(self):
495  with self._installed_condition:
496  return self._last_install_res
497 
498  def get_manifest(self):
499  with self._installed_condition:
500  return self._manifest
501 
502  def get_ecu_manifest(self, ecu_serial):
503  return self.get_manifest()['signed']['ecu_version_manifests'][ecu_serial]
504 
505  def get_ecu_manifest_filepath(self, ecu_serial):
506  return self.get_ecu_manifest(ecu_serial)['signed']['installed_image']['filepath']
507 
508 
510  """
511  This server serves signed metadata about images
512  as well as images by default (it's possible to serve images from another server by using the 'custom URI' feature)
513  """
514 
515  image_subdir = "repo/repo"
516 
517  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
518  super(ImageRepo, self).__init__(os.path.join(uptane_repo_root, self.image_subdir), ifc=ifc, port=port,
519  client_handler_map=client_handler_map)
520 
521 
523  """
524  This server serves images
525  """
526  image_subdir = "repo/repo"
527 
528  def __init__(self, root, ifc, port, client_handler_map={}):
529  super(CustomRepo, self).__init__(os.path.join(root, self.image_subdir),
530  ifc=ifc, port=port, client_handler_map=client_handler_map)
531 
532 
534  """
535  This server serves requests from an ostree client, i.e. emulates/mocks the treehub server
536  """
537  def __init__(self, ifc, port, client_handler_map={}):
538  self.root = tempfile.mkdtemp()
539  super(Treehub, self).__init__(self.root, ifc=ifc, port=port, client_handler_map=client_handler_map)
540 
541  def __enter__(self):
542  self.revision = create_repo(self.root)
543  return super(Treehub, self).__enter__()
544 
545  def __exit__(self, exc_type, exc_val, exc_tb):
546  super(Treehub, self).__exit__(exc_type, exc_val, exc_tb)
547  shutil.rmtree(self.root, ignore_errors=True)
548 
550  def default_get(self):
551  if not os.path.exists(self.file_path):
552  self.send_response(404)
553  self.end_headers()
554  return
555 
556  super(Treehub.Handler, self).default_get()
557 
558 
560  def __init__(self, number_of_failures=1, bytes_to_send_before_interruption=10, url=''):
561  self._bytes_to_send_before_interruption = bytes_to_send_before_interruption
562  self._number_of_failures = number_of_failures
563  self._failure_counter = 0
564  self._url = url
565 
566  def __call__(self, request_handler):
567  if self._failure_counter < self._number_of_failures:
568  request_handler.send_response(200)
569  file_size = os.path.getsize(request_handler.file_path)
570  request_handler.send_header('Content-Length', file_size)
571  request_handler.end_headers()
572 
573  with open(request_handler.file_path, 'rb') as source:
574  data = source.read(self._bytes_to_send_before_interruption)
575  request_handler.wfile.write(data)
576 
577  self._failure_counter += 1
578  else:
579  request_handler.default_get()
580 
581  def map(self, url=''):
582  return {'GET': {url if url else self._url: DownloadInterruptionHandler(self._number_of_failures,
584 
585 
587  dummy_filez = (b'\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06' +
588  b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xa4\x00\x00\x00\x00' +
589  b'\x00\x19\x33\x34\x32\x36\x31\xe5\x02\x00')
590 
591  def __init__(self, number_of_failures=1, url='', fake_filez=False):
592  self._number_of_failures = number_of_failures
593  self._failure_counter = 0
594  self._url = url
595  self._fake_filez = fake_filez
596 
597  def __call__(self, request_handler):
598  if self._number_of_failures == -1 or self._failure_counter < self._number_of_failures:
599  request_handler.send_response(200)
600  request_handler.end_headers()
601  if self._fake_filez:
602  request_handler.wfile.write(self.dummy_filez)
603  else:
604  request_handler.wfile.write(b'malformed image')
605 
606  self._failure_counter += 1
607  else:
608  request_handler.default_get()
609 
610  def map(self, url):
611  return {'GET': {url if url else self._url: MalformedImageHandler(self._number_of_failures,
612  fake_filez=self._fake_filez)}}
613 
614 
616  def __init__(self, number_of_failures=1, url=''):
617  self._number_of_failures = number_of_failures
618  self._failure_counter = 0
619  self._url = url
620 
621  def __call__(self, request_handler):
622  if self._failure_counter < self._number_of_failures:
623  request_handler.send_response(200)
624  file_size = os.path.getsize(request_handler.file_path)
625  request_handler.end_headers()
626 
627  with open(request_handler.file_path, 'rb') as source:
628  while True:
629  data = source.read(1)
630  if not data:
631  break
632  request_handler.wfile.write(data)
633  request_handler.wfile.flush()
634  import time
635  time.sleep(100)
636 
637  self._failure_counter += 1
638  else:
639  request_handler.default_get()
640 
641  def map(self, url):
642  return {'GET': {url if url else self._url: SlowRetrievalHandler(self._number_of_failures)}}
643 
644 
646  def __init__(self, number_of_redirects=1, url=''):
647  self._number_of_redirects = number_of_redirects
648  self._redirect_counter = 0
649  self._url = url
650 
651  def __call__(self, request_handler):
653  request_handler.send_response(301)
654  request_handler.send_header('Location', request_handler.server.base_url + request_handler.path)
655  request_handler.end_headers()
656  self._redirect_counter += 1
657  else:
658  request_handler.default_get()
659 
660  def map(self, url):
661  return {'GET': {url if url else self._url: RedirectHandler(self._number_of_redirects)}}
662 
663 
665  def __init__(self, number_of_failures=1):
666  self._number_of_failures = number_of_failures
667  self._failure_counter = 0
668 
669  def __call__(self, request_handler):
670  if self._failure_counter < self._number_of_failures:
671  request_handler.send_response(200)
672  request_handler.end_headers()
673  request_handler.wfile.write(b'{"non-uptane-json": "some-value"}')
674 
675  self._failure_counter += 1
676  else:
677  request_handler.default_get()
678 
679  def map(self, url):
680  return {'GET': {url: MalformedJsonHandler(self._number_of_failures)}}
681 
682 
684  def __init__(self, repo_manager_exe, server_port=0, director_port=0, image_repo_port=0, custom_repo_port=0):
685  self.image_rel_dir = 'repo/repo'
686  self.target_rel_dir = 'repo/repo/targets'
687 
688  self._repo_manager_exe = repo_manager_exe
689  self.root_dir = tempfile.mkdtemp()
690 
691  self.server_port = server_port
692  self.director_port = director_port
693  self.image_repo_port = image_repo_port
694  self.custom_repo_port = custom_repo_port
695 
696  def create_generic_server(self, **kwargs):
697  return FakeTestServerBackground(meta_path=self.root_dir, target_path=self.target_dir, port=self.server_port)
698 
699  def create_director_repo(self, handler_map={}):
700  return DirectorRepo(self.root_dir, 'localhost', self.director_port, client_handler_map=handler_map)
701 
702  def create_image_repo(self, handler_map={}):
703  return ImageRepo(self.root_dir, 'localhost', self.image_repo_port, client_handler_map=handler_map)
704 
705  def create_custom_repo(self, handler_map={}):
706  return CustomRepo(self.root_dir, 'localhost', self.custom_repo_port, client_handler_map=handler_map)
707 
708  @property
709  def image_dir(self):
710  return path.join(self.root_dir, self.image_rel_dir)
711 
712  @property
713  def target_dir(self):
714  return path.join(self.root_dir, self.target_rel_dir)
715 
716  @property
717  def target_file(self):
718  return path.join(self.image_dir, 'targets.json')
719 
720  def add_image(self, id, image_filename, target_name=None, image_size=1024, custom_url=''):
721 
722  targetname = target_name if target_name else image_filename
723 
724  with open(path.join(self.image_dir, image_filename), 'wb') as image_file:
725  image_file.write(urandom(image_size))
726 
727  image_creation_cmdline = [self._repo_manager_exe, '--path', self.root_dir,
728  '--command', 'image', '--filename', image_filename, '--targetname', targetname, '--hwid', id[0]]
729 
730  if custom_url:
731  image_creation_cmdline.append('--url')
732  image_creation_cmdline.append(custom_url)
733 
734  subprocess.run(image_creation_cmdline, cwd=self.image_dir, check=True)
735 
736  # update the director metadata
737  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
738  '--command', 'addtarget', '--targetname', targetname,
739  '--hwid', id[0], '--serial', id[1]], check=True)
740 
741  # sign so the image becomes available for an update for a client/device
742  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
743 
744  with open(self.target_file, "r") as target_file:
745  targets = json.load(target_file)
746  target_hash = targets["signed"]["targets"][targetname]["hashes"]["sha256"]
747 
748  return target_hash
749 
750  def add_ostree_target(self, id, rev_hash, target_name=None, expires_within_sec=(60 * 5)):
751  # emulate the backend behavior on defining a target name for OSTREE target format
752  target_name = rev_hash if target_name is None else "{}-{}".format(target_name, rev_hash)
753  image_creation_cmdline = [self._repo_manager_exe,
754  '--command', 'image',
755  '--path', self.root_dir,
756  '--targetname', target_name,
757  '--targetsha256', rev_hash,
758  '--targetlength', '0',
759  '--targetformat', 'OSTREE',
760  '--hwid', id[0]]
761  subprocess.run(image_creation_cmdline, check=True)
762 
763  expiration_time = time.time() + expires_within_sec
764  expiration_time_str = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(expiration_time))
765 
766  subprocess.run([self._repo_manager_exe,
767  '--command', 'addtarget',
768  '--path', self.root_dir,
769  '--targetname', target_name,
770  '--hwid', id[0],
771  '--serial', id[1],
772  '--expires', expiration_time_str],
773  check=True)
774 
775  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
776 
777  return target_name
778 
779  def __enter__(self):
780  self._generate_repo()
781  return self
782 
783  def __exit__(self, exc_type, exc_val, exc_tb):
784  shutil.rmtree(self.root_dir, ignore_errors=True)
785 
786  def _generate_repo(self):
787  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
788  '--command', 'generate', '--keytype', 'ED25519'], check=True)
789 
790 
791 def with_aktualizr(start=True, output_logs=False, id=('primary-hw-ID-001', str(uuid4())), wait_timeout=60,
792  secondary_wait_sec=600, log_level=1, aktualizr_primary_exe='src/aktualizr_primary/aktualizr',
793  aktualizr_info_exe='src/aktualizr_info/aktualizr-info',
794  run_mode='once'):
795  def decorator(test):
796  @wraps(test)
797  def wrapper(*args, ostree_mock_path=None, **kwargs):
798  aktualizr = Aktualizr(aktualizr_primary_exe=aktualizr_primary_exe,
799  aktualizr_info_exe=aktualizr_info_exe, id=id,
800  wait_timeout=wait_timeout,
801  secondary_wait_sec=secondary_wait_sec,
802  log_level=log_level, output_logs=output_logs,
803  run_mode=run_mode, ostree_mock_path=ostree_mock_path, **kwargs)
804  if start:
805  with aktualizr:
806  result = test(*args, **kwargs, aktualizr=aktualizr)
807  else:
808  result = test(*args, **kwargs, aktualizr=aktualizr)
809  return result
810  return wrapper
811  return decorator
812 
813 
814 # The following decorators can be eliminated if pytest framework (or similar) is used
815 # by using fixtures instead
816 def with_uptane_backend(start_generic_server=True, repo_manager_exe='src/uptane_generator/uptane-generator'):
817  def decorator(test):
818  @wraps(test)
819  def wrapper(*args, **kwargs):
820  repo_manager_exe_abs_path = path.abspath(repo_manager_exe)
821  with UptaneTestRepo(repo_manager_exe_abs_path) as repo:
822  if start_generic_server:
823  with repo.create_generic_server() as uptane_server:
824  result = test(*args, **kwargs, uptane_repo=repo, uptane_server=uptane_server)
825  else:
826  result = test(*args, **kwargs, uptane_repo=repo)
827  return result
828  return wrapper
829  return decorator
830 
831 
832 def with_director(start=True, handlers=[]):
833  def decorator(test):
834  @wraps(test)
835  def wrapper(*args, uptane_repo, **kwargs):
836  def func(handler_map={}):
837  director = uptane_repo.create_director_repo(handler_map=handler_map)
838  if start:
839  with director:
840  result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
841  else:
842  result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
843  return result
844 
845  if handlers and len(handlers) > 0:
846  for handler in handlers:
847  result = func(handler.map(kwargs.get('test_path', '')))
848  if not result:
849  break
850  else:
851  result = func()
852  return result
853  return wrapper
854  return decorator
855 
856 
857 def with_imagerepo(start=True, handlers=[]):
858  def decorator(test):
859  @wraps(test)
860  def wrapper(*args, uptane_repo, **kwargs):
861  def func(handler_map={}):
862  image_repo = uptane_repo.create_image_repo(handler_map=handler_map)
863  if start:
864  with image_repo:
865  result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
866  else:
867  result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
868  return result
869 
870  if handlers and len(handlers) > 0:
871  for handler in handlers:
872  result = func(handler.map(kwargs.get('test_path', '')))
873  if not result:
874  break
875  else:
876  result = func()
877  return result
878  return wrapper
879  return decorator
880 
881 
882 def with_secondary(start=True, output_logs=False, id=('secondary-hw-ID-001', None), arg_name='secondary',
883  aktualizr_secondary_exe='src/aktualizr_secondary/aktualizr-secondary'):
884  def decorator(test):
885  @wraps(test)
886  def wrapper(*args, **kwargs):
887  id1 = id
888  if id1[1] is None:
889  id1 = (id1[0], str(uuid4()))
890  secondary = IPSecondary(aktualizr_secondary_exe=aktualizr_secondary_exe, output_logs=output_logs, id=id1, **kwargs)
891  sl = kwargs.get("secondaries", []) + [secondary]
892  kwargs.update({arg_name: secondary, "secondaries": sl})
893  if "primary_port" not in kwargs:
894  kwargs["primary_port"] = secondary.primary_port
895  if start:
896  with secondary:
897  result = test(*args, **kwargs)
898  else:
899  result = test(*args, **kwargs)
900  return result
901  return wrapper
902  return decorator
903 
904 
905 def with_path(paths):
906  def decorator(test):
907  @wraps(test)
908  def wrapper(*args, **kwargs):
909  for test_path in paths:
910  result = test(*args, **kwargs, test_path=test_path)
911  if not result:
912  break
913  return result
914  return wrapper
915  return decorator
916 
917 
919  def __init__(self, aktualizr, uptane_repo, images_to_install=[]):
920  self.aktualizr = aktualizr
921  self.images_to_install = []
922  for image in images_to_install:
923  self.images_to_install.append({
924  'ecu_id': image[0],
925  'filename': image[1],
926  'hash': uptane_repo.add_image(image[0], image[1], custom_url=image[2] if len(image) > 2 else '')
927  })
928 
929  def are_images_installed(self):
930  result = True
931  for image in self.images_to_install:
932  if not (image['hash'] == self.aktualizr.get_current_image_info(image['ecu_id'])):
933  result = False
934  break
935 
936  return result
937 
938 
939 def with_install_manager(default_images=True):
940  def decorator(test):
941  @wraps(test)
942  def wrapper(*args, aktualizr, uptane_repo, secondary=None, images_to_install=[], **kwargs):
943  if default_images and (not images_to_install or len(images_to_install) == 0):
944  images_to_install = [(aktualizr.id, 'primary-image.img')]
945  if secondary:
946  images_to_install.append((secondary.id, 'secondary-image.img'))
947  install_mngr = InstallManager(aktualizr, uptane_repo, images_to_install)
948  result = test(*args, **kwargs, aktualizr=aktualizr, secondary=secondary,
949  uptane_repo=uptane_repo, install_mngr=install_mngr)
950  return result
951  return wrapper
952  return decorator
953 
954 
955 def with_images(images_to_install):
956  def decorator(test):
957  @wraps(test)
958  def wrapper(*args, **kwargs):
959  return test(*args, **kwargs, images_to_install=images_to_install)
960  return wrapper
961  return decorator
962 
963 
964 def with_customrepo(start=True, handlers=[]):
965  def decorator(test):
966  @wraps(test)
967  def wrapper(*args, uptane_repo, **kwargs):
968  def func(handler_map={}):
969  custom_repo = uptane_repo.create_custom_repo(handler_map=handler_map)
970  if start:
971  with custom_repo:
972  result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
973  else:
974  result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
975  return result
976 
977  if handlers and len(handlers) > 0:
978  for handler in handlers:
979  result = func(handler.map(kwargs.get('test_path', '')))
980  if not result:
981  break
982  else:
983  result = func()
984  return result
985  return wrapper
986  return decorator
987 
988 
989 class Sysroot:
990  repo_path = 'ostree_repo'
991 
992  def __enter__(self):
993  self._root = tempfile.mkdtemp()
994  self.path = os.path.join(self._root, self.repo_path)
995  self.version_file = os.path.join(self.path, 'version')
996 
997  subprocess.run(['cp', '-r', self.repo_path, self._root], check=True)
998 
999  initial_revision = self.get_revision()
1000  with open(self.version_file, 'wt') as version_file:
1001  version_file.writelines(['{}\n'.format(initial_revision),
1002  '{}\n'.format('0'),
1003  '{}\n'.format(initial_revision),
1004  '{}\n'.format('0')])
1005 
1006  return self
1007 
1008  def __exit__(self, exc_type, exc_val, exc_tb):
1009  shutil.rmtree(self._root, ignore_errors=True)
1010 
1011  def get_revision(self):
1012  rev_cmd_res = subprocess.run(['ostree', 'rev-parse', '--repo', self.path + '/ostree/repo', 'generate-remote:generated'],
1013  timeout=60, check=True, stdout=subprocess.PIPE)
1014 
1015  return rev_cmd_res.stdout.decode('ascii').rstrip('\n')
1016 
1017  def update_revision(self, rev):
1018  with open(self.version_file, 'wt') as version_file:
1019  version_file.writelines(['{}\n'.format(rev),
1020  '{}\n'.format('1'),
1021  '{}\n'.format(rev),
1022  '{}\n'.format('1')])
1023 
1024 
1025 def with_sysroot(ostree_mock_path='tests/libostree_mock.so'):
1026  def decorator(test):
1027  @wraps(test)
1028  def wrapper(*args, **kwargs):
1029  with Sysroot() as sysroot:
1030  return test(*args, **kwargs, sysroot=sysroot, ostree_mock_path=ostree_mock_path)
1031  return wrapper
1032  return decorator
1033 
1034 
1035 def with_treehub(handlers=[], port=0):
1036  def decorator(test):
1037  @wraps(test)
1038  def wrapper(*args, **kwargs):
1039  def func(handler_map={}):
1040  with Treehub('localhost', port=port, client_handler_map=handler_map) as treehub:
1041  return test(*args, **kwargs, treehub=treehub)
1042 
1043  if handlers and len(handlers) > 0:
1044  for handler in handlers:
1045  result = func(handler.map(kwargs.get('test_path', '')))
1046  if not result:
1047  break
1048  else:
1049  result = func()
1050  return result
1051  return wrapper
1052  return decorator
1053 
1054 
1055 class NonDaemonPool(pool.Pool):
1056  def Process(self, *args, **kwds):
1057  proc = super(NonDaemonPool, self).Process(*args, **kwds)
1058 
1059  class NonDaemonProcess(proc.__class__):
1060  """Monkey-patch process to ensure it is never daemonized"""
1061 
1062  @property
1063  def daemon(self):
1064  return False
1065 
1066  @daemon.setter
1067  def daemon(self, val):
1068  pass
1069 
1070  proc.__class__ = NonDaemonProcess
1071 
1072  return proc
1073 
1074 
1076  def __init__(self, tests):
1077  self._tests = tests
1078  self._test_runner_pool = NonDaemonPool(min(len(self._tests), cpu_count()))
1079 
1080  @staticmethod
1081  def test_runner(test):
1082  logger.info('>>> Running {}...'.format(test.__name__))
1083  test_run_result = test()
1084  logger.info('>>> {}: {}\n'.format('OK' if test_run_result else 'FAILED', test.__name__))
1085  return test_run_result
1086 
1087  def run(self):
1088  results = self._test_runner_pool.map(TestRunner.test_runner, self._tests)
1089  total_result = True
1090  for result in results:
1091  total_result = total_result and result
1092  return total_result
test_fixtures.UptaneTestRepo.target_file
def target_file(self)
Definition: test_fixtures.py:717
test_fixtures.IPSecondary._process
_process
Definition: test_fixtures.py:358
test_fixtures.SlowRetrievalHandler._url
_url
Definition: test_fixtures.py:619
test_fixtures.CustomRepo
Definition: test_fixtures.py:522
test_fixtures.MalformedImageHandler.dummy_filez
tuple dummy_filez
Definition: test_fixtures.py:587
test_fixtures.DirectorRepo._last_install_res_lock
_last_install_res_lock
Definition: test_fixtures.py:461
test_fixtures.CustomRepo.image_subdir
string image_subdir
Definition: test_fixtures.py:526
test_fixtures.IPSecondary.id
id
Definition: test_fixtures.py:289
test_fixtures.UptaneRepo.base_url
base_url
Definition: test_fixtures.py:379
test_fixtures.MalformedImageHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:592
test_fixtures.Aktualizr._storage_dir
_storage_dir
Definition: test_fixtures.py:32
test_fixtures.MalformedJsonHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:666
test_fixtures.Treehub.root
root
Definition: test_fixtures.py:538
test_fixtures.UptaneRepo.doc_root
doc_root
Definition: test_fixtures.py:380
test_fixtures.Sysroot.path
path
Definition: test_fixtures.py:994
test_fixtures.Aktualizr._secondary_wait_sec
_secondary_wait_sec
Definition: test_fixtures.py:45
test_fixtures.Aktualizr._output_logs
_output_logs
Definition: test_fixtures.py:68
test_fixtures.MalformedImageHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:593
test_fixtures.DirectorRepo
Definition: test_fixtures.py:446
test_fixtures.TestRunner._tests
_tests
Definition: test_fixtures.py:1077
test_fixtures.IPSecondary.get_free_port
def get_free_port()
Definition: test_fixtures.py:350
test_fixtures.UptaneRepo._server_thread
_server_thread
Definition: test_fixtures.py:381
test_fixtures.Sysroot._root
_root
Definition: test_fixtures.py:993
test_fixtures.InstallManager.images_to_install
images_to_install
Definition: test_fixtures.py:921
test_fixtures.UptaneTestRepo.root_dir
root_dir
Definition: test_fixtures.py:689
test_fixtures.UptaneRepo.Handler.disable_nagle_algorithm
disable_nagle_algorithm
Definition: test_fixtures.py:404
test_fixtures.Aktualizr.get_current_primary_image_info
def get_current_primary_image_info(self)
Definition: test_fixtures.py:207
test_fixtures.Aktualizr._log_level
_log_level
Definition: test_fixtures.py:33
test_fixtures.Sysroot.repo_path
string repo_path
Definition: test_fixtures.py:990
test_fixtures.UptaneRepo.Handler
Definition: test_fixtures.py:401
test_fixtures.IPSecondary._config_file
_config_file
Definition: test_fixtures.py:310
test_fixtures.UptaneRepo.Handler.file_path
def file_path(self)
Definition: test_fixtures.py:424
test_fixtures.RedirectHandler._url
_url
Definition: test_fixtures.py:649
test_fixtures.UptaneTestRepo
Definition: test_fixtures.py:683
test_fixtures.UptaneTestRepo.image_repo_port
image_repo_port
Definition: test_fixtures.py:693
test_fixtures.RedirectHandler._number_of_redirects
_number_of_redirects
Definition: test_fixtures.py:647
test_fixtures.UptaneTestRepo._repo_manager_exe
_repo_manager_exe
Definition: test_fixtures.py:688
test_fixtures.ImageRepo
Definition: test_fixtures.py:509
test_fixtures.IPSecondary.port
port
Definition: test_fixtures.py:293
test_fixtures.Aktualizr._sentinel_file
_sentinel_file
Definition: test_fixtures.py:34
test_fixtures.ImageRepo.image_subdir
string image_subdir
Definition: test_fixtures.py:515
test_fixtures.IPSecondary.primary_port
primary_port
Definition: test_fixtures.py:294
test_fixtures.InstallManager
Definition: test_fixtures.py:918
test_fixtures.Aktualizr._run_env
_run_env
Definition: test_fixtures.py:70
test_fixtures.IPSecondary
Definition: test_fixtures.py:286
test_fixtures.Treehub
Definition: test_fixtures.py:533
test_fixtures.DirectorRepo._manifest
_manifest
Definition: test_fixtures.py:459
Process
Definition: test_utils.h:19
test_fixtures.UptaneTestRepo.target_dir
def target_dir(self)
Definition: test_fixtures.py:713
test_fixtures.Aktualizr.id
id
Definition: test_fixtures.py:29
test_fixtures.DownloadInterruptionHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:563
test_fixtures.TestRunner._test_runner_pool
_test_runner_pool
Definition: test_fixtures.py:1078
test_fixtures.Aktualizr._process
_process
Definition: test_fixtures.py:228
test_fixtures.UptaneTestRepo.server_port
server_port
Definition: test_fixtures.py:691
test_fixtures.DirectorRepo._last_install_res
_last_install_res
Definition: test_fixtures.py:460
test_fixtures.UptaneTestRepo.target_rel_dir
target_rel_dir
Definition: test_fixtures.py:686
test_fixtures.IPSecondary._sentinel_file
_sentinel_file
Definition: test_fixtures.py:295
test_fixtures.KeyStore
Definition: test_fixtures.py:263
test_fixtures.NonDaemonPool
Definition: test_fixtures.py:1055
test_fixtures.UptaneTestRepo.image_rel_dir
image_rel_dir
Definition: test_fixtures.py:685
test_fixtures.Aktualizr.get_info
def get_info(self, retry=15)
Definition: test_fixtures.py:149
test_fixtures.Aktualizr._import_dir
_import_dir
Definition: test_fixtures.py:36
test_fixtures.UptaneRepo.Handler.doc_root
doc_root
Definition: test_fixtures.py:403
test_fixtures.DownloadInterruptionHandler
Definition: test_fixtures.py:559
test_fixtures.RedirectHandler
Definition: test_fixtures.py:645
test_fixtures.UptaneRepo
Definition: test_fixtures.py:375
test_fixtures.UptaneTestRepo._generate_repo
def _generate_repo(self)
Definition: test_fixtures.py:786
test_fixtures.UptaneTestRepo.custom_repo_port
custom_repo_port
Definition: test_fixtures.py:694
test_fixtures.Aktualizr._aktualizr_primary_exe
_aktualizr_primary_exe
Definition: test_fixtures.py:30
test_fixtures.SlowRetrievalHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:617
test_fixtures.SlowRetrievalHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:618
test_fixtures.IPSecondary.reboot_sentinel_file
reboot_sentinel_file
Definition: test_fixtures.py:297
test_fixtures.DirectorRepo._installed_condition
_installed_condition
Definition: test_fixtures.py:462
test_fixtures.Treehub.Handler
Definition: test_fixtures.py:549
test_fixtures.UptaneTestRepo.director_port
director_port
Definition: test_fixtures.py:692
test_fixtures.IPSecondary._output_logs
_output_logs
Definition: test_fixtures.py:296
test_fixtures.MalformedJsonHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:667
test_fixtures.InstallManager.aktualizr
aktualizr
Definition: test_fixtures.py:920
test_fixtures.MalformedImageHandler._fake_filez
_fake_filez
Definition: test_fixtures.py:595
test_fixtures.DownloadInterruptionHandler._bytes_to_send_before_interruption
_bytes_to_send_before_interruption
Definition: test_fixtures.py:561
test_fixtures.MalformedJsonHandler
Definition: test_fixtures.py:664
test_fixtures.MalformedImageHandler
Definition: test_fixtures.py:586
test_fixtures.DirectorRepo.Handler
Definition: test_fixtures.py:464
test_fixtures.Aktualizr
Definition: test_fixtures.py:26
test_fixtures.IPSecondary._aktualizr_secondary_exe
_aktualizr_secondary_exe
Definition: test_fixtures.py:291
test_fixtures.SlowRetrievalHandler
Definition: test_fixtures.py:615
test_fixtures.MalformedImageHandler._url
_url
Definition: test_fixtures.py:594
test_fixtures.Aktualizr._run_mode
_run_mode
Definition: test_fixtures.py:69
test_fixtures.DirectorRepo.director_subdir
string director_subdir
Definition: test_fixtures.py:453
test_fixtures.DownloadInterruptionHandler._url
_url
Definition: test_fixtures.py:564
test_fixtures.DownloadInterruptionHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:562
test_fixtures.IPSecondary.storage_dir
storage_dir
Definition: test_fixtures.py:292
test_fixtures.Aktualizr._secondary_config_file
_secondary_config_file
Definition: test_fixtures.py:44
test_fixtures.Aktualizr._aktualizr_info_exe
_aktualizr_info_exe
Definition: test_fixtures.py:31
test_fixtures.Aktualizr.add_secondary
def add_secondary(self, secondary)
Definition: test_fixtures.py:131
test_fixtures.Treehub.revision
revision
Definition: test_fixtures.py:542
test_fixtures.Aktualizr._get_current_image_info
def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash:')
Definition: test_fixtures.py:187
test_fixtures.RedirectHandler._redirect_counter
_redirect_counter
Definition: test_fixtures.py:648
test_fixtures.Aktualizr._config_file
_config_file
Definition: test_fixtures.py:63
test_fixtures.Sysroot.get_revision
def get_revision(self)
Definition: test_fixtures.py:1011
test_fixtures.IPSecondary._run_env
_run_env
Definition: test_fixtures.py:312
test_fixtures.Aktualizr.reboot_sentinel_file
reboot_sentinel_file
Definition: test_fixtures.py:35
test_fixtures.Sysroot.version_file
version_file
Definition: test_fixtures.py:995
test_fixtures.UptaneTestRepo.image_dir
def image_dir(self)
Definition: test_fixtures.py:709
test_fixtures.TestRunner
Definition: test_fixtures.py:1075
test_fixtures.Sysroot
Definition: test_fixtures.py:989