12 from os
import devnull
14 from uuid
import uuid4
15 from os
import urandom
16 from functools
import wraps
17 from http.server
import SimpleHTTPRequestHandler, HTTPServer
19 from fake_http_server.fake_test_server
import FakeTestServerBackground
20 from sota_tools.treehub_server
import create_repo
23 logger = logging.getLogger(__name__)
28 def __init__(self, aktualizr_primary_exe, aktualizr_info_exe, id,
29 uptane_server, wait_port=9040, wait_timeout=60, log_level=1,
30 secondary=None, output_logs=True,
31 run_mode='once', director=None, image_repo=None,
32 sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
43 with open(path.join(self.
_storage_dir.name,
'secondary_config.json'),
'w+')
as secondary_config_file:
44 secondary_cfg = json.loads(Aktualizr.SECONDARY_CONFIG_TEMPLATE.
45 format(port=secondary.primary_port
if secondary
else wait_port,
46 timeout=wait_timeout))
47 json.dump(secondary_cfg, secondary_config_file)
50 with open(path.join(self.
_storage_dir.name,
'config.toml'),
'w+')
as config_file:
51 config_file.write(Aktualizr.CONFIG_TEMPLATE.format(server_url=uptane_server.base_url,
53 serial=id[1], hw_ID=id[0],
58 director=director.base_url
if director
else '',
59 image_repo=image_repo.base_url
if image_repo
else '',
60 pacman_type=
'ostree' if treehub
and sysroot
else 'fake',
61 ostree_sysroot=sysroot.path
if sysroot
else '',
62 treehub_server=treehub.base_url
if treehub
else '',
71 if sysroot
and ostree_mock_path:
72 self.
_run_env[
'LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
73 self.
_run_env[
'OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
77 server = "{server_url}"
80 base_path = "{import_path}"
81 tls_cacert_path = "ca.pem"
82 tls_pkey_path = "pkey.pem"
83 tls_clientcert_path = "client.pem"
86 primary_ecu_serial = "{serial}"
87 primary_ecu_hardware_id = "{hw_ID}"
90 path = "{storage_dir}"
92 sqldb_path = "{db_path}"
95 type = "{pacman_type}"
96 sysroot = "{ostree_sysroot}"
97 ostree_server = "{treehub_server}"
102 secondary_config_file = "{secondary_cfg_file}"
103 director_server = "{director}"
104 repo_server = "{image_repo}"
107 reboot_sentinel_dir = "{sentinel_dir}"
108 reboot_sentinel_name = "{sentinel_name}"
112 loglevel = {log_level}
116 SECONDARY_CONFIG_TEMPLATE =
'''
119 "secondaries_wait_port": {port},
120 "secondaries_wait_timeout": {timeout},
126 def add_secondary(self, secondary):
128 sec_cfg = json.load(config_file)
129 sec_cfg[
"IP"][
"secondaries"].append({
"addr":
"127.0.0.1:{}".format(secondary.port)})
131 json.dump(sec_cfg, config_file)
133 def update_wait_timeout(self, timeout):
135 sec_cfg = json.load(config_file)
136 sec_cfg[
"IP"][
"secondaries_wait_timeout"] = timeout
138 json.dump(sec_cfg, config_file)
140 def run(self, run_mode):
144 def get_info(self, retry=15):
146 for ii
in range(0, retry):
148 timeout=60, stdout=subprocess.PIPE, env=self.
_run_env)
149 if info_exe_res.returncode == 0
and \
150 str(info_exe_res.stdout).find(
'no details about installed nor pending images') != -1:
153 if info_exe_res
and info_exe_res.returncode == 0:
154 return str(info_exe_res.stdout)
156 logger.error(str(info_exe_res.stderr))
161 def is_ecu_registered(self, ecu_id):
163 if not ((device_status.find(ecu_id[0]) != -1)
and (device_status.find(ecu_id[1]) != -1)):
165 not_registered_field =
"Removed or not registered ecus:"
166 not_reg_start = device_status.find(not_registered_field)
167 return not_reg_start == -1
or (device_status.find(ecu_id[1], not_reg_start) == -1)
169 def get_current_image_info(self, ecu_id):
170 if self.
id == ecu_id:
175 def get_current_pending_image_info(self, ecu_id):
182 def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash:
'):
185 ecu_serial = ecu_id[1]
186 ecu_info_position = aktualizr_status.find(ecu_serial)
187 if ecu_info_position == -1:
190 start = aktualizr_status.find(secondary_image_hash_field, ecu_info_position)
191 end = aktualizr_status.find(
'\\n', start)
192 hash_val = aktualizr_status[start + len(secondary_image_hash_field):end]
202 def get_current_primary_image_info(self):
203 primary_hash_field =
'Current primary ecu running version: '
206 start = aktualizr_status.find(primary_hash_field)
207 end = aktualizr_status.find(
'\\n', start)
208 return aktualizr_status[start + len(primary_hash_field):end]
210 logger.error(
"Failed to get aktualizr info/status")
215 def get_primary_pending_version(self):
216 primary_hash_field =
'Pending primary ecu version: '
218 start = aktualizr_status.find(primary_hash_field)
219 end = aktualizr_status.find(
'\\n', start)
220 return aktualizr_status[start + len(primary_hash_field):end]
225 stderr=
None if self.
_output_logs else subprocess.STDOUT,
228 logger.debug(
"Aktualizr has been started")
231 def __exit__(self, exc_type, exc_val, exc_tb):
234 logger.debug(
"Aktualizr has been stopped")
236 def terminate(self, sig=signal.SIGTERM):
240 return self.
_process.stdout.read().decode(errors=
'replace')
242 def wait_for_completion(self, timeout=120):
245 def wait_for_provision(self, timeout=60):
246 deadline = time.time() + timeout
247 while timeout == 0
or time.time() < deadline:
249 if info
is not None and 'Provisioned on server: yes' in info:
254 def emulate_reboot(self):
262 def copy_keys(dest_path):
264 shutil.copy(KeyStore.ca(), dest_path)
265 shutil.copy(KeyStore.pkey(), dest_path)
266 shutil.copy(KeyStore.cert(), dest_path)
270 return path.join(KeyStore.base_dir,
'tests/test_data/prov_testupdate/ca.pem')
274 return path.join(KeyStore.base_dir,
'tests/test_data/prov_testupdate/pkey.pem')
278 return path.join(KeyStore.base_dir,
'tests/test_data/prov_testupdate/client.pem')
283 def __init__(self, aktualizr_secondary_exe, id, port=9050, primary_port=9040,
284 sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
295 with open(path.join(self.
storage_dir.name,
'config.toml'),
'w+')
as config_file:
296 config_file.write(IPSecondary.CONFIG_TEMPLATE.format(serial=id[1], hw_ID=id[0],
299 db_path=path.join(self.
storage_dir.name,
'db.sql'),
300 pacman_type=
'ostree' if treehub
and sysroot
else 'fake',
301 ostree_sysroot=sysroot.path
if sysroot
else '',
302 treehub_server=treehub.base_url
if treehub
else '',
309 if sysroot
and ostree_mock_path:
310 self.
_run_env[
'LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
311 self.
_run_env[
'OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
314 CONFIG_TEMPLATE =
'''
316 ecu_serial = "{serial}"
317 ecu_hardware_id = "{hw_ID}"
321 primary_ip = "127.0.0.1"
322 primary_port = {primary_port}
326 path = "{storage_dir}"
327 sqldb_path = "{db_path}"
330 type = "{pacman_type}"
331 sysroot = "{ostree_sysroot}"
332 ostree_server = "{treehub_server}"
336 reboot_sentinel_dir = "{sentinel_dir}"
337 reboot_sentinel_name = "{sentinel_name}"
341 def is_running(self):
342 return True if self.
_process.poll()
is None else False
346 tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
348 port = tcp.getsockname()[1]
358 logger.debug(
"IP Secondary {} has been started: {}".format(self.
id, self.
port))
361 def __exit__(self, exc_type, exc_val, exc_tb):
364 logger.debug(
"IP Secondary {} has been stopped".format(self.
id))
366 def emulate_reboot(self):
371 def __init__(self, doc_root, ifc, port, client_handler_map={}):
372 super(UptaneRepo, self).__init__(server_address=(ifc, port), RequestHandlerClass=self.
Handler)
374 self.
base_url =
'http://{}:{}'.format(self.server_address[0], self.server_address[1])
379 lambda request: (self.
Handler.handler_map.get(
'POST', {})).get(request.path,
380 self.
Handler.default_handler)(request)
383 lambda request: (self.
Handler.handler_map.get(
'PUT', {})).get(request.path,
384 self.
Handler.default_handler)(request)
387 lambda request: (self.
Handler.handler_map.get(
'GET', {})).get(request.path,
388 self.
Handler.default_get)(request)
390 for method, method_handlers
in client_handler_map.items():
391 for url, handler
in method_handlers.items():
392 if self.
Handler.handler_map.get(method,
None)
is None:
393 self.
Handler.handler_map[method] = {}
394 self.
Handler.handler_map[method][url] = handler
397 def __init__(self, request, client_address, server):
402 def default_handler(self):
403 self.send_response(200)
406 def default_get(self):
408 self.send_response(404)
411 self.send_response(200)
413 with open(self.
file_path,
'rb')
as source:
414 self.copyfile(source, self.wfile)
420 return os.path.join(self.
doc_root, self.path[1:])
423 self._server_thread = threading.Thread(target=self.serve_forever)
424 self._server_thread.start()
430 if self._server_thread:
431 self._server_thread.join(timeout=60)
432 self._server_thread =
None
437 def __exit__(self, exc_type, exc_val, exc_tb):
444 - serves signed metadata about images
445 - receives device manifest which includes installation report if any installation has happened
448 director_subdir =
"repo/director"
450 def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
451 super(DirectorRepo, self).__init__(os.path.join(uptane_repo_root, self.
director_subdir), ifc=ifc, port=port,
452 client_handler_map=client_handler_map)
460 def handle_manifest(self):
461 self.send_response(200)
465 data_size = int(self.headers[
'Content-Length'])
466 data_string = self.rfile.read(data_size)
467 json_data = json.loads(data_string)
468 except Exception
as exc:
472 install_report = json_data[
'signed'].get(
'installation_report',
"")
474 self.server.set_install_event(json_data)
476 handler_map = {
'PUT': {
'/manifest': handle_manifest}}
478 def set_install_event(self, manifest):
479 with self._installed_condition:
480 self._manifest = manifest
481 self._last_install_res = manifest[
'signed'][
'installation_report'][
'report'][
'result'][
'success']
482 self._installed_condition.notifyAll()
484 def wait_for_install(self, timeout=180):
485 with self._installed_condition:
486 self._installed_condition.wait(timeout=timeout)
487 return self._last_install_res
489 def get_install_result(self):
490 with self._installed_condition:
491 return self._last_install_res
493 def get_manifest(self):
494 with self._installed_condition:
495 return self._manifest
497 def get_ecu_manifest(self, ecu_serial):
498 return self.get_manifest()[
'signed'][
'ecu_version_manifests'][ecu_serial]
500 def get_ecu_manifest_filepath(self, ecu_serial):
501 return self.get_ecu_manifest(ecu_serial)[
'signed'][
'installed_image'][
'filepath']
505 This server serves signed metadata about images
506 as well as images by default (it's possible to serve images from another server by using the 'custom URI' feature)
509 image_subdir =
"repo/repo"
511 def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
512 super(ImageRepo, self).__init__(os.path.join(uptane_repo_root, self.
image_subdir), ifc=ifc, port=port,
513 client_handler_map=client_handler_map)
518 This server serves images
520 image_subdir =
"repo/repo"
522 def __init__(self, root, ifc, port, client_handler_map={}):
523 super(CustomRepo, self).__init__(os.path.join(root, self.
image_subdir),
524 ifc=ifc, port=port, client_handler_map=client_handler_map)
529 This server serves requests from an ostree client, i.e. emulates/mocks the treehub server
531 def __init__(self, ifc, port, client_handler_map={}):
532 self.
root = tempfile.mkdtemp()
533 super(Treehub, self).__init__(self.
root, ifc=ifc, port=port, client_handler_map=client_handler_map)
537 return super(Treehub, self).__enter__()
539 def __exit__(self, exc_type, exc_val, exc_tb):
540 super(Treehub, self).__exit__(exc_type, exc_val, exc_tb)
541 shutil.rmtree(self.
root, ignore_errors=
True)
544 def default_get(self):
546 self.send_response(404)
554 def __init__(self, number_of_failures=1, bytes_to_send_before_interruption=10, url=''):
560 def __call__(self, request_handler):
562 request_handler.send_response(200)
563 file_size = os.path.getsize(request_handler.file_path)
564 request_handler.send_header(
'Content-Length', file_size)
565 request_handler.end_headers()
567 with open(request_handler.file_path,
'rb')
as source:
569 request_handler.wfile.write(data)
573 request_handler.default_get()
575 def map(self, url=''):
581 dummy_filez = (b
'\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06' +
582 b
'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xa4\x00\x00\x00\x00' +
583 b
'\x00\x19\x33\x34\x32\x36\x31\xe5\x02\x00')
585 def __init__(self, number_of_failures=1, url='', fake_filez=False):
591 def __call__(self, request_handler):
593 request_handler.send_response(200)
594 request_handler.end_headers()
598 request_handler.wfile.write(b
'malformed image')
602 request_handler.default_get()
610 def __init__(self, number_of_failures=1, url=''):
615 def __call__(self, request_handler):
617 request_handler.send_response(200)
618 file_size = os.path.getsize(request_handler.file_path)
619 request_handler.end_headers()
621 with open(request_handler.file_path,
'rb')
as source:
623 data = source.read(1)
626 request_handler.wfile.write(data)
627 request_handler.wfile.flush()
633 request_handler.default_get()
640 def __init__(self, number_of_redirects=1, url=''):
645 def __call__(self, request_handler):
647 request_handler.send_response(301)
648 request_handler.send_header(
'Location', request_handler.server.base_url + request_handler.path)
649 request_handler.end_headers()
652 request_handler.default_get()
659 def __init__(self, number_of_failures=1):
663 def __call__(self, request_handler):
665 request_handler.send_response(200)
666 request_handler.end_headers()
667 request_handler.wfile.write(b
'{"non-uptane-json": "some-value"}')
671 request_handler.default_get()
678 def __init__(self, repo_manager_exe, server_port=0, director_port=0, image_repo_port=0, custom_repo_port=0):
690 def create_generic_server(self, **kwargs):
693 def create_director_repo(self, handler_map={}):
696 def create_image_repo(self, handler_map={}):
699 def create_custom_repo(self, handler_map={}):
707 def target_dir(self):
711 def target_file(self):
712 return path.join(self.
image_dir,
'targets.json')
714 def add_image(self, id, image_filename, target_name=None, image_size=1024, custom_url=''):
716 targetname = target_name
if target_name
else image_filename
718 with open(path.join(self.
image_dir, image_filename),
'wb')
as image_file:
719 image_file.write(urandom(image_size))
722 '--command',
'image',
'--filename', image_filename,
'--targetname', targetname,
'--hwid', id[0]]
725 image_creation_cmdline.append(
'--url')
726 image_creation_cmdline.append(custom_url)
728 subprocess.run(image_creation_cmdline, cwd=self.
image_dir, check=
True)
732 '--command',
'addtarget',
'--targetname', targetname,
733 '--hwid', id[0],
'--serial', id[1]], check=
True)
739 targets = json.load(target_file)
740 target_hash = targets[
"signed"][
"targets"][targetname][
"hashes"][
"sha256"]
744 def add_ostree_target(self, id, rev_hash, target_name=None):
746 target_name = rev_hash
if target_name
is None else "{}-{}".format(target_name, rev_hash)
748 '--command',
'image',
750 '--targetname', target_name,
751 '--targetsha256', rev_hash,
752 '--targetlength',
'0',
753 '--targetformat',
'OSTREE',
755 subprocess.run(image_creation_cmdline, check=
True)
758 '--command',
'addtarget',
760 '--targetname', target_name,
773 def __exit__(self, exc_type, exc_val, exc_tb):
774 shutil.rmtree(self.
root_dir, ignore_errors=
True)
776 def _generate_repo(self):
778 '--command',
'generate',
'--keytype',
'ED25519'], check=
True)
781 def with_aktualizr(start=True, output_logs=False, id=(
'primary-hw-ID-001', str(uuid4())), wait_timeout=60,
782 log_level=1, aktualizr_primary_exe=
'src/aktualizr_primary/aktualizr',
783 aktualizr_info_exe=
'src/aktualizr_info/aktualizr-info',
787 def wrapper(*args, ostree_mock_path=None, **kwargs):
788 aktualizr =
Aktualizr(aktualizr_primary_exe=aktualizr_primary_exe,
789 aktualizr_info_exe=aktualizr_info_exe, id=id,
790 wait_timeout=wait_timeout, log_level=log_level, output_logs=output_logs,
791 run_mode=run_mode, ostree_mock_path=ostree_mock_path, **kwargs)
794 result = test(*args, **kwargs, aktualizr=aktualizr)
796 result = test(*args, **kwargs, aktualizr=aktualizr)
804 def with_uptane_backend(start_generic_server=True, repo_manager_exe='src/uptane_generator/uptane-generator'):
807 def wrapper(*args, **kwargs):
808 repo_manager_exe_abs_path = path.abspath(repo_manager_exe)
810 if start_generic_server:
811 with repo.create_generic_server()
as uptane_server:
812 result = test(*args, **kwargs, uptane_repo=repo, uptane_server=uptane_server)
814 result = test(*args, **kwargs, uptane_repo=repo)
820 def with_director(start=True, handlers=[]):
823 def wrapper(*args, uptane_repo, **kwargs):
824 def func(handler_map={}):
825 director = uptane_repo.create_director_repo(handler_map=handler_map)
828 result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
830 result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
833 if handlers
and len(handlers) > 0:
834 for handler
in handlers:
835 result = func(handler.map(kwargs.get(
'test_path',
'')))
845 def with_imagerepo(start=True, handlers=[]):
848 def wrapper(*args, uptane_repo, **kwargs):
849 def func(handler_map={}):
850 image_repo = uptane_repo.create_image_repo(handler_map=handler_map)
853 result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
855 result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
858 if handlers
and len(handlers) > 0:
859 for handler
in handlers:
860 result = func(handler.map(kwargs.get(
'test_path',
'')))
870 def with_secondary(start=True, id=(
'secondary-hw-ID-001', str(uuid4())),
871 aktualizr_secondary_exe=
'src/aktualizr_secondary/aktualizr-secondary'):
874 def wrapper(*args, **kwargs):
875 secondary =
IPSecondary(aktualizr_secondary_exe=aktualizr_secondary_exe, id=id, **kwargs)
878 result = test(*args, **kwargs, secondary=secondary)
880 result = test(*args, **kwargs, secondary=secondary)
886 def with_path(paths):
889 def wrapper(*args, **kwargs):
890 for test_path
in paths:
891 result = test(*args, **kwargs, test_path=test_path)
900 def __init__(self, aktualizr, uptane_repo, images_to_install=[]):
903 for image
in images_to_install:
906 'filename': image[1],
907 'hash': uptane_repo.add_image(image[0], image[1], custom_url=image[2]
if len(image) > 2
else '')
910 def are_images_installed(self):
913 if not (image[
'hash'] == self.
aktualizr.get_current_image_info(image[
'ecu_id'])):
920 def with_install_manager(default_images=True):
923 def wrapper(*args, aktualizr, uptane_repo, secondary=None, images_to_install=[], **kwargs):
924 if default_images
and (
not images_to_install
or len(images_to_install) == 0):
925 images_to_install = [(aktualizr.id,
'primary-image.img')]
927 images_to_install.append((secondary.id,
'secondary-image.img'))
928 install_mngr =
InstallManager(aktualizr, uptane_repo, images_to_install)
929 result = test(*args, **kwargs, aktualizr=aktualizr, secondary=secondary,
930 uptane_repo=uptane_repo, install_mngr=install_mngr)
936 def with_images(images_to_install):
939 def wrapper(*args, **kwargs):
940 return test(*args, **kwargs, images_to_install=images_to_install)
945 def with_customrepo(start=True, handlers=[]):
948 def wrapper(*args, uptane_repo, **kwargs):
949 def func(handler_map={}):
950 custom_repo = uptane_repo.create_custom_repo(handler_map=handler_map)
953 result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
955 result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
958 if handlers
and len(handlers) > 0:
959 for handler
in handlers:
960 result = func(handler.map(kwargs.get(
'test_path',
'')))
971 repo_path =
'ostree_repo'
974 self.
_root = tempfile.mkdtemp()
978 subprocess.run([
'cp',
'-r', self.
repo_path, self.
_root], check=
True)
982 version_file.writelines([
'{}\n'.format(initial_revision),
984 '{}\n'.format(initial_revision),
989 def __exit__(self, exc_type, exc_val, exc_tb):
990 shutil.rmtree(self.
_root, ignore_errors=
True)
992 def get_revision(self):
993 rev_cmd_res = subprocess.run([
'ostree',
'rev-parse',
'--repo', self.
path +
'/ostree/repo',
'generate-remote:generated'],
994 timeout=60, check=
True, stdout=subprocess.PIPE)
996 return rev_cmd_res.stdout.decode(
'ascii').rstrip(
'\n')
998 def update_revision(self, rev):
1000 version_file.writelines([
'{}\n'.format(rev),
1003 '{}\n'.format(
'1')])
1006 def with_sysroot(ostree_mock_path='tests/libostree_mock.so'):
1007 def decorator(test):
1009 def wrapper(*args, **kwargs):
1011 return test(*args, **kwargs, sysroot=sysroot, ostree_mock_path=ostree_mock_path)
1016 def with_treehub(handlers=[], port=0):
1017 def decorator(test):
1019 def wrapper(*args, **kwargs):
1020 def func(handler_map={}):
1021 with Treehub(
'localhost', port=port, client_handler_map=handler_map)
as treehub:
1022 return test(*args, **kwargs, treehub=treehub)
1024 if handlers
and len(handlers) > 0:
1025 for handler
in handlers:
1026 result = func(handler.map(kwargs.get(
'test_path',
'')))