10 from pathlib
import Path
12 from prov_test_common
import run_subprocess, verify_provisioned
# Command-line interface: every option is a filesystem path with a default
# that is relative to the directory the test is started from.
parser = argparse.ArgumentParser(
    description='Run a local device credential provisioning test using an simulated HSM with aktualizr')

# (long flag, short flag, default value, help text) for each path option.
path_options = (
    ('--build-dir', '-b', '../build', 'build directory'),
    ('--src-dir', '-s', '../', 'source directory (parent of src/)'),
    ('--credentials', '-c', '.', 'path to credentials archive'),
    ('--pkcs11-module', '-p', '/usr/lib/softhsm/libsofthsm2.so', 'path to PKCS#11 library module'),
)
for long_flag, short_flag, default_value, help_text in path_options:
    parser.add_argument(long_flag, short_flag, type=Path,
                        default=Path(default_value), help=help_text)

args = parser.parse_args()

# All generated files (config, token, imported certs) live in a scratch
# directory that is removed again when the with-block exits.
with tempfile.TemporaryDirectory() as tmp_dir:
    scratch = Path(tmp_dir)
    retval = provision(scratch, args.build_dir, args.src_dir,
                       args.credentials, args.pkcs11_module)
35 server_url_path = "{tmp_dir}/gateway.url"
36 cert_source = "pkcs11"
37 pkey_source = "pkcs11"
40 module = "{pkcs11_module}"
43 tls_clientcert_id = "01"
54 base_path = "{tmp_dir}/import"
55 tls_cacert_path = "root.crt"
56 tls_clientcert_path = "client.pem"
57 tls_pkey_path = "pkey.pem"
def provision(tmp_dir, build_dir, src_dir, creds, pkcs11_module):
    """Run the HSM device-credential provisioning scenario and verify it.

    Steps, in order:
      1. Render CONFIG_TEMPLATE into ``tmp_dir/conf.d``.
      2. Export TOKEN_DIR, SOFTHSM2_CONF and CERTS_DIR and copy the SoftHSM
         configuration from the source tree into ``tmp_dir``.
      3. Run aktualizr once and check through aktualizr-info that the device
         is not provisioned yet.
      4. Run aktualizr-cert-provider, then the export_to_hsm.sh script.
      5. Check with pkcs11-tool and softhsm2-util that the token is
         initialized and that both tools report the same slot ID.
      6. Delete ``tmp_dir/sql.db``, run aktualizr again and hand over to
         verify_provisioned().

    Args:
        tmp_dir: pathlib.Path of a writable scratch directory.
        build_dir: pathlib.Path of the build tree containing the binaries.
        src_dir: pathlib.Path of the source tree (parent of ``src/``).
        creds: path to the credentials archive given to the cert provider.
        pkcs11_module: path to the PKCS#11 library module.

    Returns:
        1 as soon as any step fails (the captured tool output is printed
        first); otherwise whatever verify_provisioned(akt_info, conf_dir)
        returns.
    """
    conf_dir = tmp_dir / 'conf.d'
    os.mkdir(str(conf_dir))
    conf_prov = conf_dir / '20-device-cred-prov-hsm.toml'
    with conf_prov.open('w') as f:
        f.write(CONFIG_TEMPLATE.format(tmp_dir=tmp_dir, pkcs11_module=pkcs11_module))

    akt = build_dir / 'src/aktualizr_primary/aktualizr'
    akt_info = build_dir / 'src/aktualizr_info/aktualizr-info'
    akt_cp = build_dir / 'src/cert_provider/aktualizr-cert-provider'
    setup_hsm = src_dir / 'scripts/export_to_hsm.sh'
    hsm_conf = tmp_dir / 'softhsm2.conf'
    token_dir = tmp_dir / 'token'
    certs_dir = tmp_dir / 'import'

    # These are set in this process's environment.
    # NOTE(review): presumably read by softhsm2.conf / export_to_hsm.sh in
    # the child processes started below -- confirm against those files.
    os.environ['TOKEN_DIR'] = str(token_dir)
    os.environ['SOFTHSM2_CONF'] = str(hsm_conf)
    os.environ['CERTS_DIR'] = str(certs_dir)
    shutil.copyfile(str(src_dir / "tests/test_data/softhsm2.conf"), str(hsm_conf))

    # First run: no credentials are available yet.
    akt_input = [str(akt), '--config', str(conf_dir), '--loglevel', '0', '--run-mode', 'once']
    run_subprocess(akt_input)

    # Every "not provisioned" marker must be present; a missing one means
    # aktualizr failed or the device is already provisioned.
    stdout, stderr, retcode = run_subprocess([str(akt_info), '--config', str(conf_dir)])
    if (b'Couldn\'t load device ID' not in stdout or
            b'Couldn\'t load ECU serials' not in stdout or
            b'Provisioned on server: no' not in stdout or
            b'Fetched metadata: no' not in stdout):
        print('Error: aktualizr failure or device already provisioned: \n' + stderr.decode() + stdout.decode())
        return 1

    print('Device has not yet provisioned (as expected). Running aktualizr-cert-provider.')
    stdout, stderr, retcode = run_subprocess([str(akt_cp),
                                              '-c', str(creds), '-l', '/', '-r', '-s', '-u', '-g', str(conf_prov)])
    # Only report (and abort) when the tool actually failed.
    if retcode != 0:
        print('aktualizr-cert-provider failed (' + str(retcode) + '): ' +
              stderr.decode() + stdout.decode())
        return 1

    stdout, stderr, retcode = run_subprocess([str(setup_hsm)])
    if retcode != 0:
        print('setup_hsm.sh failed: ' + stdout.decode() + stderr.decode())
        return 1

    # Inspect the token with both tools.
    pkcs11_command = ['pkcs11-tool', '--module=' + str(pkcs11_module), '-O',
                      '--login', '--pin', '1234']
    softhsm2_command = ['softhsm2-util', '--show-slots']

    p11_out, p11_err, p11_ret = run_subprocess(pkcs11_command)
    hsm_out, hsm_err, hsm_ret = run_subprocess(softhsm2_command)

    # NOTE(review): the spacing inside the 'Initialized' / 'User PIN init.'
    # markers is reproduced as seen; confirm it matches softhsm2-util output.
    if not (p11_ret == 0 and hsm_ret == 0 and
            b'present token' in p11_err and
            b'X.509 cert' in p11_out and
            b'Initialized: yes' in hsm_out and
            b'User PIN init.: yes' in hsm_out):
        print('pkcs11-tool or softhsm2-tool failed: ' + p11_err.decode() +
              p11_out.decode() + hsm_err.decode() + hsm_out.decode())
        return 1

    # Extract the slot ID reported by each tool. Returning here keeps the
    # comparison below from calling .group() on None.
    p11_p = re.compile(r'Using slot [0-9] with a present token \((0x[0-9a-f]*)\)\s')
    p11_m = p11_p.search(p11_err.decode())
    if not p11_m or p11_m.lastindex <= 0:
        print('Slot number not found with pkcs11-tool: ' + p11_err.decode() + p11_out.decode())
        return 1

    hsm_p = re.compile(r'Description:\s*SoftHSM slot ID (0x[0-9a-f]*)\s')
    hsm_m = hsm_p.search(hsm_out.decode())
    if not hsm_m or hsm_m.lastindex <= 0:
        print('Slot number not found with softhsm2-tool: ' + hsm_err.decode() + hsm_out.decode())
        return 1

    if p11_m.group(1) != hsm_m.group(1):
        print('Slot number does not match: ' + p11_err.decode() + p11_out.decode() +
              hsm_err.decode() + hsm_out.decode())
        return 1

    # Remove the database left in tmp_dir before the second run.
    # NOTE(review): presumably so aktualizr starts from fresh storage and
    # picks up the HSM credentials -- confirm.
    (tmp_dir / 'sql.db').unlink()
    run_subprocess(akt_input)
    return verify_provisioned(akt_info, conf_dir)
150 if __name__ ==
'__main__':