From 95ce03befa73fa3fbfc0d7ec4e59db3058908fb9 Mon Sep 17 00:00:00 2001 From: Renata Date: Tue, 17 Dec 2024 16:01:32 -0500 Subject: [PATCH] feat: initial work on storing names of keys in metadata files --- taf/api/repository.py | 4 +-- taf/api/roles.py | 4 +-- taf/keys.py | 74 +++++++++++++++++++++++++------------------ taf/tuf/repository.py | 4 ++- taf/yubikey.py | 15 +++++++-- 5 files changed, 64 insertions(+), 37 deletions(-) diff --git a/taf/api/repository.py b/taf/api/repository.py index 8524cfe8..15c7c3b5 100644 --- a/taf/api/repository.py +++ b/taf/api/repository.py @@ -74,7 +74,7 @@ def create_repository( roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData) auth_repo = AuthenticationRepository(path=path) - signers, verification_keys = load_sorted_keys_of_new_roles( + signers, verification_keys, key_name_mappings = load_sorted_keys_of_new_roles( roles=roles_keys_data.roles, yubikeys_data=roles_keys_data.yubikeys, keystore=keystore, @@ -85,7 +85,7 @@ def create_repository( return repository = AuthenticationRepository(path=path) - repository.create(roles_keys_data, signers, verification_keys) + repository.create(roles_keys_data, signers, verification_keys, key_name_mappings) if commit: auth_repo.init_repo() commit_msg = git_commit_message("create-repo") diff --git a/taf/api/roles.py b/taf/api/roles.py index a3681975..ad8d2274 100644 --- a/taf/api/roles.py +++ b/taf/api/roles.py @@ -128,7 +128,7 @@ def add_role( new_role.threshold = threshold new_role.yubikey = yubikey - signers, _ = load_sorted_keys_of_new_roles( + signers, _, key_name_mappings = load_sorted_keys_of_new_roles( roles=new_role, yubikeys_data=None, keystore=keystore_path, @@ -291,7 +291,7 @@ def add_multiple_roles( ): all_signers = {} for role_to_add_data in roles_to_add_data: - signers, _ = load_sorted_keys_of_new_roles( + signers, _, key_name_mappings = load_sorted_keys_of_new_roles( roles=role_to_add_data, yubikeys_data=None, keystore=keystore_path, diff --git a/taf/keys.py b/taf/keys.py index 71db110a..d6d86c8c 100644 --- a/taf/keys.py +++ b/taf/keys.py @@ -13,6 +13,7 @@ from taf.api.utils._conf import find_keystore from taf.tuf.keys import ( YkSigner, + _get_legacy_keyid, generate_and_write_rsa_keypair, generate_rsa_keypair, get_sslib_key_from_value, @@ -140,22 +141,25 @@ def _sort_roles(roles): keystore_roles, yubikey_roles = _sort_roles(roles) signers: Dict = {} verification_keys: Dict = {} + keys_name_mappings: Dict = {} + for role in keystore_roles: if role.name in existing_roles: continue - keystore_signers, _, _ = setup_roles_keys( + keystore_signers, _, _, key_name_mapping = setup_roles_keys( role, keystore=keystore, skip_prompt=skip_prompt, ) for signer in keystore_signers: signers.setdefault(role.name, []).append(signer) + keys_name_mappings.update(key_name_mapping) for role in yubikey_roles: if role.name in existing_roles: continue - _, yubikey_keys, yubikey_signers = setup_roles_keys( + _, yubikey_keys, yubikey_signers, key_name_mapping = setup_roles_keys( role, certs_dir=certs_dir, yubikeys=yubikeys, @@ -164,7 +168,8 @@ def _sort_roles(roles): ) verification_keys[role.name] = yubikey_keys signers[role.name] = yubikey_signers - return signers, verification_keys + keys_name_mappings.update(key_name_mapping) + return signers, verification_keys, keys_name_mappings except KeystoreError: raise SigningError("Could not load keys of new roles") @@ -331,11 +336,13 @@ def setup_roles_keys( users_yubikeys_details = {} is_yubikey = bool(yubikey_ids) + keys_name_mappings: Dict = {} if is_yubikey: - yubikey_keys, yubikey_signers = _setup_yubikey_roles_keys( + yubikey_keys, yubikey_signers, keys_name_mapping = _setup_yubikey_roles_keys( yubikey_ids, users_yubikeys_details, yubikeys, role, certs_dir, key_size ) + keys_name_mappings.update(keys_name_mapping) else: if keystore is None: taf_logger.error("No keystore provided and no default keystore found") @@ -343,7 +350,7 @@ def setup_roles_keys( default_params = RoleSetupParams() for key_num in range(role.number): key_name = get_key_name(role.name, key_num, role.number) - signer = _setup_keystore_key( + signer, key_id = _setup_keystore_key( keystore, role.name, key_name, @@ -353,7 +360,9 @@ def setup_roles_keys( skip_prompt=skip_prompt, ) keystore_signers.append(signer) - return keystore_signers, yubikey_keys, yubikey_signers + keys_name_mappings[key_id] = key_name + + return keystore_signers, yubikey_keys, yubikey_signers, keys_name_mappings def _setup_yubikey_roles_keys( @@ -363,56 +372,58 @@ def _setup_yubikey_roles_keys( yk_with_public_key = {} yubikey_keys = [] signers = [] - for key_id in yubikey_ids: - + keyid_name_mapping = {} + for key_name in yubikey_ids: public_key_text = None - if key_id in users_yubikeys_details: - public_key_text = users_yubikeys_details[key_id].public + if key_name in users_yubikeys_details: + public_key_text = users_yubikeys_details[key_name].public if public_key_text: - scheme = users_yubikeys_details[key_id].scheme + scheme = users_yubikeys_details[key_name].scheme public_key = get_sslib_key_from_value(public_key_text, scheme) # Check if the signing key is already loaded - if not yk.get_key_serial_by_id(key_id): - yk_with_public_key[key_id] = public_key + if not yk.get_key_serial_by_id(key_name): + yk_with_public_key[key_name] = public_key else: loaded_keys_num += 1 yubikey_keys.append(public_key) else: key_scheme = None - if key_id in users_yubikeys_details: - key_scheme = users_yubikeys_details[key_id].scheme + if key_name in users_yubikeys_details: + key_scheme = users_yubikeys_details[key_name].scheme key_scheme = key_scheme or role.scheme public_key, serial_num = _setup_yubikey( yubikeys, role.name, - key_id, + key_name, yubikey_keys, key_scheme, certs_dir, key_size, + require_single_yk=True, ) loaded_keys_num += 1 signer = YkSigner( - public_key, partial(yk.yk_secrets_handler, serial_num=serial_num) + public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num) ) signers.append(signer) + keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name - if loaded_keys_num < role.threshold: + if loaded_keys_num < role.number: #role.threshold: print(f"Threshold of role {role.name} is {role.threshold}") while loaded_keys_num < role.threshold: loaded_keys = [] - for key_id, public_key in yk_with_public_key.items(): + for key_name, public_key in yk_with_public_key.items(): if ( - key_id in users_yubikeys_details - and not users_yubikeys_details[key_id].present + key_name in users_yubikeys_details + and not users_yubikeys_details[key_name].present ): continue serial_num = _load_and_verify_yubikey( - yubikeys, role.name, key_id, public_key + yubikeys, role.name, key_name, public_key ) if serial_num: loaded_keys_num += 1 - loaded_keys.append(key_id) + loaded_keys.append(key_name) signer = YkSigner( public_key, partial(yk.yk_secrets_handler, serial_num=serial_num), @@ -425,10 +436,10 @@ def _setup_yubikey_roles_keys( f"Threshold of signing keys of role {role.name} not reached. Continue?" ): raise SigningError("Not enough signing keys") - for key_id in loaded_keys: - yk_with_public_key.pop(key_id) + for key_name in loaded_keys: + yk_with_public_key.pop(key_name) - return yubikey_keys, signers + return yubikey_keys, signers, keyid_name_mapping def _setup_keystore_key( @@ -439,7 +450,7 @@ def _setup_keystore_key( length: int, password: Optional[str], skip_prompt: Optional[bool], -) -> CryptoSigner: +) -> Tuple[CryptoSigner, str]: # if keystore exists, load the keys generate_new_keys = keystore is None signer = None @@ -501,7 +512,7 @@ def _invalid_key_message(key_name, keystore): print(f"{role_name} key:\n\n{private_pem}\n\n") signer = load_signer_from_pem(private_pem) - return signer + return signer, _get_legacy_keyid(signer.public_key) def _setup_yubikey( @@ -512,6 +523,7 @@ def _setup_yubikey( scheme: Optional[str] = DEFAULT_RSA_SIGNATURE_SCHEME, certs_dir: Optional[Union[Path, str]] = None, key_size: int = 2048, + require_single_yk=False, ) -> Tuple[Dict, str]: print(f"Registering keys for {key_name}") while True: @@ -523,7 +535,7 @@ def _setup_yubikey( if click.confirm("Cancel?"): raise YubikeyError("Yubikey setup canceled") continue - key, serial_num = yk.yubikey_prompt( + yubikeys = yk.yubikey_prompt( key_name, role_name, taf_repo=None, @@ -532,7 +544,9 @@ def _setup_yubikey( loaded_yubikeys=yubikeys, pin_confirm=True, pin_repeat=True, + require_single_yubikey=True, ) + key, serial_num = yubikeys[0] if use_existing and key in loaded_keys: print("Key already loaded. Please insert a different YubiKey") else: @@ -540,7 +554,7 @@ def _setup_yubikey( key = yk.setup_new_yubikey(serial_num, scheme, key_size=key_size) if certs_dir is not None: - yk.export_yk_certificate(certs_dir, key) + yk.export_yk_certificate(certs_dir, key, serial=serial_num) return key, serial_num diff --git a/taf/tuf/repository.py b/taf/tuf/repository.py index ee090f4e..a7331ca3 100644 --- a/taf/tuf/repository.py +++ b/taf/tuf/repository.py @@ -415,6 +415,7 @@ def create( roles_keys_data: RolesKeysData, signers: dict, additional_verification_keys: Optional[dict] = None, + key_name_mappings: Optional[Dict[str, str]] = None ): """Create a new metadata repository on disk. @@ -468,6 +469,7 @@ def create( root.add_key(public_key, role.name) root.roles[role.name].threshold = role.threshold + root.unrecognized_fields["key_names"] = key_name_mappings targets = Targets() target_roles = {"targets": targets} delegations_per_parent: Dict[str, Dict] = defaultdict(dict) @@ -512,7 +514,7 @@ def create( self.close(name, Metadata(signed)) def create_delegated_role( - self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]] + self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]], key_name_mappings: Optional[Dict[str, str]]=None ): existing_roles = self.get_all_targets_roles() existing_roles.extend(MAIN_ROLES) diff --git a/taf/yubikey.py b/taf/yubikey.py index 2d629822..8e7ce533 100644 --- a/taf/yubikey.py +++ b/taf/yubikey.py @@ -244,7 +244,7 @@ def export_piv_pub_key(pub_key_format=serialization.Encoding.PEM, serial=None): @raise_yubikey_err("Cannot export yk certificate.") -def export_yk_certificate(certs_dir, key: SSlibKey): +def export_yk_certificate(certs_dir, key: SSlibKey, serial: str) : if certs_dir is None: certs_dir = Path.home() else: @@ -253,7 +253,7 @@ def export_yk_certificate(certs_dir, key: SSlibKey): cert_path = certs_dir / f"{key.keyid}.cert" print(f"Exporting certificate to {cert_path}") with open(cert_path, "wb") as f: - f.write(export_piv_x509()) + f.write(export_piv_x509(serial=serial)) @raise_yubikey_err("Cannot get public key in TUF format.") @@ -446,7 +446,18 @@ def yubikey_prompt( prompt_message=None, retry_on_failure=True, hide_already_loaded_message=False, + require_single_yubikey=True, ): + if require_single_yubikey: + while True: + serials = get_serial_num() + if len(serials) == 1: + break + else: + prompt_message = f"Please insert only one YubiKey and press ENTER" + getpass(prompt_message) + + # TODO # need to pass in multiple key names def _read_and_check_yubikeys(