Skip to content

Commit

Permalink
refact: resolve flake issues
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Jan 11, 2025
1 parent 540f0c9 commit 7eb7785
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 130 deletions.
103 changes: 59 additions & 44 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def _sort_roles(roles):
verification_keys: Dict = {}
keys_name_mappings: Dict = {}


for role in keystore_roles:
if role.name in existing_roles:
continue
Expand Down Expand Up @@ -193,6 +192,40 @@ def _load_signer_from_keystore(
return None


def _load_and_append_yubikeys(
taf_repo,
key_name,
role,
retry_on_failure,
hide_already_loaded_message,
loaded_yubikeys,
signers_yubikeys,
):
inserted_yubikeys = yk.yubikey_prompt(
key_name,
role,
taf_repo,
loaded_yubikeys=loaded_yubikeys,
retry_on_failure=retry_on_failure,
hide_already_loaded_message=hide_already_loaded_message,
)

loaded_keyids = [signer.public_key.keyid for signer in signers_yubikeys]
num_of_loaded_keys = 0
for public_key, serial_num in inserted_yubikeys:
if public_key is not None and public_key.keyid not in loaded_keyids:
signer = YkSigner(
public_key,
serial_num,
partial(yk.yk_secrets_handler, serial_num=serial_num),
)
signers_yubikeys.append(signer)
loaded_keyids.append(public_key.keyid)
num_of_loaded_keys += 1
taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey")
return num_of_loaded_keys


@log_on_start(INFO, "Loading signing keys of '{role:s}'", logger=taf_logger)
def load_signers(
taf_repo: TUFRepository,
Expand All @@ -211,9 +244,8 @@ def load_signers(
signing_keys_num = len(taf_repo.get_role_keys(role))
all_loaded = False
num_of_signatures = 0
signers_keystore = []
signers_yubikeys = []
yubikeys = []
signers_keystore: List = []
signers_yubikeys: List = []

# first try to sign using yubikey
# if that is not possible, try to load key from a keystore file
Expand All @@ -229,32 +261,6 @@ def load_signers(

keystore_path = Path(keystore).expanduser().resolve() if keystore else None

def _load_and_append_yubikeys(
key_name, role, retry_on_failure, hide_already_loaded_message
):
inserted_yubikeys = yk.yubikey_prompt(
key_name,
role,
taf_repo,
loaded_yubikeys=loaded_yubikeys,
retry_on_failure=retry_on_failure,
hide_already_loaded_message=hide_already_loaded_message,
)

num_of_loaded_keys = 0
for public_key, serial_num in inserted_yubikeys:
if public_key is not None and public_key.keyid not in yubikeys:
signer = YkSigner(
public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num)
)
signers_yubikeys.append(signer)
yubikeys.append(public_key.keyid)
num_of_loaded_keys += 1
taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey")
return num_of_loaded_keys



keystore_files = []
if keystore is not None:
keystore_files = get_keystore_keys_of_role(keystore, role)
Expand Down Expand Up @@ -285,7 +291,9 @@ def _load_and_append_yubikeys(

# try to load from the inserted YubiKey, without asking the user to insert it
key_name = get_key_name(role, num_of_signatures, signing_keys_num)
num_of_loaded_keys = _load_and_append_yubikeys(key_name, role, False, True)
num_of_loaded_keys = _load_and_append_yubikeys(
taf_repo, key_name, role, False, True, loaded_yubikeys, signers_yubikeys
)

if num_of_loaded_keys:
num_of_signatures += num_of_loaded_keys
Expand All @@ -297,7 +305,9 @@ def _load_and_append_yubikeys(
prompt_for_yubikey = False

if use_yubikey_for_signing_confirmed:
if _load_and_append_yubikeys(key_name, role, True, False):
if _load_and_append_yubikeys(
taf_repo, key_name, role, True, False, loaded_yubikeys, signers_yubikeys
):
num_of_signatures += 1
continue

Expand Down Expand Up @@ -403,12 +413,14 @@ def _setup_yubikey_roles_keys(
)
loaded_keys_num += 1
signer = YkSigner(
public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num)
public_key,
serial_num,
partial(yk.yk_secrets_handler, serial_num=serial_num),
)
signers.append(signer)
keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name

if loaded_keys_num < role.number: #role.threshold:
if loaded_keys_num < role.number: # role.threshold:
print(f"Threshold of role {role.name} is {role.threshold}")
while loaded_keys_num < role.threshold:
loaded_keys = []
Expand Down Expand Up @@ -512,7 +524,9 @@ def _invalid_key_message(key_name, keystore):
print(f"{role_name} key:\n\n{private_pem.decode()}\n\n")
signer = load_signer_from_pem(private_pem)

return signer, _get_legacy_keyid(signer.public_key)
if signer is not None:
return signer, _get_legacy_keyid(signer.public_key)
return None, None


def _setup_yubikey(
Expand Down Expand Up @@ -546,16 +560,17 @@ def _setup_yubikey(
pin_repeat=True,
require_single_yubikey=True,
)
key, serial_num = yubikeys[0]
if use_existing and key in loaded_keys:
print("Key already loaded. Please insert a different YubiKey")
else:
if not use_existing:
key = yk.setup_new_yubikey(serial_num, scheme, key_size=key_size)
if yubikeys is not None:
key, serial_num = yubikeys[0]
if use_existing and key in loaded_keys:
print("Key already loaded. Please insert a different YubiKey")
else:
if not use_existing:
key = yk.setup_new_yubikey(serial_num, scheme, key_size=key_size)

if certs_dir is not None:
yk.export_yk_certificate(certs_dir, key, serial=serial_num)
return key, serial_num
if certs_dir is not None:
yk.export_yk_certificate(certs_dir, key, serial=serial_num)
return key, serial_num


def _load_and_verify_yubikey(
Expand Down
1 change: 0 additions & 1 deletion taf/tools/yubikey/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,3 @@ def attach_to_group(group):
group.add_command(list_key_command(), name='list-key')
group.add_command(setup_signing_key_command(), name='setup-signing-key')
group.add_command(setup_test_key_command(), name='setup-test-key')

4 changes: 3 additions & 1 deletion taf/tuf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ class YkSigner(Signer):

_SECRET_PROMPT = "pin"

def __init__(self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler):
def __init__(
self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler
):

self._public_key = public_key
self._pin_handler = pin_handler
Expand Down
8 changes: 5 additions & 3 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ def create(
roles_keys_data: RolesKeysData,
signers: dict,
additional_verification_keys: Optional[dict] = None,
key_name_mappings: Optional[Dict[str, str]] = None
key_name_mappings: Optional[Dict[str, str]] = None,
) -> None:
"""Create a new metadata repository on disk.
Expand Down Expand Up @@ -602,9 +602,11 @@ def create(
signed.version = 0 # `close` will bump to initial valid verison 1
self.close(name, Metadata(signed))


def create_delegated_roles(
self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]], key_name_mappings: Optional[Dict[str, str]]=None
self,
roles_data: List[TargetsRole],
signers: Dict[str, List[CryptoSigner]],
key_name_mappings: Optional[Dict[str, str]] = None,
) -> Tuple[List, List]:
"""
Create a new delegated roles, signes them using the provided signers and
Expand Down
Loading

0 comments on commit 7eb7785

Please sign in to comment.