Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
test_fixtures.py
1 import subprocess
2 import logging
3 import json
4 import tempfile
5 import threading
6 import os
7 import shutil
8 import signal
9 import socket
10 import time
11 
12 from os import path
13 from uuid import uuid4
14 from os import urandom
15 from functools import wraps
16 from multiprocessing import pool, cpu_count
17 from http.server import SimpleHTTPRequestHandler, HTTPServer
18 
19 from fake_http_server.fake_test_server import FakeTestServerBackground
20 from sota_tools.treehub_server import create_repo
21 
22 
23 logger = logging.getLogger(__name__)
24 
25 
26 class Aktualizr:
27 
28  def __init__(self, aktualizr_primary_exe, aktualizr_info_exe, id,
29  uptane_server, wait_port=9040, wait_timeout=60, log_level=1,
30  primary_port=None, secondaries=None, secondary_wait_sec=600, output_logs=True,
31  run_mode='once', director=None, image_repo=None,
32  sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
33  self.id = id
34  self._aktualizr_primary_exe = aktualizr_primary_exe
35  self._aktualizr_info_exe = aktualizr_info_exe
36  self._storage_dir = tempfile.TemporaryDirectory()
37  self._log_level = log_level
38  self._sentinel_file = 'need_reboot'
39  self.reboot_sentinel_file = os.path.join(self._storage_dir.name, self._sentinel_file)
40  self._import_dir = os.path.join(self._storage_dir.name, 'import')
41  KeyStore.copy_keys(self._import_dir)
42 
43  with open(path.join(self._storage_dir.name, 'secondary_config.json'), 'w+') as secondary_config_file:
44  secondary_cfg = json.loads(Aktualizr.SECONDARY_CONFIG_TEMPLATE.
45  format(port=primary_port if primary_port else wait_port,
46  timeout=wait_timeout))
47  json.dump(secondary_cfg, secondary_config_file)
48  self._secondary_config_file = secondary_config_file.name
49  self._secondary_wait_sec = secondary_wait_sec
50 
51  with open(path.join(self._storage_dir.name, 'config.toml'), 'w+') as config_file:
52  config_file.write(Aktualizr.CONFIG_TEMPLATE.format(server_url=uptane_server.base_url,
53  import_path=self._import_dir,
54  serial=id[1], hw_ID=id[0],
55  storage_dir=self._storage_dir.name,
56  db_path=path.join(self._storage_dir.name, 'sql.db'),
57  log_level=self._log_level,
58  secondary_cfg_file=self._secondary_config_file,
59  secondary_wait_sec=self._secondary_wait_sec,
60  director=director.base_url if director else '',
61  image_repo=image_repo.base_url if image_repo else '',
62  pacman_type='ostree' if treehub and sysroot else 'none',
63  ostree_sysroot=sysroot.path if sysroot else '',
64  treehub_server=treehub.base_url if treehub else '',
65  sentinel_dir=self._storage_dir.name,
66  sentinel_name=self._sentinel_file))
67  self._config_file = config_file.name
68 
69  if secondaries is not None:
70  for s in secondaries:
71  self.add_secondary(s)
72  self._output_logs = output_logs
73  self._run_mode = run_mode
74  self._run_env = {}
75  if sysroot and ostree_mock_path:
76  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
77  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
78 
79  CONFIG_TEMPLATE = '''
80  [tls]
81  server = "{server_url}"
82 
83  [import]
84  base_path = "{import_path}"
85  tls_cacert_path = "ca.pem"
86  tls_pkey_path = "pkey.pem"
87  tls_clientcert_path = "client.pem"
88 
89  [provision]
90  primary_ecu_serial = "{serial}"
91  primary_ecu_hardware_id = "{hw_ID}"
92 
93  [storage]
94  path = "{storage_dir}"
95  type = "sqlite"
96  sqldb_path = "{db_path}"
97 
98  [pacman]
99  type = "{pacman_type}"
100  images_path = "{storage_dir}/images"
101  sysroot = "{ostree_sysroot}"
102  ostree_server = "{treehub_server}"
103  os = "dummy-os"
104 
105  [uptane]
106  polling_sec = 0
107  secondary_config_file = "{secondary_cfg_file}"
108  secondary_preinstall_wait_sec = {secondary_wait_sec}
109  director_server = "{director}"
110  repo_server = "{image_repo}"
111 
112  [bootloader]
113  reboot_sentinel_dir = "{sentinel_dir}"
114  reboot_sentinel_name = "{sentinel_name}"
115  reboot_command = ""
116 
117  [logger]
118  loglevel = {log_level}
119 
120  '''
121 
122  SECONDARY_CONFIG_TEMPLATE = '''
123  {{
124  "IP": {{
125  "secondaries_wait_port": {port},
126  "secondaries_wait_timeout": {timeout},
127  "secondaries": []
128  }}
129  }}
130  '''
131 
132  def set_mode(self, mode):
133  self._run_mode = mode
134 
135  def add_secondary(self, secondary):
136  with open(self._secondary_config_file, "r+") as config_file:
137  sec_cfg = json.load(config_file)
138  sec_cfg["IP"]["secondaries"].append({"addr": "127.0.0.1:{}".format(secondary.port)})
139  config_file.seek(0)
140  json.dump(sec_cfg, config_file)
141 
142  def remove_secondary(self, secondary):
143  with open(self._secondary_config_file, "r+") as config_file:
144  sec_cfg = json.load(config_file)
145  sec_cfg["IP"]["secondaries"].remove({"addr": "127.0.0.1:{}".format(secondary.port)})
146  config_file.seek(0)
147  json.dump(sec_cfg, config_file)
148  config_file.truncate()
149 
150  def update_wait_timeout(self, timeout):
151  with open(self._secondary_config_file, "r+") as config_file:
152  sec_cfg = json.load(config_file)
153  sec_cfg["IP"]["secondaries_wait_timeout"] = timeout
154  config_file.seek(0)
155  json.dump(sec_cfg, config_file)
156 
157  def run(self, run_mode):
158  subprocess.run([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', run_mode],
159  check=True, env=self._run_env)
160 
161  # another ugly workaround that could be replaced with something more reliable if aktualizr exposed an API
162  # to check its status, or if aktualizr-info printed status/info in a structured way (e.g. json)
163  def get_info(self, retry=30):
164  info_exe_res = None
165  for ii in range(0, retry):
166  info_exe_res = subprocess.run([self._aktualizr_info_exe, '-c', self._config_file],
167  timeout=60, stdout=subprocess.PIPE, env=self._run_env)
168  if info_exe_res.returncode == 0 and \
169  str(info_exe_res.stdout).find('Provisioned on server: yes') != -1 and \
170  str(info_exe_res.stdout).find('Current Primary ECU running version:') != -1:
171  break
172 
173  if info_exe_res and info_exe_res.returncode == 0:
174  return str(info_exe_res.stdout)
175  else:
176  logger.error('Failed to get an aktualizr\'s status info, stdout: {}, stderr: {}'.
177  format(str(info_exe_res.stdout), str(info_exe_res.stderr)))
178  return None
179 
180  # ugly stuff that could be removed if Aktualizr had exposed API to check status
181  # or aktualizr-info had output status/info in a structured way (e.g. json)
182  def is_ecu_registered(self, ecu_id):
183  device_status = self.get_info()
184  if not ((device_status.find(ecu_id[0]) != -1) and (device_status.find(ecu_id[1]) != -1)):
185  return False
186  not_registered_field = "Removed or unregistered ECUs (deprecated):"
187  not_reg_start = device_status.find(not_registered_field)
188  return not_reg_start == -1 or (device_status.find(ecu_id[1], not_reg_start) == -1)
189 
190  def get_current_image_info(self, ecu_id):
191  if self.id == ecu_id:
192  return self.get_current_primary_image_info()
193  else:
194  return self._get_current_image_info(ecu_id)
195 
196  def get_current_pending_image_info(self, ecu_id):
197  return self._get_current_image_info(ecu_id, secondary_image_hash_field='pending image hash: ')
198 
199  # applicable only to Secondary ECUs due to inconsistency in presenting information
200  # about Primary and Secondary ECUs
201  # ugly workaround that could be removed if aktualizr exposed an API to check its status
202  # or if aktualizr-info printed status/info in a structured way (e.g. json)
203  def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash: '):
204  #secondary_image_filename_field = 'installed image filename: '
205  aktualizr_status = self.get_info()
206  ecu_serial = ecu_id[1]
207  ecu_info_position = aktualizr_status.find(ecu_serial)
208  if ecu_info_position == -1:
209  return None
210 
211  start = aktualizr_status.find(secondary_image_hash_field, ecu_info_position)
212  end = aktualizr_status.find('\\n', start)
213  hash_val = aktualizr_status[start + len(secondary_image_hash_field):end]
214 
215  # start = aktualizr_status.find(secondary_image_filename_field, ecu_info_position)
216  # end = aktualizr_status.find('\\n', start)
217  # filename_val = aktualizr_status[start + len(secondary_image_filename_field):end]
218 
219  return hash_val
220 
221  # ugly workaround that could be removed if aktualizr exposed an API to check its status
222  # or if aktualizr-info printed status/info in a structured way (e.g. json)
223  def get_current_primary_image_info(self):
224  primary_hash_field = 'Current Primary ECU running version: '
225  aktualizr_status = self.get_info()
226  if aktualizr_status:
227  start = aktualizr_status.find(primary_hash_field)
228  end = aktualizr_status.find('\\n', start)
229  return aktualizr_status[start + len(primary_hash_field):end]
230  else:
231  logger.error("Failed to get aktualizr info/status")
232  return ""
233 
234  # ugly workaround that could be removed if aktualizr exposed an API to check its status
235  # or if aktualizr-info printed status/info in a structured way (e.g. json)
236  def get_primary_pending_version(self):
237  primary_hash_field = 'Pending Primary ECU version: '
238  aktualizr_status = self.get_info()
239  start = aktualizr_status.find(primary_hash_field)
240  end = aktualizr_status.find('\\n', start)
241  return aktualizr_status[start + len(primary_hash_field):end]
242 
243  def __enter__(self):
244  self._process = subprocess.Popen([self._aktualizr_primary_exe, '-c', self._config_file, '--run-mode', self._run_mode],
245  stdout=None if self._output_logs else subprocess.PIPE,
246  stderr=None if self._output_logs else subprocess.STDOUT,
247  close_fds=True,
248  env=self._run_env)
249  logger.debug("Aktualizr has been started")
250  return self
251 
252  def __exit__(self, exc_type, exc_val, exc_tb):
253  self._process.terminate()
254  self._process.wait(timeout=60)
255  logger.debug("Aktualizr has been stopped")
256 
257  def terminate(self, sig=signal.SIGTERM):
258  self._process.send_signal(sig)
259 
260  def output(self):
261  return self._process.stdout.read().decode(errors='replace')
262 
263  def wait_for_completion(self, timeout=120):
264  self._process.wait(timeout)
265 
266  def wait_for_provision(self, timeout=60):
267  deadline = time.time() + timeout
268  while timeout == 0 or time.time() < deadline:
269  info = self.get_info(retry=1)
270  if info is not None and 'Provisioned on server: yes' in info:
271  return
272  time.sleep(0.2)
273  raise TimeoutError
274 
275  def emulate_reboot(self):
276  os.remove(self.reboot_sentinel_file)
277 
278 
class KeyStore:
    """Locates the provisioning credentials used by the tests, relative to ``base_dir``."""

    # directory the 'tests/test_data/...' paths below are resolved against
    base_dir = "./"

    @staticmethod
    def copy_keys(dest_path):
        """Create directory ``dest_path`` and copy the CA, private key and client cert into it.

        :raises FileExistsError: if ``dest_path`` already exists.
        """
        os.mkdir(dest_path)
        for credential in (KeyStore.ca(), KeyStore.pkey(), KeyStore.cert()):
            shutil.copy(credential, dest_path)

    @staticmethod
    def ca():
        """Path of the CA certificate."""
        return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/ca.pem')

    @staticmethod
    def pkey():
        """Path of the client private key."""
        return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/pkey.pem')

    @staticmethod
    def cert():
        """Path of the client certificate."""
        return path.join(KeyStore.base_dir, 'tests/test_data/prov_testupdate/client.pem')
300 
301 
303 
304  def __init__(self, id, aktualizr_secondary_exe='src/aktualizr_secondary/aktualizr-secondary', port=None, primary_port=None,
305  sysroot=None, treehub=None, output_logs=True, force_reboot=False,
306  ostree_mock_path=None, **kwargs):
307  self.id = id
308 
309  self._aktualizr_secondary_exe = aktualizr_secondary_exe
310  self.storage_dir = tempfile.TemporaryDirectory()
311  self.port = self.get_free_port() if port is None else port
312  self.primary_port = self.get_free_port() if primary_port is None else primary_port
313  self._sentinel_file = 'need_reboot'
314  self._output_logs = output_logs
315  self.reboot_sentinel_file = os.path.join(self.storage_dir.name, self._sentinel_file)
316 
317  if force_reboot:
318  reboot_command = "rm {}".format(self.reboot_sentinel_file)
319  else:
320  reboot_command = ""
321 
322  with open(path.join(self.storage_dir.name, 'config.toml'), 'w+') as config_file:
323  config_file.write(IPSecondary.CONFIG_TEMPLATE.format(serial=id[1], hw_ID=id[0],
324  force_reboot=1 if force_reboot else 0,
325  reboot_command=reboot_command,
326  port=self.port, primary_port=self.primary_port,
327  storage_dir=self.storage_dir.name,
328  db_path=path.join(self.storage_dir.name, 'db.sql'),
329  pacman_type='ostree' if treehub and sysroot else 'none',
330  ostree_sysroot=sysroot.path if sysroot else '',
331  treehub_server=treehub.base_url if treehub else '',
332  sentinel_dir=self.storage_dir.name,
333  sentinel_name=self._sentinel_file
334  ))
335  self._config_file = config_file.name
336 
337  self._run_env = {}
338  if sysroot and ostree_mock_path:
339  self._run_env['LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
340  self._run_env['OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
341 
342 
343  CONFIG_TEMPLATE = '''
344  [uptane]
345  ecu_serial = "{serial}"
346  ecu_hardware_id = "{hw_ID}"
347  force_install_completion = {force_reboot}
348 
349  [network]
350  port = {port}
351  primary_ip = "127.0.0.1"
352  primary_port = {primary_port}
353 
354  [storage]
355  type = "sqlite"
356  path = "{storage_dir}"
357  sqldb_path = "{db_path}"
358 
359  [pacman]
360  type = "{pacman_type}"
361  sysroot = "{ostree_sysroot}"
362  ostree_server = "{treehub_server}"
363  os = "dummy-os"
364 
365  [bootloader]
366  reboot_sentinel_dir = "{sentinel_dir}"
367  reboot_sentinel_name = "{sentinel_name}"
368  reboot_command = "{reboot_command}"
369  '''
370 
371  def is_running(self):
372  return True if self._process.poll() is None else False
373 
374  @staticmethod
375  def get_free_port():
376  tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
377  tcp.bind(('', 0))
378  port = tcp.getsockname()[1]
379  tcp.close()
380  return port
381 
382  def __enter__(self):
383  self._process = subprocess.Popen([self._aktualizr_secondary_exe, '-c', self._config_file],
384  stdout=None if self._output_logs else subprocess.PIPE,
385  stderr=None if self._output_logs else subprocess.STDOUT,
386  close_fds=True,
387  env=self._run_env)
388  logger.debug("IP Secondary {} has been started: {}".format(self.id, self.port))
389  return self
390 
391  def __exit__(self, exc_type, exc_val, exc_tb):
392  self._process.terminate()
393  self._process.wait(timeout=60)
394  logger.debug("IP Secondary {} has been stopped".format(self.id))
395 
396  def wait_for_completion(self, timeout=120):
397  self._process.wait(timeout)
398 
399  def emulate_reboot(self):
400  os.remove(self.reboot_sentinel_file)
401 
402 
class UptaneRepo(HTTPServer):
    """HTTP server that serves files below ``doc_root`` and lets tests override single URLs.

    ``client_handler_map`` has the shape ``{http_method: {url_path: callable}}``; a matching
    callable is invoked with the request handler instance. GET requests without an override
    serve the file under ``doc_root``; POST/PUT requests without one get an empty 200.

    Every server instance works with its own handler class and its own copy of the handler
    map (available as ``self.Handler``), so overrides registered for one server never affect
    another server.
    """

    def __init__(self, doc_root, ifc, port, client_handler_map=None):
        """Bind to ``(ifc, port)``; the server starts answering only after ``start()``.

        :param doc_root: directory the default GET handler serves files from.
        :param ifc: interface/host name to bind to.
        :param port: TCP port to bind to (0 lets the OS choose).
        :param client_handler_map: optional per-URL overrides, see the class description.
        """
        base_handler = type(self).Handler
        # Start from the handlers the handler class declares itself, then add the caller's.
        handler_map = {method: dict(handlers) for method, handlers in base_handler.handler_map.items()}
        for method, method_handlers in (client_handler_map or {}).items():
            handler_map.setdefault(method, {}).update(method_handlers)
        # Per-instance subclass: keeps the shared class-level handler_map untouched.
        self.Handler = type(base_handler.__name__, (base_handler,), {'handler_map': handler_map})

        super(UptaneRepo, self).__init__(server_address=(ifc, port), RequestHandlerClass=self.Handler)

        self.base_url = 'http://{}:{}'.format(self.server_address[0], self.server_address[1])
        self.doc_root = doc_root
        self._server_thread = None

    class Handler(SimpleHTTPRequestHandler):
        """Request handler that dispatches through ``handler_map`` before falling back to defaults."""

        # {http_method: {url_path: callable(request_handler)}}
        handler_map = {}

        def __init__(self, request, client_address, server):
            self.doc_root = server.doc_root
            self.disable_nagle_algorithm = True
            super(UptaneRepo.Handler, self).__init__(request, client_address, server)

        def _dispatch(self, method, fallback):
            """Call the handler registered for (method, path), or ``fallback`` if there is none."""
            handler = self.handler_map.get(method, {}).get(self.path, fallback)
            return handler(self)

        def do_POST(self):
            return self._dispatch('POST', type(self).default_handler)

        def do_PUT(self):
            return self._dispatch('PUT', type(self).default_handler)

        def do_GET(self):
            return self._dispatch('GET', type(self).default_get)

        def default_handler(self):
            """Answer with an empty 200 response."""
            self.send_response(200)
            self.end_headers()

        def default_get(self):
            """Send the requested file from ``doc_root``, or 404 if it is not a regular file."""
            # isfile() rather than exists(): a directory cannot be opened for reading and
            # would make the handler fail after the 200 status line was already sent.
            if not os.path.isfile(self.file_path):
                self.send_response(404)
                self.end_headers()
                return
            self.send_response(200)
            self.end_headers()
            with open(self.file_path, 'rb') as source:
                self.copyfile(source, self.wfile)

        @property
        def file_path(self):
            """Request path (without the leading '/') joined onto ``doc_root``."""
            return os.path.join(self.doc_root, self.path[1:])

    def start(self):
        """Start serving requests on a background thread and return ``self``."""
        self._server_thread = threading.Thread(target=self.serve_forever)
        self._server_thread.start()
        return self

    def stop(self):
        """Stop serving (if started), close the listening socket and join the server thread."""
        # shutdown() blocks until serve_forever() signals its exit, so it must only be
        # called when the serving thread was actually started.
        if self._server_thread:
            self.shutdown()
        self.server_close()
        if self._server_thread:
            self._server_thread.join(timeout=60)
            self._server_thread = None

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
472 
473 
475  """
476  This server
477  - serves signed metadata about images
478  - receives device manifest which includes installation report if any installation has happened
479  """
480 
481  director_subdir = "repo/director"
482 
483  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
484  super(DirectorRepo, self).__init__(os.path.join(uptane_repo_root, self.director_subdir), ifc=ifc, port=port,
485  client_handler_map=client_handler_map)
486 
487  self._manifest = None
488  self._last_install_res = False
489  self._last_install_res_lock = threading.RLock()
490  self._installed_condition = threading.Condition()
491 
493  def handle_manifest(self):
494  self.send_response(200)
495  self.end_headers()
496  json_data = None
497  try:
498  data_size = int(self.headers['Content-Length'])
499  data_string = self.rfile.read(data_size)
500  json_data = json.loads(data_string)
501  except Exception as exc:
502  logger.error(exc)
503 
504  if json_data:
505  install_report = json_data['signed'].get('installation_report', "")
506  if install_report:
507  self.server.set_install_event(json_data)
508 
509  handler_map = {'PUT': {'/manifest': handle_manifest}}
510 
511  def set_install_event(self, manifest):
512  with self._installed_condition:
513  self._manifest = manifest
514  self._last_install_res = manifest['signed']['installation_report']['report']['result']['success']
515  self._installed_condition.notifyAll()
516 
517  def wait_for_install(self, timeout=180):
518  with self._installed_condition:
519  self._installed_condition.wait(timeout=timeout)
520  return self._last_install_res
521 
522  def get_install_result(self):
523  with self._installed_condition:
524  return self._last_install_res
525 
526  def get_manifest(self):
527  with self._installed_condition:
528  return self._manifest
529 
530  def get_ecu_manifest(self, ecu_serial):
531  return self.get_manifest()['signed']['ecu_version_manifests'][ecu_serial]
532 
533  def get_ecu_manifest_filepath(self, ecu_serial):
534  return self.get_ecu_manifest(ecu_serial)['signed']['installed_image']['filepath']
535 
536 
538  """
539  This server serves signed metadata about images
540  as well as images by default (it's possible to serve images from another server by using the 'custom URI' feature)
541  """
542 
543  image_subdir = "repo/repo"
544 
545  def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
546  super(ImageRepo, self).__init__(os.path.join(uptane_repo_root, self.image_subdir), ifc=ifc, port=port,
547  client_handler_map=client_handler_map)
548 
549 
551  """
552  This server serves images
553  """
554  image_subdir = "repo/repo"
555 
556  def __init__(self, root, ifc, port, client_handler_map={}):
557  super(CustomRepo, self).__init__(os.path.join(root, self.image_subdir),
558  ifc=ifc, port=port, client_handler_map=client_handler_map)
559 
560 
562  """
563  This server serves requests from an OSTree client, i.e. emulates/mocks the treehub server
564  """
565  def __init__(self, ifc, port, client_handler_map={}):
566  self.root = tempfile.mkdtemp()
567  super(Treehub, self).__init__(self.root, ifc=ifc, port=port, client_handler_map=client_handler_map)
568 
569  def __enter__(self):
570  self.revision = create_repo(self.root)
571  return super(Treehub, self).__enter__()
572 
573  def __exit__(self, exc_type, exc_val, exc_tb):
574  super(Treehub, self).__exit__(exc_type, exc_val, exc_tb)
575  shutil.rmtree(self.root, ignore_errors=True)
576 
578  def default_get(self):
579  if not os.path.exists(self.file_path):
580  self.send_response(404)
581  self.end_headers()
582  return
583 
584  super(Treehub.Handler, self).default_get()
585 
586 
588  def __init__(self, number_of_failures=1, bytes_to_send_before_interruption=10, url=''):
589  self._bytes_to_send_before_interruption = bytes_to_send_before_interruption
590  self._number_of_failures = number_of_failures
591  self._failure_counter = 0
592  self._url = url
593 
594  def __call__(self, request_handler):
595  if self._failure_counter < self._number_of_failures:
596  request_handler.send_response(200)
597  file_size = os.path.getsize(request_handler.file_path)
598  request_handler.send_header('Content-Length', file_size)
599  request_handler.end_headers()
600 
601  with open(request_handler.file_path, 'rb') as source:
602  data = source.read(self._bytes_to_send_before_interruption)
603  request_handler.wfile.write(data)
604 
605  self._failure_counter += 1
606  else:
607  request_handler.default_get()
608 
609  def map(self, url=''):
610  return {'GET': {url if url else self._url: DownloadInterruptionHandler(self._number_of_failures,
612 
613 
615  dummy_filez = (b'\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06' +
616  b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xa4\x00\x00\x00\x00' +
617  b'\x00\x19\x33\x34\x32\x36\x31\xe5\x02\x00')
618 
619  def __init__(self, number_of_failures=1, url='', fake_filez=False):
620  self._number_of_failures = number_of_failures
621  self._failure_counter = 0
622  self._url = url
623  self._fake_filez = fake_filez
624 
625  def __call__(self, request_handler):
626  if self._number_of_failures == -1 or self._failure_counter < self._number_of_failures:
627  request_handler.send_response(200)
628  request_handler.end_headers()
629  if self._fake_filez:
630  request_handler.wfile.write(self.dummy_filez)
631  else:
632  request_handler.wfile.write(b'malformed image')
633 
634  self._failure_counter += 1
635  else:
636  request_handler.default_get()
637 
638  def map(self, url):
639  return {'GET': {url if url else self._url: MalformedImageHandler(self._number_of_failures,
640  fake_filez=self._fake_filez)}}
641 
642 
644  def __init__(self, number_of_failures=1, url=''):
645  self._number_of_failures = number_of_failures
646  self._failure_counter = 0
647  self._url = url
648 
649  def __call__(self, request_handler):
650  if self._failure_counter < self._number_of_failures:
651  request_handler.send_response(200)
652  file_size = os.path.getsize(request_handler.file_path)
653  request_handler.end_headers()
654 
655  with open(request_handler.file_path, 'rb') as source:
656  while True:
657  data = source.read(1)
658  if not data:
659  break
660  request_handler.wfile.write(data)
661  request_handler.wfile.flush()
662  import time
663  time.sleep(100)
664 
665  self._failure_counter += 1
666  else:
667  request_handler.default_get()
668 
669  def map(self, url):
670  return {'GET': {url if url else self._url: SlowRetrievalHandler(self._number_of_failures)}}
671 
672 
674  def __init__(self, number_of_redirects=1, url=''):
675  self._number_of_redirects = number_of_redirects
676  self._redirect_counter = 0
677  self._url = url
678 
679  def __call__(self, request_handler):
681  request_handler.send_response(301)
682  request_handler.send_header('Location', request_handler.server.base_url + request_handler.path)
683  request_handler.end_headers()
684  self._redirect_counter += 1
685  else:
686  request_handler.default_get()
687 
688  def map(self, url):
689  return {'GET': {url if url else self._url: RedirectHandler(self._number_of_redirects)}}
690 
691 
693  def __init__(self, number_of_failures=1):
694  self._number_of_failures = number_of_failures
695  self._failure_counter = 0
696 
697  def __call__(self, request_handler):
698  if self._failure_counter < self._number_of_failures:
699  request_handler.send_response(200)
700  request_handler.end_headers()
701  request_handler.wfile.write(b'{"non-uptane-json": "some-value"}')
702 
703  self._failure_counter += 1
704  else:
705  request_handler.default_get()
706 
707  def map(self, url):
708  return {'GET': {url: MalformedJsonHandler(self._number_of_failures)}}
709 
710 
712  def __init__(self, repo_manager_exe, server_port=0, director_port=0, image_repo_port=0, custom_repo_port=0):
713  self.image_rel_dir = 'repo/repo'
714  self.target_rel_dir = 'repo/repo/targets'
715 
716  self._repo_manager_exe = repo_manager_exe
717  self.root_dir = tempfile.mkdtemp()
718 
719  self.server_port = server_port
720  self.director_port = director_port
721  self.image_repo_port = image_repo_port
722  self.custom_repo_port = custom_repo_port
723 
724  def create_generic_server(self, **kwargs):
725  return FakeTestServerBackground(meta_path=self.root_dir, target_path=self.target_dir, port=self.server_port)
726 
727  def create_director_repo(self, handler_map={}):
728  return DirectorRepo(self.root_dir, 'localhost', self.director_port, client_handler_map=handler_map)
729 
730  def create_image_repo(self, handler_map={}):
731  return ImageRepo(self.root_dir, 'localhost', self.image_repo_port, client_handler_map=handler_map)
732 
733  def create_custom_repo(self, handler_map={}):
734  return CustomRepo(self.root_dir, 'localhost', self.custom_repo_port, client_handler_map=handler_map)
735 
736  @property
737  def image_dir(self):
738  return path.join(self.root_dir, self.image_rel_dir)
739 
740  @property
741  def target_dir(self):
742  return path.join(self.root_dir, self.target_rel_dir)
743 
744  @property
745  def target_file(self):
746  return path.join(self.image_dir, 'targets.json')
747 
748  def add_image(self, id, image_filename, target_name=None, image_size=1024, custom_url=''):
749 
750  targetname = target_name if target_name else image_filename
751 
752  with open(path.join(self.image_dir, image_filename), 'wb') as image_file:
753  image_file.write(urandom(image_size))
754 
755  image_creation_cmdline = [self._repo_manager_exe, '--path', self.root_dir,
756  '--command', 'image', '--filename', image_filename, '--targetname', targetname, '--hwid', id[0]]
757 
758  if custom_url:
759  image_creation_cmdline.append('--url')
760  image_creation_cmdline.append(custom_url)
761 
762  subprocess.run(image_creation_cmdline, cwd=self.image_dir, check=True)
763 
764  # update the director metadata
765  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
766  '--command', 'addtarget', '--targetname', targetname,
767  '--hwid', id[0], '--serial', id[1]], check=True)
768 
769  # sign so the image becomes available for an update for a client/device
770  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
771 
772  with open(self.target_file, "r") as target_file:
773  targets = json.load(target_file)
774  target_hash = targets["signed"]["targets"][targetname]["hashes"]["sha256"]
775 
776  return target_hash
777 
778  def add_ostree_target(self, id, rev_hash, target_name=None, expires_within_sec=(60 * 5)):
779  # emulate the backend behavior on defining a target name for OSTREE target format
780  target_name = rev_hash if target_name is None else "{}-{}".format(target_name, rev_hash)
781  image_creation_cmdline = [self._repo_manager_exe,
782  '--command', 'image',
783  '--path', self.root_dir,
784  '--targetname', target_name,
785  '--targetsha256', rev_hash,
786  '--targetlength', '0',
787  '--targetformat', 'OSTREE',
788  '--hwid', id[0]]
789  subprocess.run(image_creation_cmdline, check=True)
790 
791  expiration_time = time.time() + expires_within_sec
792  expiration_time_str = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(expiration_time))
793 
794  subprocess.run([self._repo_manager_exe,
795  '--command', 'addtarget',
796  '--path', self.root_dir,
797  '--targetname', target_name,
798  '--hwid', id[0],
799  '--serial', id[1],
800  '--expires', expiration_time_str],
801  check=True)
802 
803  subprocess.run([self._repo_manager_exe, '--path', self.root_dir, '--command', 'signtargets'], check=True)
804 
805  return target_name
806 
807  def __enter__(self):
808  self._generate_repo()
809  return self
810 
811  def __exit__(self, exc_type, exc_val, exc_tb):
812  shutil.rmtree(self.root_dir, ignore_errors=True)
813 
814  def _generate_repo(self):
815  subprocess.run([self._repo_manager_exe, '--path', self.root_dir,
816  '--command', 'generate', '--keytype', 'ED25519'], check=True)
817 
818 
def with_aktualizr(start=True, output_logs=False, id=('primary-hw-ID-001', str(uuid4())), wait_timeout=60,
                   secondary_wait_sec=600, log_level=1, aktualizr_primary_exe='src/aktualizr_primary/aktualizr',
                   aktualizr_info_exe='src/aktualizr_info/aktualizr-info',
                   run_mode='once'):
    """Decorator that builds an ``Aktualizr`` and passes it to the test as ``aktualizr``.

    The wrapper's keyword arguments are forwarded both to the ``Aktualizr`` constructor and to
    the test; ``ostree_mock_path`` goes to the constructor only. With ``start`` the test runs
    while the aktualizr process is running.

    Note: the default ``id`` is evaluated once, when this function is defined, so every use of
    the default shares the same serial.
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, ostree_mock_path=None, **kwargs):
            primary = Aktualizr(aktualizr_primary_exe=aktualizr_primary_exe,
                                aktualizr_info_exe=aktualizr_info_exe, id=id,
                                wait_timeout=wait_timeout,
                                secondary_wait_sec=secondary_wait_sec,
                                log_level=log_level, output_logs=output_logs,
                                run_mode=run_mode, ostree_mock_path=ostree_mock_path, **kwargs)
            if not start:
                outcome = test(*args, **kwargs, aktualizr=primary)
            else:
                with primary:
                    outcome = test(*args, **kwargs, aktualizr=primary)
            return outcome
        return wrapper
    return decorator
840 
841 
842 # The following decorators can be eliminated if pytest framework (or similar) is used
843 # by using fixtures instead
def with_uptane_backend(start_generic_server=True, repo_manager_exe='src/uptane_generator/uptane-generator'):
    """Decorator that creates an ``UptaneTestRepo`` and passes it to the test as ``uptane_repo``.

    With ``start_generic_server`` the test additionally receives ``uptane_server`` and runs
    inside that server's context.
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, **kwargs):
            with UptaneTestRepo(path.abspath(repo_manager_exe)) as repo:
                if not start_generic_server:
                    outcome = test(*args, **kwargs, uptane_repo=repo)
                else:
                    with repo.create_generic_server() as uptane_server:
                        outcome = test(*args, **kwargs, uptane_repo=repo, uptane_server=uptane_server)
            return outcome
        return wrapper
    return decorator
858 
859 
def with_director(start=True, handlers=None):
    """Decorator that creates a director repo and passes it to the test as ``director``.

    :param start: run the test inside the director's context (i.e. with the server running).
    :param handlers: optional sequence of objects with a ``map(url)`` method. The test runs
                     once per handler, with a new director built from
                     ``handler.map(test_path)``, stopping at the first falsy result. Without
                     handlers the test runs once against a director with no overrides.
    :return: the result of the last test run.
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, uptane_repo, **kwargs):
            def run_once(handler_map):
                director = uptane_repo.create_director_repo(handler_map=handler_map)
                if not start:
                    return test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
                with director:
                    return test(*args, **kwargs, uptane_repo=uptane_repo, director=director)

            if not handlers:
                return run_once({})

            result = None
            for handler in handlers:
                result = run_once(handler.map(kwargs.get('test_path', '')))
                if not result:
                    break
            return result
        return wrapper
    return decorator
883 
884 
def with_imagerepo(start=True, handlers=None):
    """Decorator that creates an image repo and passes it to the test as ``image_repo``.

    :param start: run the test inside the image repo's context (i.e. with the server running).
    :param handlers: optional sequence of objects with a ``map(url)`` method. The test runs
                     once per handler, with a new image repo built from
                     ``handler.map(test_path)``, stopping at the first falsy result. Without
                     handlers the test runs once against an image repo with no overrides.
    :return: the result of the last test run.
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, uptane_repo, **kwargs):
            def run_once(handler_map):
                image_repo = uptane_repo.create_image_repo(handler_map=handler_map)
                if not start:
                    return test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
                with image_repo:
                    return test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)

            if not handlers:
                return run_once({})

            result = None
            for handler in handlers:
                result = run_once(handler.map(kwargs.get('test_path', '')))
                if not result:
                    break
            return result
        return wrapper
    return decorator
908 
909 
def with_secondary(start=True, output_logs=False, id=('secondary-hw-ID-001', None),
                   force_reboot=False, arg_name='secondary',
                   aktualizr_secondary_exe='src/aktualizr_secondary/aktualizr-secondary'):
    """Build a decorator that supplies an ``IPSecondary`` to a test.

    The secondary is constructed with the decorator's settings plus every
    keyword argument the wrapped test was called with. It is then added to
    the test's keyword arguments under ``arg_name`` and appended to the
    ``secondaries`` list. ``primary_port`` is filled in from the secondary
    unless the caller already provided one.

    :param start: when true, the secondary is entered as a context manager
        for the duration of the test call
    :param output_logs: forwarded to ``IPSecondary``
    :param id: pair whose second element, when ``None``, is replaced by a
        freshly generated UUID string
    :param force_reboot: forwarded to ``IPSecondary``
    :param arg_name: keyword under which the secondary is passed to the test
    :param aktualizr_secondary_exe: forwarded to ``IPSecondary``
    :return: a decorator; the wrapped test returns whatever the test returns
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, **kwargs):
            # Only synthesise the second element; an explicit id is passed through untouched.
            ecu_id = id if id[1] is not None else (id[0], str(uuid4()))
            secondary = IPSecondary(aktualizr_secondary_exe=aktualizr_secondary_exe,
                                    output_logs=output_logs, id=ecu_id,
                                    force_reboot=force_reboot, **kwargs)

            all_secondaries = kwargs.get("secondaries", []) + [secondary]
            kwargs[arg_name] = secondary
            kwargs["secondaries"] = all_secondaries
            if "primary_port" not in kwargs:
                kwargs["primary_port"] = secondary.primary_port

            if not start:
                outcome = test(*args, **kwargs)
            else:
                with secondary:
                    outcome = test(*args, **kwargs)
            return outcome
        return wrapper
    return decorator
932 
933 
def with_path(paths):
    """Build a decorator that runs a test once per entry of ``paths``.

    Each run receives the current entry as the ``test_path`` keyword argument.
    Iteration stops at the first falsy result.

    :param paths: iterable of values to pass as ``test_path``
    :return: a decorator; the wrapped test returns the result of the last run,
        or ``True`` when ``paths`` is empty (nothing ran, so nothing failed)
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, **kwargs):
            # Pre-seed the result so an empty ``paths`` does not end in an
            # UnboundLocalError at the return statement.
            result = True
            for test_path in paths:
                result = test(*args, **kwargs, test_path=test_path)
                if not result:
                    break
            return result
        return wrapper
    return decorator
945 
946 
class InstallManager:
    """Registers images in an Uptane repo and checks whether they are installed.

    Each entry of ``images_to_install`` is stored as a dict with the keys
    ``ecu_id``, ``filename`` and ``hash``, where ``hash`` is the value returned
    by ``uptane_repo.add_image`` for that image.
    """

    def __init__(self, aktualizr, uptane_repo, images_to_install=None):
        """Add every requested image to ``uptane_repo`` and remember its hash.

        :param aktualizr: object queried later through ``get_current_image_info``
        :param uptane_repo: object exposing ``add_image(ecu_id, filename, custom_url=...)``
        :param images_to_install: iterable of ``(ecu_id, filename)`` or
            ``(ecu_id, filename, custom_url)`` sequences; ``None`` means no images
        """
        self.aktualizr = aktualizr
        self.images_to_install = [
            {
                'ecu_id': image[0],
                'filename': image[1],
                # The third element is optional; fall back to an empty custom URL.
                'hash': uptane_repo.add_image(image[0], image[1],
                                              custom_url=image[2] if len(image) > 2 else ''),
            }
            for image in (images_to_install or ())
        ]

    def are_images_installed(self):
        """Return ``True`` when every registered image hash matches the one
        reported by ``aktualizr.get_current_image_info`` for its ECU.

        Stops at the first mismatch; returns ``True`` when no images are registered.
        """
        return all(image['hash'] == self.aktualizr.get_current_image_info(image['ecu_id'])
                   for image in self.images_to_install)
966 
967 
def with_install_manager(default_images=True):
    """Build a decorator that supplies an ``InstallManager`` to a test.

    The wrapped test must be called with ``aktualizr`` and ``uptane_repo``
    keyword arguments and may be given ``secondary`` and ``images_to_install``.
    The manager is passed to the test as ``install_mngr``; ``aktualizr``,
    ``secondary`` and ``uptane_repo`` are forwarded unchanged.

    :param default_images: when true and no images were supplied, install
        ``'primary-image.img'`` on ``aktualizr.id`` and, if a secondary is
        present, ``'secondary-image.img'`` on ``secondary.id``
    :return: a decorator; the wrapped test returns whatever the test returns
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, aktualizr, uptane_repo, secondary=None, images_to_install=None, **kwargs):
            # ``None`` replaces a shared mutable default; normalise it back to
            # an empty list so InstallManager always receives an iterable.
            if images_to_install is None:
                images_to_install = []
            if default_images and not images_to_install:
                # Build a new list rather than extending a caller-owned one.
                images_to_install = [(aktualizr.id, 'primary-image.img')]
                if secondary:
                    images_to_install.append((secondary.id, 'secondary-image.img'))
            install_mngr = InstallManager(aktualizr, uptane_repo, images_to_install)
            return test(*args, **kwargs, aktualizr=aktualizr, secondary=secondary,
                        uptane_repo=uptane_repo, install_mngr=install_mngr)
        return wrapper
    return decorator
982 
983 
def with_images(images_to_install):
    """Build a decorator that passes ``images_to_install`` to a test.

    :param images_to_install: value forwarded to the test under the
        ``images_to_install`` keyword argument on every call
    :return: a decorator; the wrapped test returns whatever the test returns
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, **kwargs):
            outcome = test(*args, **kwargs, images_to_install=images_to_install)
            return outcome
        return wrapper
    return decorator
991 
992 
def with_customrepo(start=True, handlers=None):
    """Build a decorator that supplies a custom repo to a test.

    The wrapped test must be called with an ``uptane_repo`` keyword argument.
    A custom repo is created through ``uptane_repo.create_custom_repo`` and
    passed to the test as ``custom_repo`` (``uptane_repo`` is forwarded too).

    :param start: when true, the custom repo is entered as a context manager
        for the duration of the test call
    :param handlers: optional sequence of objects exposing ``map(test_path)``.
        When given, the test is run once per handler with the handler map
        returned by ``handler.map(kwargs.get('test_path', ''))`` and the loop
        stops at the first falsy result. When empty or ``None`` the test runs
        once with an empty handler map.
    :return: a decorator; the wrapped test returns the result of the last run
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, uptane_repo, **kwargs):
            def run_once(handler_map=None):
                # A fresh dict per run: a shared default dict would leak any
                # mutation made by the repo into every later test.
                if handler_map is None:
                    handler_map = {}
                custom_repo = uptane_repo.create_custom_repo(handler_map=handler_map)
                if start:
                    with custom_repo:
                        result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
                else:
                    result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
                return result

            if not handlers:
                return run_once()

            test_path = kwargs.get('test_path', '')
            for handler in handlers:
                result = run_once(handler.map(test_path))
                if not result:
                    break
            return result
        return wrapper
    return decorator
1016 
1017 
class Sysroot:
    """Context manager providing a throw-away copy of the ``ostree_repo`` directory.

    On entry the directory named by ``repo_path`` (relative to the current
    working directory) is copied into a fresh temporary directory and a
    ``version`` file is written inside the copy. On exit the temporary
    directory is removed.
    """
    repo_path = 'ostree_repo'

    def __enter__(self):
        """Create the temporary copy and its initial ``version`` file.

        :return: ``self``
        :raises subprocess.CalledProcessError: if ``cp`` or ``ostree`` fails
        """
        self._root = tempfile.mkdtemp()
        self.path = os.path.join(self._root, self.repo_path)
        self.version_file = os.path.join(self.path, 'version')

        try:
            subprocess.run(['cp', '-r', self.repo_path, self._root], check=True)
            self._write_version_file(self.get_revision(), '0')
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so remove the
            # temporary directory here to avoid leaking it.
            shutil.rmtree(self._root, ignore_errors=True)
            raise

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Remove the temporary directory, ignoring removal errors."""
        shutil.rmtree(self._root, ignore_errors=True)

    def get_revision(self):
        """Return the revision ``ostree rev-parse`` reports for
        ``generate-remote:generated`` in the copied repo, without the
        trailing newline.

        :raises subprocess.CalledProcessError: if ``ostree`` exits non-zero
        :raises subprocess.TimeoutExpired: if ``ostree`` runs longer than 60 seconds
        """
        rev_cmd_res = subprocess.run(['ostree', 'rev-parse', '--repo',
                                      os.path.join(self.path, 'ostree', 'repo'),
                                      'generate-remote:generated'],
                                     timeout=60, check=True, stdout=subprocess.PIPE)

        return rev_cmd_res.stdout.decode('ascii').rstrip('\n')

    def update_revision(self, rev):
        """Rewrite the ``version`` file for revision ``rev`` with the marker ``1``."""
        self._write_version_file(rev, '1')

    def _write_version_file(self, rev, marker):
        """Overwrite the ``version`` file with four lines: rev, marker, rev, marker."""
        with open(self.version_file, 'wt') as version_file:
            version_file.writelines('{}\n'.format(value) for value in (rev, marker, rev, marker))
1052 
1053 
def with_sysroot(ostree_mock_path='tests/libostree_mock.so'):
    """Build a decorator that runs a test inside a ``Sysroot`` context.

    The test receives the entered ``Sysroot`` as ``sysroot`` and the given
    ``ostree_mock_path`` under the keyword of the same name.

    :param ostree_mock_path: value forwarded to the test unchanged
    :return: a decorator; the wrapped test returns whatever the test returns
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, **kwargs):
            with Sysroot() as active_sysroot:
                return test(*args, **kwargs,
                            sysroot=active_sysroot,
                            ostree_mock_path=ostree_mock_path)
        return wrapper
    return decorator
1062 
1063 
def with_treehub(handlers=None, port=0):
    """Build a decorator that runs a test against a ``Treehub`` on localhost.

    A ``Treehub`` is created for ``'localhost'`` with the given ``port`` and a
    client handler map, entered as a context manager, and passed to the test
    as ``treehub``.

    :param handlers: optional sequence of objects exposing ``map(test_path)``.
        When given, the test is run once per handler with the handler map
        returned by ``handler.map(kwargs.get('test_path', ''))`` and the loop
        stops at the first falsy result. When empty or ``None`` the test runs
        once with an empty handler map.
    :param port: forwarded to ``Treehub``
    :return: a decorator; the wrapped test returns the result of the last run
    """
    def decorator(test):
        @wraps(test)
        def wrapper(*args, **kwargs):
            def run_once(handler_map=None):
                # A fresh dict per run: a shared default dict would leak any
                # mutation made by Treehub into every later test.
                if handler_map is None:
                    handler_map = {}
                with Treehub('localhost', port=port, client_handler_map=handler_map) as treehub:
                    return test(*args, **kwargs, treehub=treehub)

            if not handlers:
                return run_once()

            test_path = kwargs.get('test_path', '')
            for handler in handlers:
                result = run_once(handler.map(test_path))
                if not result:
                    break
            return result
        return wrapper
    return decorator
1082 
1083 
class NonDaemonPool(pool.Pool):
    """Process pool whose workers always report ``daemon`` as ``False``.

    NOTE(review): presumably needed so pool workers may start child processes
    of their own -- confirm against the tests that use it.
    """

    def Process(self, *args, **kwds):
        """Create a worker via the parent pool, then swap its class for a
        subclass that pins ``daemon`` to ``False`` and ignores assignments."""
        worker = super().Process(*args, **kwds)

        class NonDaemonProcess(type(worker)):
            """Monkey-patch process to ensure it is never daemonized"""

            # Getter always answers False; setter silently discards the value.
            daemon = property(lambda self: False, lambda self, val: None)

        worker.__class__ = NonDaemonProcess

        return worker
1102 
1103 
class TestRunner:
    """Runs a list of test callables in a ``NonDaemonPool`` of worker processes.

    Intended to be used as a context manager so the pool is always released.
    """

    def __init__(self, tests):
        """Create the worker pool.

        :param tests: sized collection of zero-argument callables, each
            returning a truthy value on success
        """
        self._tests = tests
        # One worker per test, capped at the CPU count. Pool() rejects a size
        # below 1, so keep at least one worker even for an empty test list.
        pool_size = max(1, min(len(self._tests), cpu_count()))
        self._test_runner_pool = NonDaemonPool(pool_size)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # This must be called, see https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
        self._test_runner_pool.__exit__(exc_type, exc_value, traceback)

    @staticmethod
    def test_runner(test):
        """Run a single test, logging its name before and its outcome after.

        :return: whatever ``test()`` returns
        """
        logger.info('>>> Running %s...', test.__name__)
        test_run_result = test()
        logger.info('>>> %s: %s\n', 'OK' if test_run_result else 'FAILED', test.__name__)
        return test_run_result

    def run(self):
        """Run every test through the pool.

        :return: ``True`` when every test returned a truthy value (or there
            were no tests), ``False`` otherwise
        """
        results = self._test_runner_pool.map(TestRunner.test_runner, self._tests)
        return all(results)
test_fixtures.UptaneTestRepo.target_file
def target_file(self)
Definition: test_fixtures.py:745
test_fixtures.IPSecondary._process
_process
Definition: test_fixtures.py:383
test_fixtures.SlowRetrievalHandler._url
_url
Definition: test_fixtures.py:647
test_fixtures.CustomRepo
Definition: test_fixtures.py:550
test_fixtures.MalformedImageHandler.dummy_filez
tuple dummy_filez
Definition: test_fixtures.py:615
test_fixtures.DirectorRepo._last_install_res_lock
_last_install_res_lock
Definition: test_fixtures.py:489
test_fixtures.CustomRepo.image_subdir
string image_subdir
Definition: test_fixtures.py:554
test_fixtures.IPSecondary.id
id
Definition: test_fixtures.py:305
test_fixtures.UptaneRepo.base_url
base_url
Definition: test_fixtures.py:407
test_fixtures.MalformedImageHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:620
test_fixtures.Aktualizr._storage_dir
_storage_dir
Definition: test_fixtures.py:32
test_fixtures.MalformedJsonHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:694
test_fixtures.Treehub.root
root
Definition: test_fixtures.py:566
test_fixtures.UptaneRepo.doc_root
doc_root
Definition: test_fixtures.py:408
test_fixtures.Sysroot.path
path
Definition: test_fixtures.py:1023
test_fixtures.Aktualizr._secondary_wait_sec
_secondary_wait_sec
Definition: test_fixtures.py:45
test_fixtures.Aktualizr._output_logs
_output_logs
Definition: test_fixtures.py:68
test_fixtures.MalformedImageHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:621
test_fixtures.DirectorRepo
Definition: test_fixtures.py:474
test_fixtures.TestRunner._tests
_tests
Definition: test_fixtures.py:1106
test_fixtures.IPSecondary.get_free_port
def get_free_port()
Definition: test_fixtures.py:375
test_fixtures.UptaneRepo._server_thread
_server_thread
Definition: test_fixtures.py:409
test_fixtures.Sysroot._root
_root
Definition: test_fixtures.py:1022
test_fixtures.InstallManager.images_to_install
images_to_install
Definition: test_fixtures.py:950
test_fixtures.UptaneTestRepo.root_dir
root_dir
Definition: test_fixtures.py:717
test_fixtures.UptaneRepo.Handler.disable_nagle_algorithm
disable_nagle_algorithm
Definition: test_fixtures.py:432
test_fixtures.Aktualizr.get_current_primary_image_info
def get_current_primary_image_info(self)
Definition: test_fixtures.py:223
test_fixtures.Aktualizr._log_level
_log_level
Definition: test_fixtures.py:33
test_fixtures.Sysroot.repo_path
string repo_path
Definition: test_fixtures.py:1019
test_fixtures.UptaneRepo.Handler
Definition: test_fixtures.py:429
test_fixtures.IPSecondary._config_file
_config_file
Definition: test_fixtures.py:333
test_fixtures.UptaneRepo.Handler.file_path
def file_path(self)
Definition: test_fixtures.py:452
test_fixtures.RedirectHandler._url
_url
Definition: test_fixtures.py:677
test_fixtures.UptaneTestRepo
Definition: test_fixtures.py:711
test_fixtures.UptaneTestRepo.image_repo_port
image_repo_port
Definition: test_fixtures.py:721
test_fixtures.RedirectHandler._number_of_redirects
_number_of_redirects
Definition: test_fixtures.py:675
test_fixtures.UptaneTestRepo._repo_manager_exe
_repo_manager_exe
Definition: test_fixtures.py:716
test_fixtures.ImageRepo
Definition: test_fixtures.py:537
test_fixtures.IPSecondary.port
port
Definition: test_fixtures.py:309
test_fixtures.Aktualizr._sentinel_file
_sentinel_file
Definition: test_fixtures.py:34
test_fixtures.ImageRepo.image_subdir
string image_subdir
Definition: test_fixtures.py:543
test_fixtures.IPSecondary.primary_port
primary_port
Definition: test_fixtures.py:310
test_fixtures.InstallManager
Definition: test_fixtures.py:947
test_fixtures.Aktualizr._run_env
_run_env
Definition: test_fixtures.py:70
test_fixtures.IPSecondary
Definition: test_fixtures.py:302
test_fixtures.Treehub
Definition: test_fixtures.py:561
test_fixtures.DirectorRepo._manifest
_manifest
Definition: test_fixtures.py:487
Process
Definition: test_utils.h:19
test_fixtures.UptaneTestRepo.target_dir
def target_dir(self)
Definition: test_fixtures.py:741
test_fixtures.Aktualizr.id
id
Definition: test_fixtures.py:29
test_fixtures.DownloadInterruptionHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:591
test_fixtures.TestRunner._test_runner_pool
_test_runner_pool
Definition: test_fixtures.py:1107
test_fixtures.Aktualizr._process
_process
Definition: test_fixtures.py:244
test_fixtures.Aktualizr.get_info
def get_info(self, retry=30)
Definition: test_fixtures.py:163
test_fixtures.UptaneTestRepo.server_port
server_port
Definition: test_fixtures.py:719
test_fixtures.DirectorRepo._last_install_res
_last_install_res
Definition: test_fixtures.py:488
test_fixtures.UptaneTestRepo.target_rel_dir
target_rel_dir
Definition: test_fixtures.py:714
test_fixtures.IPSecondary._sentinel_file
_sentinel_file
Definition: test_fixtures.py:311
test_fixtures.KeyStore
Definition: test_fixtures.py:279
test_fixtures.NonDaemonPool
Definition: test_fixtures.py:1084
test_fixtures.UptaneTestRepo.image_rel_dir
image_rel_dir
Definition: test_fixtures.py:713
test_fixtures.Aktualizr._import_dir
_import_dir
Definition: test_fixtures.py:36
test_fixtures.UptaneRepo.Handler.doc_root
doc_root
Definition: test_fixtures.py:431
test_fixtures.DownloadInterruptionHandler
Definition: test_fixtures.py:587
test_fixtures.RedirectHandler
Definition: test_fixtures.py:673
test_fixtures.UptaneRepo
Definition: test_fixtures.py:403
test_fixtures.UptaneTestRepo._generate_repo
def _generate_repo(self)
Definition: test_fixtures.py:814
test_fixtures.UptaneTestRepo.custom_repo_port
custom_repo_port
Definition: test_fixtures.py:722
test_fixtures.Aktualizr._aktualizr_primary_exe
_aktualizr_primary_exe
Definition: test_fixtures.py:30
test_fixtures.SlowRetrievalHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:645
test_fixtures.SlowRetrievalHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:646
test_fixtures.IPSecondary.reboot_sentinel_file
reboot_sentinel_file
Definition: test_fixtures.py:313
test_fixtures.DirectorRepo._installed_condition
_installed_condition
Definition: test_fixtures.py:490
test_fixtures.Treehub.Handler
Definition: test_fixtures.py:577
test_fixtures.UptaneTestRepo.director_port
director_port
Definition: test_fixtures.py:720
test_fixtures.IPSecondary._output_logs
_output_logs
Definition: test_fixtures.py:312
test_fixtures.MalformedJsonHandler._failure_counter
_failure_counter
Definition: test_fixtures.py:695
test_fixtures.InstallManager.aktualizr
aktualizr
Definition: test_fixtures.py:949
test_fixtures.MalformedImageHandler._fake_filez
_fake_filez
Definition: test_fixtures.py:623
test_fixtures.DownloadInterruptionHandler._bytes_to_send_before_interruption
_bytes_to_send_before_interruption
Definition: test_fixtures.py:589
test_fixtures.MalformedJsonHandler
Definition: test_fixtures.py:692
test_fixtures.MalformedImageHandler
Definition: test_fixtures.py:614
test_fixtures.DirectorRepo.Handler
Definition: test_fixtures.py:492
test_fixtures.Aktualizr
Definition: test_fixtures.py:26
test_fixtures.IPSecondary._aktualizr_secondary_exe
_aktualizr_secondary_exe
Definition: test_fixtures.py:307
test_fixtures.SlowRetrievalHandler
Definition: test_fixtures.py:643
test_fixtures.MalformedImageHandler._url
_url
Definition: test_fixtures.py:622
test_fixtures.Aktualizr._run_mode
_run_mode
Definition: test_fixtures.py:69
test_fixtures.DirectorRepo.director_subdir
string director_subdir
Definition: test_fixtures.py:481
test_fixtures.DownloadInterruptionHandler._url
_url
Definition: test_fixtures.py:592
test_fixtures.DownloadInterruptionHandler._number_of_failures
_number_of_failures
Definition: test_fixtures.py:590
test_fixtures.IPSecondary.storage_dir
storage_dir
Definition: test_fixtures.py:308
test_fixtures.Aktualizr._secondary_config_file
_secondary_config_file
Definition: test_fixtures.py:44
test_fixtures.Aktualizr._aktualizr_info_exe
_aktualizr_info_exe
Definition: test_fixtures.py:31
test_fixtures.Aktualizr.add_secondary
def add_secondary(self, secondary)
Definition: test_fixtures.py:135
test_fixtures.Treehub.revision
revision
Definition: test_fixtures.py:570
test_fixtures.Aktualizr._get_current_image_info
def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash:')
Definition: test_fixtures.py:203
test_fixtures.RedirectHandler._redirect_counter
_redirect_counter
Definition: test_fixtures.py:676
test_fixtures.Aktualizr._config_file
_config_file
Definition: test_fixtures.py:63
test_fixtures.Sysroot.get_revision
def get_revision(self)
Definition: test_fixtures.py:1040
test_fixtures.IPSecondary._run_env
_run_env
Definition: test_fixtures.py:335
test_fixtures.Aktualizr.reboot_sentinel_file
reboot_sentinel_file
Definition: test_fixtures.py:35
test_fixtures.Sysroot.version_file
version_file
Definition: test_fixtures.py:1024
test_fixtures.UptaneTestRepo.image_dir
def image_dir(self)
Definition: test_fixtures.py:737
test_fixtures.TestRunner
Definition: test_fixtures.py:1104
test_fixtures.Sysroot
Definition: test_fixtures.py:1018