Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
device_cred_prov_hsm_test.py
1 #!/usr/bin/env python3
2 
3 import argparse
4 import os
5 import re
6 import sys
7 import shutil
8 import tempfile
9 
10 from pathlib import Path
11 
12 from prov_test_common import run_subprocess, verify_provisioned
13 
14 
def main():
    """Parse the command line and run the provisioning test in a scratch directory.

    Returns:
        Whatever provision() returns; the caller uses it as the exit status.
    """
    cli = argparse.ArgumentParser(description='Run a local device credential provisioning test using an simulated HSM with aktualizr')

    # (long flag, short flag, default value, help text); every option is a Path.
    path_options = (
        ('--build-dir', '-b', Path('../build'), 'build directory'),
        ('--src-dir', '-s', Path('../'), 'source directory (parent of src/)'),
        ('--credentials', '-c', Path('.'), 'path to credentials archive'),
        ('--pkcs11-module', '-p', Path('/usr/lib/softhsm/libsofthsm2.so'), 'path to PKCS#11 library module'),
    )
    for long_flag, short_flag, fallback, help_text in path_options:
        cli.add_argument(long_flag, short_flag, type=Path, default=fallback, help=help_text)
    opts = cli.parse_args()

    # The scratch directory is removed when the with-block exits, including
    # when we return from inside it.
    with tempfile.TemporaryDirectory() as scratch:
        return provision(Path(scratch), opts.build_dir, opts.src_dir,
                         opts.credentials, opts.pkcs11_module)
28 
29 
# aktualizr configuration used by this test, as a str.format() template.
# Placeholders:
#   {tmp_dir}       - used for the gateway URL file, the storage path and the
#                     import base path.
#   {pkcs11_module} - used for the [p11] module setting.
# TLS certificate, TLS private key and Uptane key are all sourced from PKCS#11.
# The [p11] pass value ("1234") is the same PIN this script later passes to
# pkcs11-tool.
CONFIG_TEMPLATE = '''
[pacman]
type = "none"

[tls]
server_url_path = "{tmp_dir}/gateway.url"
cert_source = "pkcs11"
pkey_source = "pkcs11"

[p11]
module = "{pkcs11_module}"
pass = "1234"
uptane_key_id = "03"
tls_clientcert_id = "01"
tls_pkey_id = "02"

[uptane]
key_source = "pkcs11"

[storage]
type = "sqlite"
path = "{tmp_dir}"

[import]
base_path = "{tmp_dir}/import"
tls_cacert_path = "root.crt"
tls_clientcert_path = "client.pem"
tls_pkey_path = "pkey.pem"
'''
59 
60 
def provision(tmp_dir, build_dir, src_dir, creds, pkcs11_module):
    """Run the device-credential provisioning test against a simulated HSM.

    The test proceeds in stages and stops at the first failure:

    1. Write the aktualizr configuration into ``tmp_dir/conf.d``.
    2. Run aktualizr once and check with aktualizr-info that the device has
       NOT been provisioned yet.
    3. Run aktualizr-cert-provider, then ``scripts/export_to_hsm.sh``.
    4. Check with pkcs11-tool and softhsm2-util that the token is initialized
       and that both tools report the same slot ID.
    5. Delete the SQL storage, run aktualizr again and verify provisioning.

    Side effects: sets the ``TOKEN_DIR``, ``SOFTHSM2_CONF`` and ``CERTS_DIR``
    environment variables of the current process (presumably consumed by the
    HSM setup script and SoftHSM; they are inherited by the subprocesses).

    Args:
        tmp_dir: Path of an existing scratch directory; all generated files
            are placed below it.
        build_dir: Path of the build directory holding the aktualizr binaries.
        src_dir: Path of the source tree (provides ``scripts/`` and
            ``tests/test_data/``).
        creds: Path of the credentials archive given to
            aktualizr-cert-provider.
        pkcs11_module: Path of the PKCS#11 library module.

    Returns:
        The result of ``verify_provisioned`` when every stage succeeds;
        otherwise a non-zero value (aktualizr-cert-provider's own exit status
        if that tool failed, 1 for every other failure).
    """
    conf_dir = tmp_dir / 'conf.d'
    os.mkdir(str(conf_dir))
    conf_prov = conf_dir / '20-device-cred-prov-hsm.toml'
    with conf_prov.open('w') as f:
        f.write(CONFIG_TEMPLATE.format(tmp_dir=tmp_dir, pkcs11_module=pkcs11_module))
    akt = build_dir / 'src/aktualizr_primary/aktualizr'
    akt_info = build_dir / 'src/aktualizr_info/aktualizr-info'
    akt_cp = build_dir / 'src/cert_provider/aktualizr-cert-provider'
    setup_hsm = src_dir / 'scripts/export_to_hsm.sh'
    hsm_conf = tmp_dir / 'softhsm2.conf'
    token_dir = tmp_dir / 'token'
    certs_dir = tmp_dir / 'import'

    # Exported so that the child processes started below inherit them.
    os.environ['TOKEN_DIR'] = str(token_dir)
    os.environ['SOFTHSM2_CONF'] = str(hsm_conf)
    os.environ['CERTS_DIR'] = str(certs_dir)
    shutil.copyfile(str(src_dir / "tests/test_data/softhsm2.conf"), str(hsm_conf))

    akt_input = [str(akt), '--config', str(conf_dir), '--loglevel', '0', '--run-mode', 'once']
    # The result of this first run is deliberately ignored: the state is
    # inspected through aktualizr-info instead.
    run_subprocess(akt_input)
    # Verify that device has NOT yet provisioned.
    stdout, stderr, retcode = run_subprocess([str(akt_info), '--config', str(conf_dir)])
    if (b'Couldn\'t load device ID' not in stdout or
            b'Couldn\'t load ECU serials' not in stdout or
            b'Provisioned on server: no' not in stdout or
            b'Fetched metadata: no' not in stdout):
        print('Error: aktualizr failure or device already provisioned: \n' + stderr.decode() + stdout.decode())
        return 1

    # Unlike in meta-updater's oe-selftest, don't check if the HSM is already
    # initialized. If setup_hsm.sh was already run, it *should* be initialized.
    # If we run it from this script below, the script will reset where the
    # tokens are stored, and until that happens, we would be checking the wrong
    # directory.

    # Run aktualizr-cert-provider.
    print('Device has not yet provisioned (as expected). Running aktualizr-cert-provider.')
    stdout, stderr, retcode = run_subprocess([str(akt_cp),
                                              '-c', str(creds), '-l', '/', '-r', '-s', '-u', '-g', str(conf_prov)])
    # Any non-zero status is a failure; a negative status (process killed by a
    # signal) must not be mistaken for success.
    if retcode != 0:
        print('aktualizr-cert-provider failed (' + str(retcode) + '): ' +
              stderr.decode() + stdout.decode())
        return retcode
    stdout, stderr, retcode = run_subprocess([str(setup_hsm)])
    if retcode != 0:
        print('setup_hsm.sh failed: ' + stdout.decode() + stderr.decode())
        return 1

    # Verify that HSM is able to initialize.
    pkcs11_command = ['pkcs11-tool', '--module=' + str(pkcs11_module), '-O',
                      '--login', '--pin', '1234']
    softhsm2_command = ['softhsm2-util', '--show-slots']

    p11_out, p11_err, p11_ret = run_subprocess(pkcs11_command)
    hsm_out, hsm_err, hsm_ret = run_subprocess(softhsm2_command)

    if not (p11_ret == 0 and hsm_ret == 0 and
            b'present token' in p11_err and
            b'X.509 cert' in p11_out and
            b'Initialized: yes' in hsm_out and
            b'User PIN init.: yes' in hsm_out and
            hsm_err == b''):
        print('pkcs11-tool or softhsm2-tool failed: ' + p11_err.decode() +
              p11_out.decode() + hsm_err.decode() + hsm_out.decode())
        return 1

    # Check that pkcs11 output matches softhsm output.
    # The slot index may have more than one digit, hence [0-9]+.
    p11_p = re.compile(r'Using slot [0-9]+ with a present token \((0x[0-9a-f]*)\)\s')
    p11_m = p11_p.search(p11_err.decode())
    if p11_m is None:
        print('Slot number not found with pkcs11-tool: ' + p11_err.decode() + p11_out.decode())
        return 1
    hsm_p = re.compile(r'Description:\s*SoftHSM slot ID (0x[0-9a-f]*)\s')
    hsm_m = hsm_p.search(hsm_out.decode())
    if hsm_m is None:
        print('Slot number not found with softhsm2-tool: ' + hsm_err.decode() + hsm_out.decode())
        return 1
    if p11_m.group(1) != hsm_m.group(1):
        print('Slot number does not match: ' + p11_err.decode() + p11_out.decode() +
              hsm_err.decode() + hsm_out.decode())
        return 1

    # Remove the storage left by the first (unprovisioned) run so the second
    # run starts from a clean database.
    (tmp_dir / 'sql.db').unlink()
    run_subprocess(akt_input)
    return verify_provisioned(akt_info, conf_dir)
148 
149 
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())