Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
test_customrepo_failure.py
1 #!/usr/bin/env python3
2 
3 import logging
4 import argparse
5 
6 from os import getcwd, chdir
7 
8 from test_fixtures import with_aktualizr, with_uptane_backend, KeyStore, with_secondary, with_path,\
9  DownloadInterruptionHandler, MalformedJsonHandler, with_director, with_imagerepo, InstallManager,\
10  with_install_manager, with_images, MalformedImageHandler, with_customrepo, SlowRetrievalHandler, \
11  RedirectHandler, with_sysroot, with_treehub, TestRunner
12 
13 
# Module-level logger. Named after the module (``__name__``) rather than the
# script path: dots inside a logger name are hierarchy separators, so a
# path-based name such as ".../test_customrepo_failure.py" would create
# unintended parent/child loggers.
logger = logging.getLogger(__name__)
15 
16 
@with_uptane_backend(start_generic_server=True)
@with_customrepo(handlers=[
    DownloadInterruptionHandler(number_of_failures=1, url='/primary-image.img'),
    MalformedImageHandler(number_of_failures=1, url='/primary-image.img')
    # TODO: this test fails with the slow-retrieval handler too, although
    #       httpclient.cc sets the curl low-speed options.
    #       NOTE(review): the original note named CURLOPT_LOW_SPEED_TIME twice;
    #       presumably CURLOPT_LOW_SPEED_LIMIT and CURLOPT_LOW_SPEED_TIME were
    #       meant - confirm against httpclient.cc.
    #       https://saeljira.it.here.com/browse/OTA-3737
    #SlowRetrievalHandler(url='/primary-image.img')
])
@with_imagerepo()
@with_director(start=False)
@with_aktualizr(start=False, run_mode='full')
def test_customrepo_update_after_image_download_failure(uptane_repo, custom_repo, director,
                                                        aktualizr, **kwargs):
    """
    Verifies whether aktualizr is still updatable after a malformed image has
    been downloaded from a custom image server, i.e. whether a follow-up
    download of the same image succeeds.

    The custom repo is configured with one download-interruption handler and
    one malformed-image handler (one failure each) for '/primary-image.img'.

    Returns the (falsy) install result if the installation did not succeed;
    otherwise whether the hash returned by add_image() equals the current
    image info reported for the aktualizr instance.
    """
    image_name = 'primary-image.img'
    expected_hash = uptane_repo.add_image(aktualizr.id, image_name,
                                          custom_url=custom_repo.base_url + '/' + image_name)

    # Both services were created with start=False; enter aktualizr first and
    # the director second, exactly as the nested form would.
    with aktualizr, director:
        installed = director.wait_for_install()

    if not installed:
        return installed
    return expected_hash == aktualizr.get_current_image_info(aktualizr.id)
43 
44 
@with_uptane_backend(start_generic_server=True)
# TODO: Limit a number of HTTP redirects within a single request processing
# https://saeljira.it.here.com/browse/OTA-3729
@with_customrepo(handlers=[
    RedirectHandler(number_of_redirects=10, url='/primary-image.img')
])
@with_imagerepo()
@with_director()
@with_aktualizr(run_mode='once', output_logs=True)
def test_customrepo_update_redirect(aktualizr, uptane_repo,
                                    custom_repo, director, **kwargs):
    """
    Verifies that aktualizr supports redirects: the update is expected to
    succeed after the image request has been redirected 10 times.

    Open question: should aktualizr support an unlimited number of redirects?

    Returns the (falsy) install result if the installation did not succeed;
    otherwise whether the hash returned by add_image() equals the current
    image info reported for the aktualizr instance.
    """
    image_name = 'primary-image.img'
    expected_hash = uptane_repo.add_image(aktualizr.id, image_name,
                                          custom_url=custom_repo.base_url + '/' + image_name)

    installed = director.wait_for_install()
    if not installed:
        return installed
    return expected_hash == aktualizr.get_current_image_info(aktualizr.id)
64 
@with_uptane_backend(start_generic_server=True)
@with_customrepo(handlers=[
    # NOTE(review): 11 * 3 + 1 presumably means "more redirects than three
    # attempts of 11 hops each could follow" - confirm against the client's
    # redirect/retry limits.
    RedirectHandler(number_of_redirects=(11 * 3 + 1), url='/primary-image.img')
])
@with_imagerepo()
@with_director()
@with_aktualizr(start=False, run_mode='once', output_logs=True)
def test_customrepo_unsuccessful_update_redirect(aktualizr, uptane_repo,
                                                 custom_repo, director, **kwargs):
    """
    Verifies that aktualizr rejects an image request that is redirected more
    than 10 times: the update is expected to fail.

    Returns True when the director reports no successful installation after
    aktualizr has completed its single ('once') run.
    """
    # The returned hash is not needed here: the image is registered only so
    # that there is an update to attempt; the test checks that it is NOT
    # installed.
    uptane_repo.add_image(aktualizr.id, 'primary-image.img',
                          custom_url=custom_repo.base_url + '/' + 'primary-image.img')

    # aktualizr was created with start=False, so it runs only inside this block.
    with aktualizr:
        aktualizr.wait_for_completion()

    return not director.get_install_result()
83 
84 
if __name__ == "__main__":
    # Local import: sys is only needed for the script entry point.
    import sys

    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Test backend failure')
    parser.add_argument('-b', '--build-dir', help='build directory', default='build')
    parser.add_argument('-s', '--src-dir', help='source directory', default='.')
    input_params = parser.parse_args()

    # The key store location is set before the working directory changes.
    KeyStore.base_dir = input_params.src_dir
    initial_cwd = getcwd()
    chdir(input_params.build_dir)

    test_suite = [
        test_customrepo_update_after_image_download_failure,
        test_customrepo_update_redirect,
        test_customrepo_unsuccessful_update_redirect,
    ]

    try:
        test_suite_run_result = TestRunner(test_suite).run()
    finally:
        # Restore the caller's working directory even if the runner raises.
        chdir(initial_cwd)

    # sys.exit() instead of the site-provided exit() helper, which is meant
    # for the interactive shell and is unavailable under `python -S`.
    sys.exit(0 if test_suite_run_result else 1)