Skip to content

Commit

Permalink
feat: initial work on storing names of keys in metadata files
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Dec 17, 2024
1 parent 4bb667c commit 95ce03b
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 37 deletions.
4 changes: 2 additions & 2 deletions taf/api/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def create_repository(

roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
auth_repo = AuthenticationRepository(path=path)
signers, verification_keys = load_sorted_keys_of_new_roles(
signers, verification_keys, key_name_mappings = load_sorted_keys_of_new_roles(
roles=roles_keys_data.roles,
yubikeys_data=roles_keys_data.yubikeys,
keystore=keystore,
Expand All @@ -85,7 +85,7 @@ def create_repository(
return

repository = AuthenticationRepository(path=path)
repository.create(roles_keys_data, signers, verification_keys)
repository.create(roles_keys_data, signers, verification_keys, key_name_mappings)
if commit:
auth_repo.init_repo()
commit_msg = git_commit_message("create-repo")
Expand Down
4 changes: 2 additions & 2 deletions taf/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def add_role(
new_role.threshold = threshold
new_role.yubikey = yubikey

signers, _ = load_sorted_keys_of_new_roles(
signers, _, key_name_mappings = load_sorted_keys_of_new_roles(
roles=new_role,
yubikeys_data=None,
keystore=keystore_path,
Expand Down Expand Up @@ -291,7 +291,7 @@ def add_multiple_roles(
):
all_signers = {}
for role_to_add_data in roles_to_add_data:
signers, _ = load_sorted_keys_of_new_roles(
signers, _, key_name_mappings = load_sorted_keys_of_new_roles(
roles=role_to_add_data,
yubikeys_data=None,
keystore=keystore_path,
Expand Down
74 changes: 44 additions & 30 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from taf.api.utils._conf import find_keystore
from taf.tuf.keys import (
YkSigner,
_get_legacy_keyid,
generate_and_write_rsa_keypair,
generate_rsa_keypair,
get_sslib_key_from_value,
Expand Down Expand Up @@ -140,22 +141,25 @@ def _sort_roles(roles):
keystore_roles, yubikey_roles = _sort_roles(roles)
signers: Dict = {}
verification_keys: Dict = {}
keys_name_mappings: Dict = {}


for role in keystore_roles:
if role.name in existing_roles:
continue
keystore_signers, _, _ = setup_roles_keys(
keystore_signers, _, _, key_name_mapping = setup_roles_keys(
role,
keystore=keystore,
skip_prompt=skip_prompt,
)
for signer in keystore_signers:
signers.setdefault(role.name, []).append(signer)
keys_name_mappings.update(key_name_mapping)

for role in yubikey_roles:
if role.name in existing_roles:
continue
_, yubikey_keys, yubikey_signers = setup_roles_keys(
_, yubikey_keys, yubikey_signers, key_name_mapping = setup_roles_keys(
role,
certs_dir=certs_dir,
yubikeys=yubikeys,
Expand All @@ -164,7 +168,8 @@ def _sort_roles(roles):
)
verification_keys[role.name] = yubikey_keys
signers[role.name] = yubikey_signers
return signers, verification_keys
keys_name_mappings.update(key_name_mapping)
return signers, verification_keys, keys_name_mappings
except KeystoreError:
raise SigningError("Could not load keys of new roles")

Expand Down Expand Up @@ -331,19 +336,21 @@ def setup_roles_keys(
users_yubikeys_details = {}

is_yubikey = bool(yubikey_ids)
keys_name_mappings: Dict = {}

if is_yubikey:
yubikey_keys, yubikey_signers = _setup_yubikey_roles_keys(
yubikey_keys, yubikey_signers, keys_name_mapping = _setup_yubikey_roles_keys(
yubikey_ids, users_yubikeys_details, yubikeys, role, certs_dir, key_size
)
keys_name_mappings.update(keys_name_mapping)
else:
if keystore is None:
taf_logger.error("No keystore provided and no default keystore found")
raise KeyError("No keystore provided and no default keystore found")
default_params = RoleSetupParams()
for key_num in range(role.number):
key_name = get_key_name(role.name, key_num, role.number)
signer = _setup_keystore_key(
signer, key_id = _setup_keystore_key(
keystore,
role.name,
key_name,
Expand All @@ -353,7 +360,9 @@ def setup_roles_keys(
skip_prompt=skip_prompt,
)
keystore_signers.append(signer)
return keystore_signers, yubikey_keys, yubikey_signers
keys_name_mappings[key_id] = key_name

return keystore_signers, yubikey_keys, yubikey_signers, keys_name_mappings


def _setup_yubikey_roles_keys(
Expand All @@ -363,56 +372,58 @@ def _setup_yubikey_roles_keys(
yk_with_public_key = {}
yubikey_keys = []
signers = []
for key_id in yubikey_ids:

keyid_name_mapping = {}
for key_name in yubikey_ids:
public_key_text = None
if key_id in users_yubikeys_details:
public_key_text = users_yubikeys_details[key_id].public
if key_name in users_yubikeys_details:
public_key_text = users_yubikeys_details[key_name].public
if public_key_text:
scheme = users_yubikeys_details[key_id].scheme
scheme = users_yubikeys_details[key_name].scheme
public_key = get_sslib_key_from_value(public_key_text, scheme)
# Check if the signing key is already loaded
if not yk.get_key_serial_by_id(key_id):
yk_with_public_key[key_id] = public_key
if not yk.get_key_serial_by_id(key_name):
yk_with_public_key[key_name] = public_key
else:
loaded_keys_num += 1
yubikey_keys.append(public_key)
else:
key_scheme = None
if key_id in users_yubikeys_details:
key_scheme = users_yubikeys_details[key_id].scheme
if key_name in users_yubikeys_details:
key_scheme = users_yubikeys_details[key_name].scheme
key_scheme = key_scheme or role.scheme
public_key, serial_num = _setup_yubikey(
yubikeys,
role.name,
key_id,
key_name,
yubikey_keys,
key_scheme,
certs_dir,
key_size,
require_single_yk=True,
)
loaded_keys_num += 1
signer = YkSigner(
public_key, partial(yk.yk_secrets_handler, serial_num=serial_num)
public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num)
)
signers.append(signer)
keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name

if loaded_keys_num < role.threshold:
if loaded_keys_num < role.number: #role.threshold:
print(f"Threshold of role {role.name} is {role.threshold}")
while loaded_keys_num < role.threshold:
loaded_keys = []
for key_id, public_key in yk_with_public_key.items():
for key_name, public_key in yk_with_public_key.items():
if (
key_id in users_yubikeys_details
and not users_yubikeys_details[key_id].present
key_name in users_yubikeys_details
and not users_yubikeys_details[key_name].present
):
continue
serial_num = _load_and_verify_yubikey(
yubikeys, role.name, key_id, public_key
yubikeys, role.name, key_name, public_key
)
if serial_num:
loaded_keys_num += 1
loaded_keys.append(key_id)
loaded_keys.append(key_name)
signer = YkSigner(
public_key,
partial(yk.yk_secrets_handler, serial_num=serial_num),
Expand All @@ -425,10 +436,10 @@ def _setup_yubikey_roles_keys(
f"Threshold of signing keys of role {role.name} not reached. Continue?"
):
raise SigningError("Not enough signing keys")
for key_id in loaded_keys:
yk_with_public_key.pop(key_id)
for key_name in loaded_keys:
yk_with_public_key.pop(key_name)

return yubikey_keys, signers
return yubikey_keys, signers, keyid_name_mapping


def _setup_keystore_key(
Expand All @@ -439,7 +450,7 @@ def _setup_keystore_key(
length: int,
password: Optional[str],
skip_prompt: Optional[bool],
) -> CryptoSigner:
) -> Tuple[CryptoSigner, str]:
# if keystore exists, load the keys
generate_new_keys = keystore is None
signer = None
Expand Down Expand Up @@ -501,7 +512,7 @@ def _invalid_key_message(key_name, keystore):
print(f"{role_name} key:\n\n{private_pem}\n\n")
signer = load_signer_from_pem(private_pem)

return signer
return signer, _get_legacy_keyid(signer.public_key)


def _setup_yubikey(
Expand All @@ -512,6 +523,7 @@ def _setup_yubikey(
scheme: Optional[str] = DEFAULT_RSA_SIGNATURE_SCHEME,
certs_dir: Optional[Union[Path, str]] = None,
key_size: int = 2048,
require_single_yk=False,
) -> Tuple[Dict, str]:
print(f"Registering keys for {key_name}")
while True:
Expand All @@ -523,7 +535,7 @@ def _setup_yubikey(
if click.confirm("Cancel?"):
raise YubikeyError("Yubikey setup canceled")
continue
key, serial_num = yk.yubikey_prompt(
yubikeys = yk.yubikey_prompt(
key_name,
role_name,
taf_repo=None,
Expand All @@ -532,15 +544,17 @@ def _setup_yubikey(
loaded_yubikeys=yubikeys,
pin_confirm=True,
pin_repeat=True,
require_single_yubikey=True,
)
key, serial_num = yubikeys[0]
if use_existing and key in loaded_keys:
print("Key already loaded. Please insert a different YubiKey")
else:
if not use_existing:
key = yk.setup_new_yubikey(serial_num, scheme, key_size=key_size)

if certs_dir is not None:
yk.export_yk_certificate(certs_dir, key)
yk.export_yk_certificate(certs_dir, key, serial=serial_num)
return key, serial_num


Expand Down
4 changes: 3 additions & 1 deletion taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def create(
roles_keys_data: RolesKeysData,
signers: dict,
additional_verification_keys: Optional[dict] = None,
key_name_mappings: Optional[Dict[str, str]] = None
):
"""Create a new metadata repository on disk.
Expand Down Expand Up @@ -468,6 +469,7 @@ def create(
root.add_key(public_key, role.name)
root.roles[role.name].threshold = role.threshold

root.unrecognized_fields["key_names"] = key_name_mappings
targets = Targets()
target_roles = {"targets": targets}
delegations_per_parent: Dict[str, Dict] = defaultdict(dict)
Expand Down Expand Up @@ -512,7 +514,7 @@ def create(
self.close(name, Metadata(signed))

def create_delegated_role(
self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]]
self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]], key_name_mappings: Optional[Dict[str, str]]=None
):
existing_roles = self.get_all_targets_roles()
existing_roles.extend(MAIN_ROLES)
Expand Down
15 changes: 13 additions & 2 deletions taf/yubikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def export_piv_pub_key(pub_key_format=serialization.Encoding.PEM, serial=None):


@raise_yubikey_err("Cannot export yk certificate.")
def export_yk_certificate(certs_dir, key: SSlibKey):
def export_yk_certificate(certs_dir, key: SSlibKey, serial: str) :
if certs_dir is None:
certs_dir = Path.home()
else:
Expand All @@ -253,7 +253,7 @@ def export_yk_certificate(certs_dir, key: SSlibKey):
cert_path = certs_dir / f"{key.keyid}.cert"
print(f"Exporting certificate to {cert_path}")
with open(cert_path, "wb") as f:
f.write(export_piv_x509())
f.write(export_piv_x509(serial=serial))


@raise_yubikey_err("Cannot get public key in TUF format.")
Expand Down Expand Up @@ -446,7 +446,18 @@ def yubikey_prompt(
prompt_message=None,
retry_on_failure=True,
hide_already_loaded_message=False,
require_single_yubikey=True,
):
if require_single_yubikey:
while True:
serials = get_serial_num()
if len(serials) == 1:
break
else:
prompt_message = f"Please insert only one YubiKey and press ENTER"
getpass(prompt_message)


# TODO
# need to pass in multiple key names
def _read_and_check_yubikeys(
Expand Down

0 comments on commit 95ce03b

Please sign in to comment.