|
1 | | -# Borrowed from https://github.com/cnabio/signy/blob/afba301697df456b363790dc16483408b626a8af/scripts/in-toto/keys.py |
2 | | -# TODO: |
3 | | -# * Make a storage/provider-agnostic (e.g., filesystem, HSM) key management API, like securesystemslib.storage. |
4 | | - |
5 | 1 | # Imports. |
6 | 2 |
|
7 | | -# 1st-party. |
8 | | -import os |
9 | | -import shutil |
10 | | - |
11 | 3 | # 2nd-party. |
12 | | -from typing import Any, Dict, List, Optional |
| 4 | + |
| 5 | +from abc import ABC, abstractmethod |
| 6 | +from enum import Enum, unique |
| 7 | +from typing import Any, List, Optional |
| 8 | + |
| 9 | +import logging |
| 10 | +import os |
13 | 11 |
|
14 | 12 | # 3rd-party. |
15 | 13 | from securesystemslib.interface import ( |
16 | | - generate_and_write_ed25519_keypair, |
17 | | - get_password, |
| 14 | + import_ecdsa_privatekey_from_file, |
18 | 15 | import_ed25519_privatekey_from_file, |
19 | | - import_ed25519_publickey_from_file, |
| 16 | + import_rsa_privatekey_from_file, |
| 17 | +) |
| 18 | +from securesystemslib.keys import ( |
| 19 | + create_signature, |
| 20 | + verify_signature, |
20 | 21 | ) |
21 | 22 |
|
22 | | -# Utility classes. |
| 23 | +# Generic classes. |
23 | 24 |
|
24 | | -class Threshold: |
| 25 | +@unique |
| 26 | +class Algorithm(Enum): |
| 27 | + ECDSA = import_ecdsa_privatekey_from_file |
| 28 | + ED25519 = import_ed25519_privatekey_from_file |
| 29 | + RSA = import_rsa_privatekey_from_file |
25 | 30 |
|
26 | | - def __init__(self, m: int = 1, n: int = 1): |
27 | | - assert m > 0, f'{m} <= 0' |
28 | | - assert n > 0, f'{n} <= 0' |
29 | | - assert m <= n, f'{m} > {n}' |
30 | | - self.m = m |
31 | | - self.n = n |
| 31 | +class Threshold: |
32 | 32 |
|
33 | | -class Keypath: |
| 33 | + def __init__(self, min_: int = 1, max_: int = 1): |
| 34 | + assert min_ > 0, f'{min_} <= 0' |
| 35 | + assert max_ > 0, f'{max_} <= 0' |
| 36 | + assert min_ <= max_, f'{min_} > {max_}' |
| 37 | + self.min = min_ |
| 38 | + self.max = max_ |
34 | 39 |
|
35 | | - def __init__(self, private: str, public: str): |
36 | | - assert os.path.isfile(private), private |
37 | | - assert os.path.isfile(public), public |
38 | | - self.private = private |
39 | | - self.public = public |
| 40 | +class Key(ABC): |
40 | 41 |
|
41 | | -class Key: |
| 42 | + @abstractmethod |
| 43 | + def __init__(self) -> None: |
| 44 | + raise NotImplementedError() |
42 | 45 |
|
43 | | - def __init__(self, path: str, obj: Any): |
44 | | - self.path = path |
45 | | - self.obj = obj |
| 46 | + @property |
| 47 | + @abstractmethod |
| 48 | + def keyid(self) -> str: |
| 49 | + raise NotImplementedError() |
46 | 50 |
|
47 | | -class Keypair: |
| 51 | + @abstractmethod |
| 52 | + def sign(self, signed: str) -> str: |
| 53 | + raise NotImplementedError() |
48 | 54 |
|
49 | | - def __init__(self, private: Key, public: Key): |
50 | | - self.private = private |
51 | | - self.public = public |
| 55 | + @abstractmethod |
| 56 | + def verify(self, signed: str, signature: str) -> bool: |
| 57 | + raise NotImplementedError() |
52 | 58 |
|
53 | | -Keypairs = List[Keypair] |
| 59 | +Keys = List[Key] |
54 | 60 |
|
55 | | -class Keyring: |
| 61 | +class KeyRing: |
56 | 62 |
|
57 | | - def __init__(self, threshold: Threshold, keypairs: Keypairs): |
58 | | - if len(keypairs) >= threshold.m: |
59 | | - logging.warning(f'{len(keypairs)} >= {threshold.m}') |
60 | | - if len(keypairs) <= threshold.n: |
61 | | - logging.warning(f'{len(keypairs)} <= {threshold.n}') |
| 63 | + def __init__(self, threshold: Threshold, keys: Keys): |
| 64 | + if len(keys) >= threshold.min: |
| 65 | + logging.warning(f'{len(keys)} >= {threshold.min}') |
| 66 | + if len(keys) <= threshold.max: |
| 67 | + logging.warning(f'{len(keys)} <= {threshold.max}') |
62 | 68 | self.threshold = threshold |
63 | | - self.keypairs = keypairs |
| 69 | + self.keys = keys |
| 70 | + |
| 71 | +# Specific types of keys, such as those in RAM, or on HSMs (TODO). |
| 72 | + |
| 73 | +class RAMKey(Key): |
| 74 | + |
| 75 | + def __init__(self, obj: Any) -> None: # pylint: disable=super-init-not-called |
| 76 | + self.__obj = obj |
| 77 | + |
| 78 | + def keyid(self) -> str: |
| 79 | + return self.__obj['keyid'] |
| 80 | + |
| 81 | + def sign(self, signed: str) -> str: |
| 82 | + return create_signature(self.__obj, signed) |
| 83 | + |
| 84 | + def verify(self, signed: str, signature: str) -> bool: |
| 85 | + return verify_signature(self.__obj, signature, signed) |
64 | 86 |
|
65 | | -# Useful for securesytemslib. |
66 | | -KeyDict = Dict[str, Any] |
67 | 87 |
|
68 | 88 | # Utility functions. |
69 | 89 |
|
70 | | -def get_new_private_keypath(keystore_dir: str, rolename: str, i : int = 1) -> str: |
71 | | - return os.path.join(keystore_dir, f'{rolename}_ed25519_key_{i}') |
72 | | - |
73 | | -def get_public_keypath(private_keypath: str) -> str: |
74 | | - # this is the tuf filename convention at the time of writing. |
75 | | - return f'{private_keypath}.pub' |
76 | | - |
77 | | -def get_private_keys_from_keyring(keyring: Keyring) -> KeyDict: |
78 | | - privkeys = {} |
79 | | - |
80 | | - for keypair in keyring.keypairs: |
81 | | - privkey = keypair.private.obj |
82 | | - keyid = privkey['keyid'] |
83 | | - assert keyid not in privkeys |
84 | | - privkeys[keyid] = privkey |
85 | | - |
86 | | - return privkeys |
87 | | - |
88 | | -def get_public_keys_from_keyring(keyring: Keyring) -> KeyDict: |
89 | | - pubkeys = {} |
90 | | - |
91 | | - for keypair in keyring.keypairs: |
92 | | - pubkey = keypair.public.obj |
93 | | - keyid = pubkey['keyid'] |
94 | | - assert keyid not in pubkeys |
95 | | - pubkeys[keyid] = pubkey |
96 | | - |
97 | | - return pubkeys |
98 | | - |
99 | | -def write_keypair(keystore_dir: str, rolename: str, i: int = 1, n: int = 1, passphrase: Optional[str] = None) -> Keypath: |
100 | | - private_keypath = get_new_private_keypath(keystore_dir, rolename, i) |
101 | | - assert not os.path.isfile(private_keypath) |
102 | | - public_keypath = get_public_keypath(private_keypath) |
103 | | - assert not os.path.isfile(public_keypath) |
104 | | - |
105 | | - # Make the keystore directory, WR-only by self, if not already there. |
106 | | - os.makedirs(keystore_dir, mode=0o700, exist_ok=True) |
107 | | - |
108 | | - # FIXME: do not assume Ed25519 |
109 | | - generate_and_write_ed25519_keypair(private_keypath, password=passphrase) |
110 | | - |
111 | | - return Keypath(private_keypath, public_keypath) |
112 | | - |
113 | | -def read_keypair(keypath: Keypath, passphrase: Optional[str] = None) -> Keypair: |
114 | | - private_keypath = keypath.private |
115 | | - private_key_obj = import_ed25519_privatekey_from_file(keypath.private, password=passphrase) |
116 | | - private_key = Key(private_keypath, private_key_obj) |
117 | | - |
118 | | - # and its corresponding public key. |
119 | | - public_keypath = keypath.public |
120 | | - public_key_obj = import_ed25519_publickey_from_file(keypath.public) |
121 | | - public_key = Key(public_keypath, public_key_obj) |
122 | | - |
123 | | - return Keypair(private_key, public_key) |
124 | | - |
125 | | -def rename_keys_to_match_keyid(keystore_dir: str, keypair: Keypair) -> None: |
126 | | - ''' |
127 | | - <Purpose> |
128 | | - Rename public / private keys to match their keyid, so that it is easy |
129 | | - to later find public keys on the repository, or private keys on disk. |
130 | | - Also see https://github.com/theupdateframework/tuf/issues/573 |
131 | | - ''' |
132 | | - |
133 | | - keyid = keypair.public.obj['keyid'] |
134 | | - |
135 | | - # Rename the private key filename to match the keyid. |
136 | | - assert os.path.exists(keystore_dir), keystore_dir |
137 | | - new_private_keypath = os.path.join(keystore_dir, keyid) |
138 | | - # Move the key to the new filename. |
139 | | - assert not os.path.isfile(new_private_keypath), new_private_keypath |
140 | | - shutil.move(keypair.private.path, new_private_keypath) |
141 | | - # Update the path to the key. |
142 | | - keypair.private.path = new_private_keypath |
143 | | - |
144 | | - # Rename the public key filename to match the keyid. |
145 | | - new_public_keypath = get_public_keypath(new_private_keypath) |
146 | | - # Move the key to the new filename. |
147 | | - assert not os.path.isfile(new_public_keypath), new_public_keypath |
148 | | - shutil.move(keypair.public.path, new_public_keypath) |
149 | | - # Update the path to the key. |
150 | | - keypair.public.path = new_public_keypath |
151 | | - |
152 | | -def write_and_read_new_keys(keystore_dir: str, rolename: str, threshold: Threshold) -> Keyring: |
153 | | - keypairs = [] |
154 | | - |
155 | | - for i in range(1, threshold.n + 1): |
156 | | - print(f'Writing key {i}/{threshold.n} for the "{rolename}" rolename...') |
157 | | - passphrase = get_password( |
158 | | - prompt='Please enter a NON-EMPTY passphrase to ENCRYPT this key: ', |
159 | | - confirm=True |
160 | | - ) |
161 | | - keypath = write_keypair(keystore_dir, rolename, i, threshold.n, passphrase) |
162 | | - keypair = read_keypair(keypath, passphrase) |
163 | | - # Rename the private and public keys to match the keyid instead. |
164 | | - # Why? So that we know how to find keys later on repository / disk. |
165 | | - rename_keys_to_match_keyid(keystore_dir, keypair) |
166 | | - keypairs.append(keypair) |
167 | | - print() |
168 | | - |
169 | | - return Keyring(threshold, tuple(keypairs)) |
| 90 | +def read_key(filename: str, algorithm: str, passphrase: Optional[str] = None) -> Key: |
| 91 | + handler = Algorithm[algorithm] |
| 92 | + obj = handler(filename, password=passphrase) |
| 93 | + return RAMKey(obj) |
0 commit comments