12 from os
import devnull
14 from uuid
import uuid4
15 from os
import urandom
16 from functools
import wraps
17 from http.server
import SimpleHTTPRequestHandler, HTTPServer
19 from fake_http_server.fake_test_server
import FakeTestServerBackground
20 from sota_tools.treehub_server
import create_repo
23 logger = logging.getLogger(__name__)
28 def __init__(self, aktualizr_primary_exe, aktualizr_info_exe, id,
29 uptane_server, wait_port=9040, wait_timeout=60, log_level=1,
30 secondary=None, output_logs=True,
31 run_mode='once', director=None, image_repo=None,
32 sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
43 with open(path.join(self.
_storage_dir.name,
'secondary_config.json'),
'w+')
as secondary_config_file:
44 secondary_cfg = json.loads(Aktualizr.SECONDARY_CONFIG_TEMPLATE.
45 format(port=secondary.primary_port
if secondary
else wait_port,
46 timeout=wait_timeout))
47 json.dump(secondary_cfg, secondary_config_file)
50 with open(path.join(self.
_storage_dir.name,
'config.toml'),
'w+')
as config_file:
51 config_file.write(Aktualizr.CONFIG_TEMPLATE.format(server_url=uptane_server.base_url,
53 serial=id[1], hw_ID=id[0],
58 director=director.base_url
if director
else '',
59 image_repo=image_repo.base_url
if image_repo
else '',
60 pacman_type=
'ostree' if treehub
and sysroot
else 'fake',
61 ostree_sysroot=sysroot.path
if sysroot
else '',
62 treehub_server=treehub.base_url
if treehub
else '',
71 if sysroot
and ostree_mock_path:
72 self.
_run_env[
'LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
73 self.
_run_env[
'OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
77 server = "{server_url}"
80 base_path = "{import_path}"
81 tls_cacert_path = "ca.pem"
82 tls_pkey_path = "pkey.pem"
83 tls_clientcert_path = "client.pem"
86 primary_ecu_serial = "{serial}"
87 primary_ecu_hardware_id = "{hw_ID}"
90 path = "{storage_dir}"
92 sqldb_path = "{db_path}"
95 type = "{pacman_type}"
96 sysroot = "{ostree_sysroot}"
97 ostree_server = "{treehub_server}"
102 secondary_config_file = "{secondary_cfg_file}"
103 director_server = "{director}"
104 repo_server = "{image_repo}"
107 reboot_sentinel_dir = "{sentinel_dir}"
108 reboot_sentinel_name = "{sentinel_name}"
112 loglevel = {log_level}
116 SECONDARY_CONFIG_TEMPLATE =
'''
119 "secondaries_wait_port": {port},
120 "secondaries_wait_timeout": {timeout},
126 def add_secondary(self, secondary):
128 sec_cfg = json.load(config_file)
129 sec_cfg[
"IP"][
"secondaries"].append({
"addr":
"127.0.0.1:{}".format(secondary.port)})
131 json.dump(sec_cfg, config_file)
133 def update_wait_timeout(self, timeout):
135 sec_cfg = json.load(config_file)
136 sec_cfg[
"IP"][
"secondaries_wait_timeout"] = timeout
138 json.dump(sec_cfg, config_file)
140 def run(self, run_mode):
144 def get_info(self, retry=15):
146 for ii
in range(0, retry):
148 timeout=60, stdout=subprocess.PIPE, env=self.
_run_env)
149 if info_exe_res.returncode == 0
and \
150 str(info_exe_res.stdout).find(
'no details about installed nor pending images') != -1:
153 if info_exe_res
and info_exe_res.returncode == 0:
154 return str(info_exe_res.stdout)
156 logger.error(str(info_exe_res.stderr))
161 def is_ecu_registered(self, ecu_id):
163 if not ((device_status.find(ecu_id[0]) != -1)
and (device_status.find(ecu_id[1]) != -1)):
165 not_registered_field =
"Removed or not registered ecus:"
166 not_reg_start = device_status.find(not_registered_field)
167 return not_reg_start == -1
or (device_status.find(ecu_id[1], not_reg_start) == -1)
169 def get_current_image_info(self, ecu_id):
170 if self.
id == ecu_id:
175 def get_current_pending_image_info(self, ecu_id):
182 def _get_current_image_info(self, ecu_id, secondary_image_hash_field='installed image hash:
'):
185 ecu_serial = ecu_id[1]
186 ecu_info_position = aktualizr_status.find(ecu_serial)
187 if ecu_info_position == -1:
190 start = aktualizr_status.find(secondary_image_hash_field, ecu_info_position)
191 end = aktualizr_status.find(
'\\n', start)
192 hash_val = aktualizr_status[start + len(secondary_image_hash_field):end]
202 def get_current_primary_image_info(self):
203 primary_hash_field =
'Current primary ecu running version: '
206 start = aktualizr_status.find(primary_hash_field)
207 end = aktualizr_status.find(
'\\n', start)
208 return aktualizr_status[start + len(primary_hash_field):end]
210 logger.error(
"Failed to get aktualizr info/status")
215 def get_primary_pending_version(self):
216 primary_hash_field =
'Pending primary ecu version: '
218 start = aktualizr_status.find(primary_hash_field)
219 end = aktualizr_status.find(
'\\n', start)
220 return aktualizr_status[start + len(primary_hash_field):end]
225 stderr=
None if self.
_output_logs else subprocess.STDOUT,
228 logger.debug(
"Aktualizr has been started")
231 def __exit__(self, exc_type, exc_val, exc_tb):
234 logger.debug(
"Aktualizr has been stopped")
236 def terminate(self, sig=signal.SIGTERM):
240 return self.
_process.stdout.read().decode(errors=
'replace')
242 def wait_for_completion(self, timeout=120):
245 def wait_for_provision(self, timeout=60):
246 deadline = time.time() + timeout
247 while timeout == 0
or time.time() < deadline:
249 if info
is not None and 'Provisioned on server: yes' in info:
254 def emulate_reboot(self):
262 def copy_keys(dest_path):
264 shutil.copy(KeyStore.ca(), dest_path)
265 shutil.copy(KeyStore.pkey(), dest_path)
266 shutil.copy(KeyStore.cert(), dest_path)
270 return path.join(KeyStore.base_dir,
'tests/test_data/prov_testupdate/ca.pem')
274 return path.join(KeyStore.base_dir,
'tests/test_data/prov_testupdate/pkey.pem')
278 return path.join(KeyStore.base_dir,
'tests/test_data/prov_testupdate/client.pem')
283 def __init__(self, aktualizr_secondary_exe, id, port=9050, primary_port=9040,
284 sysroot=None, treehub=None, ostree_mock_path=None, **kwargs):
295 with open(path.join(self.
_storage_dir.name,
'config.toml'),
'w+')
as config_file:
296 config_file.write(IPSecondary.CONFIG_TEMPLATE.format(serial=id[1], hw_ID=id[0],
300 pacman_type=
'ostree' if treehub
and sysroot
else 'fake',
301 ostree_sysroot=sysroot.path
if sysroot
else '',
302 treehub_server=treehub.base_url
if treehub
else '',
309 if sysroot
and ostree_mock_path:
310 self.
_run_env[
'LD_PRELOAD'] = os.path.abspath(ostree_mock_path)
311 self.
_run_env[
'OSTREE_DEPLOYMENT_VERSION_FILE'] = sysroot.version_file
314 CONFIG_TEMPLATE =
'''
316 ecu_serial = "{serial}"
317 ecu_hardware_id = "{hw_ID}"
321 primary_ip = "127.0.0.1"
322 primary_port = {primary_port}
326 path = "{storage_dir}"
327 sqldb_path = "{db_path}"
330 type = "{pacman_type}"
331 sysroot = "{ostree_sysroot}"
332 ostree_server = "{treehub_server}"
336 reboot_sentinel_dir = "{sentinel_dir}"
337 reboot_sentinel_name = "{sentinel_name}"
341 def is_running(self):
342 return True if self.
_process.poll()
is None else False
346 tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
348 port = tcp.getsockname()[1]
358 logger.debug(
"IP Secondary {} has been started: {}".format(self.
id, self.
port))
361 def __exit__(self, exc_type, exc_val, exc_tb):
364 logger.debug(
"IP Secondary {} has been stopped".format(self.
id))
366 def emulate_reboot(self):
371 def __init__(self, doc_root, ifc, port, client_handler_map={}):
372 super(UptaneRepo, self).__init__(server_address=(ifc, port), RequestHandlerClass=self.
Handler)
374 self.
base_url =
'http://{}:{}'.format(self.server_address[0], self.server_address[1])
379 lambda request: (self.
Handler.handler_map.get(
'POST', {})).get(request.path,
380 self.
Handler.default_handler)(request)
383 lambda request: (self.
Handler.handler_map.get(
'PUT', {})).get(request.path,
384 self.
Handler.default_handler)(request)
387 lambda request: (self.
Handler.handler_map.get(
'GET', {})).get(request.path,
388 self.
Handler.default_get)(request)
390 for method, method_handlers
in client_handler_map.items():
391 for url, handler
in method_handlers.items():
392 if self.
Handler.handler_map.get(method,
None)
is None:
393 self.
Handler.handler_map[method] = {}
394 self.
Handler.handler_map[method][url] = handler
397 def __init__(self, request, client_address, server):
402 def default_handler(self):
403 self.send_response(200)
406 def default_get(self):
408 self.send_response(404)
411 self.send_response(200)
413 with open(self.
file_path,
'rb')
as source:
414 self.copyfile(source, self.wfile)
420 return os.path.join(self.
doc_root, self.path[1:])
423 self._server_thread = threading.Thread(target=self.serve_forever)
424 self._server_thread.start()
430 if self._server_thread:
431 self._server_thread.join(timeout=60)
432 self._server_thread =
None
437 def __exit__(self, exc_type, exc_val, exc_tb):
444 - serves signed metadata about images
445 - receives device manifest which includes installation report if any installation has happened
448 director_subdir =
"repo/director"
450 def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
451 super(DirectorRepo, self).__init__(os.path.join(uptane_repo_root, self.
director_subdir), ifc=ifc, port=port,
452 client_handler_map=client_handler_map)
459 def handle_manifest(self):
460 self.send_response(200)
464 data_size = int(self.headers[
'Content-Length'])
465 data_string = self.rfile.read(data_size)
466 json_data = json.loads(data_string)
467 except Exception
as exc:
471 install_report = json_data[
'signed'].get(
'installation_report',
"")
473 self.server.set_install_event(install_report[
'report'][
'result'][
'success'])
475 handler_map = {
'PUT': {
'/manifest': handle_manifest}}
477 def set_install_event(self, result):
478 with self._installed_condition:
479 self._last_install_res = result
480 self._installed_condition.notifyAll()
482 def wait_for_install(self, timeout=180):
483 with self._installed_condition:
484 self._installed_condition.wait(timeout=timeout)
485 return self._last_install_res
487 def get_install_result(self):
488 with self._installed_condition:
489 return self._last_install_res
494 This server serves signed metadata about images
495 as well as images by default (it's possible to serve images from another server by using the 'custom URI' feature)
498 image_subdir =
"repo/repo"
500 def __init__(self, uptane_repo_root, ifc, port, client_handler_map={}):
501 super(ImageRepo, self).__init__(os.path.join(uptane_repo_root, self.
image_subdir), ifc=ifc, port=port,
502 client_handler_map=client_handler_map)
507 This server serves images
509 image_subdir =
"repo/repo"
511 def __init__(self, root, ifc, port, client_handler_map={}):
512 super(CustomRepo, self).__init__(os.path.join(root, self.
image_subdir),
513 ifc=ifc, port=port, client_handler_map=client_handler_map)
518 This server serves requests from an ostree client, i.e. emulates/mocks the treehub server
520 def __init__(self, ifc, port, client_handler_map={}):
521 self.
root = tempfile.mkdtemp()
522 super(Treehub, self).__init__(self.
root, ifc=ifc, port=port, client_handler_map=client_handler_map)
526 return super(Treehub, self).__enter__()
528 def __exit__(self, exc_type, exc_val, exc_tb):
529 super(Treehub, self).__exit__(exc_type, exc_val, exc_tb)
530 shutil.rmtree(self.
root, ignore_errors=
True)
533 def default_get(self):
535 self.send_response(404)
543 def __init__(self, number_of_failures=1, bytes_to_send_before_interruption=10, url=''):
549 def __call__(self, request_handler):
551 request_handler.send_response(200)
552 file_size = os.path.getsize(request_handler.file_path)
553 request_handler.send_header(
'Content-Length', file_size)
554 request_handler.end_headers()
556 with open(request_handler.file_path,
'rb')
as source:
558 request_handler.wfile.write(data)
562 request_handler.default_get()
564 def map(self, url=''):
570 dummy_filez = (b
'\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06' +
571 b
'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\xa4\x00\x00\x00\x00' +
572 b
'\x00\x19\x33\x34\x32\x36\x31\xe5\x02\x00')
574 def __init__(self, number_of_failures=1, url='', fake_filez=False):
580 def __call__(self, request_handler):
582 request_handler.send_response(200)
583 request_handler.end_headers()
587 request_handler.wfile.write(b
'malformed image')
591 request_handler.default_get()
599 def __init__(self, number_of_failures=1, url=''):
604 def __call__(self, request_handler):
606 request_handler.send_response(200)
607 file_size = os.path.getsize(request_handler.file_path)
608 request_handler.end_headers()
610 with open(request_handler.file_path,
'rb')
as source:
612 data = source.read(1)
615 request_handler.wfile.write(data)
616 request_handler.wfile.flush()
622 request_handler.default_get()
629 def __init__(self, number_of_redirects=1, url=''):
634 def __call__(self, request_handler):
636 request_handler.send_response(301)
637 request_handler.send_header(
'Location', request_handler.server.base_url + request_handler.path)
638 request_handler.end_headers()
641 request_handler.default_get()
648 def __init__(self, number_of_failures=1):
652 def __call__(self, request_handler):
654 request_handler.send_response(200)
655 request_handler.end_headers()
656 request_handler.wfile.write(b
'{"non-uptane-json": "some-value"}')
660 request_handler.default_get()
667 def __init__(self, repo_manager_exe, server_port=0, director_port=0, image_repo_port=0, custom_repo_port=0):
679 def create_generic_server(self, **kwargs):
682 def create_director_repo(self, handler_map={}):
685 def create_image_repo(self, handler_map={}):
688 def create_custom_repo(self, handler_map={}):
696 def target_dir(self):
700 def target_file(self):
701 return path.join(self.
image_dir,
'targets.json')
703 def add_image(self, id, image_filename, target_name=None, image_size=1024, custom_url=''):
705 targetname = target_name
if target_name
else image_filename
707 with open(path.join(self.
image_dir, image_filename),
'wb')
as image_file:
708 image_file.write(urandom(image_size))
711 '--command',
'image',
'--filename', image_filename,
'--targetname', targetname,
'--hwid', id[0]]
714 image_creation_cmdline.append(
'--url')
715 image_creation_cmdline.append(custom_url)
717 subprocess.run(image_creation_cmdline, cwd=self.
image_dir, check=
True)
721 '--command',
'addtarget',
'--targetname', targetname,
722 '--hwid', id[0],
'--serial', id[1]], check=
True)
728 targets = json.load(target_file)
729 target_hash = targets[
"signed"][
"targets"][targetname][
"hashes"][
"sha256"]
733 def add_ostree_target(self, id, rev_hash):
735 '--command',
'image',
737 '--targetname', rev_hash,
738 '--targetsha256', rev_hash,
739 '--targetlength',
'0',
740 '--targetformat',
'OSTREE',
742 subprocess.run(image_creation_cmdline, check=
True)
745 '--command',
'addtarget',
747 '--targetname', rev_hash,
758 def __exit__(self, exc_type, exc_val, exc_tb):
759 shutil.rmtree(self.
root_dir, ignore_errors=
True)
761 def _generate_repo(self):
763 '--command',
'generate',
'--keytype',
'ED25519'], check=
True)
766 def with_aktualizr(start=True, output_logs=False, id=(
'primary-hw-ID-001', str(uuid4())), wait_timeout=60,
767 log_level=1, aktualizr_primary_exe=
'src/aktualizr_primary/aktualizr',
768 aktualizr_info_exe=
'src/aktualizr_info/aktualizr-info',
772 def wrapper(*args, ostree_mock_path=None, **kwargs):
773 aktualizr =
Aktualizr(aktualizr_primary_exe=aktualizr_primary_exe,
774 aktualizr_info_exe=aktualizr_info_exe, id=id,
775 wait_timeout=wait_timeout, log_level=log_level, output_logs=output_logs,
776 run_mode=run_mode, ostree_mock_path=ostree_mock_path, **kwargs)
779 result = test(*args, **kwargs, aktualizr=aktualizr)
781 result = test(*args, **kwargs, aktualizr=aktualizr)
789 def with_uptane_backend(start_generic_server=True, repo_manager_exe='src/uptane_generator/uptane-generator'):
792 def wrapper(*args, **kwargs):
793 repo_manager_exe_abs_path = path.abspath(repo_manager_exe)
795 if start_generic_server:
796 with repo.create_generic_server()
as uptane_server:
797 result = test(*args, **kwargs, uptane_repo=repo, uptane_server=uptane_server)
799 result = test(*args, **kwargs, uptane_repo=repo)
805 def with_director(start=True, handlers=[]):
808 def wrapper(*args, uptane_repo, **kwargs):
809 def func(handler_map={}):
810 director = uptane_repo.create_director_repo(handler_map=handler_map)
813 result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
815 result = test(*args, **kwargs, uptane_repo=uptane_repo, director=director)
818 if handlers
and len(handlers) > 0:
819 for handler
in handlers:
820 result = func(handler.map(kwargs.get(
'test_path',
'')))
830 def with_imagerepo(start=True, handlers=[]):
833 def wrapper(*args, uptane_repo, **kwargs):
834 def func(handler_map={}):
835 image_repo = uptane_repo.create_image_repo(handler_map=handler_map)
838 result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
840 result = test(*args, **kwargs, uptane_repo=uptane_repo, image_repo=image_repo)
843 if handlers
and len(handlers) > 0:
844 for handler
in handlers:
845 result = func(handler.map(kwargs.get(
'test_path',
'')))
855 def with_secondary(start=True, id=(
'secondary-hw-ID-001', str(uuid4())),
856 aktualizr_secondary_exe=
'src/aktualizr_secondary/aktualizr-secondary'):
859 def wrapper(*args, **kwargs):
860 secondary =
IPSecondary(aktualizr_secondary_exe=aktualizr_secondary_exe, id=id, **kwargs)
863 result = test(*args, **kwargs, secondary=secondary)
865 result = test(*args, **kwargs, secondary=secondary)
871 def with_path(paths):
874 def wrapper(*args, **kwargs):
875 for test_path
in paths:
876 result = test(*args, **kwargs, test_path=test_path)
885 def __init__(self, aktualizr, uptane_repo, images_to_install=[]):
888 for image
in images_to_install:
891 'filename': image[1],
892 'hash': uptane_repo.add_image(image[0], image[1], custom_url=image[2]
if len(image) > 2
else '')
895 def are_images_installed(self):
898 if not (image[
'hash'] == self.
aktualizr.get_current_image_info(image[
'ecu_id'])):
905 def with_install_manager(default_images=True):
908 def wrapper(*args, aktualizr, uptane_repo, secondary=None, images_to_install=[], **kwargs):
909 if default_images
and (
not images_to_install
or len(images_to_install) == 0):
910 images_to_install = [(aktualizr.id,
'primary-image.img')]
912 images_to_install.append((secondary.id,
'secondary-image.img'))
913 install_mngr =
InstallManager(aktualizr, uptane_repo, images_to_install)
914 result = test(*args, **kwargs, aktualizr=aktualizr, secondary=secondary,
915 uptane_repo=uptane_repo, install_mngr=install_mngr)
921 def with_images(images_to_install):
924 def wrapper(*args, **kwargs):
925 return test(*args, **kwargs, images_to_install=images_to_install)
930 def with_customrepo(start=True, handlers=[]):
933 def wrapper(*args, uptane_repo, **kwargs):
934 def func(handler_map={}):
935 custom_repo = uptane_repo.create_custom_repo(handler_map=handler_map)
938 result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
940 result = test(*args, **kwargs, uptane_repo=uptane_repo, custom_repo=custom_repo)
943 if handlers
and len(handlers) > 0:
944 for handler
in handlers:
945 result = func(handler.map(kwargs.get(
'test_path',
'')))
956 repo_path =
'ostree_repo'
959 self.
_root = tempfile.mkdtemp()
963 subprocess.run([
'cp',
'-r', self.
repo_path, self.
_root], check=
True)
967 version_file.writelines([
'{}\n'.format(initial_revision),
969 '{}\n'.format(initial_revision),
974 def __exit__(self, exc_type, exc_val, exc_tb):
975 shutil.rmtree(self.
_root, ignore_errors=
True)
977 def get_revision(self):
978 rev_cmd_res = subprocess.run([
'ostree',
'rev-parse',
'--repo', self.
path +
'/ostree/repo',
'generate-remote:generated'],
979 timeout=60, check=
True, stdout=subprocess.PIPE)
981 return rev_cmd_res.stdout.decode(
'ascii').rstrip(
'\n')
983 def update_revision(self, rev):
985 version_file.writelines([
'{}\n'.format(rev),
991 def with_sysroot(ostree_mock_path='tests/libostree_mock.so'):
994 def wrapper(*args, **kwargs):
996 return test(*args, **kwargs, sysroot=sysroot, ostree_mock_path=ostree_mock_path)
1001 def with_treehub(handlers=[], port=0):
1002 def decorator(test):
1004 def wrapper(*args, **kwargs):
1005 def func(handler_map={}):
1006 with Treehub(
'localhost', port=port, client_handler_map=handler_map)
as treehub:
1007 return test(*args, **kwargs, treehub=treehub)
1009 if handlers
and len(handlers) > 0:
1010 for handler
in handlers:
1011 result = func(handler.map(kwargs.get(
'test_path',
'')))