Skip to content

Commit

Permalink
refact: resolve flake issues
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Jan 10, 2025
1 parent 540f0c9 commit 64e2e79
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 117 deletions.
74 changes: 42 additions & 32 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def _sort_roles(roles):
verification_keys: Dict = {}
keys_name_mappings: Dict = {}


for role in keystore_roles:
if role.name in existing_roles:
continue
Expand Down Expand Up @@ -193,6 +192,38 @@ def _load_signer_from_keystore(
return None


def _load_and_append_yubikeys(
taf_repo,
key_name,
role,
retry_on_failure,
hide_already_loaded_message,
loaded_yubikeys,
signers_yubikeys,
):
inserted_yubikeys = yk.yubikey_prompt(
key_name,
role,
taf_repo,
loaded_yubikeys=loaded_yubikeys,
retry_on_failure=retry_on_failure,
hide_already_loaded_message=hide_already_loaded_message,
)

num_of_loaded_keys = 0
for public_key, serial_num in inserted_yubikeys:
if public_key is not None and public_key.keyid not in signer.public_key.keyid:
signer = YkSigner(
public_key,
serial_num,
partial(yk.yk_secrets_handler, serial_num=serial_num),
)
signers_yubikeys.append(signer)
num_of_loaded_keys += 1
taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey")
return num_of_loaded_keys


@log_on_start(INFO, "Loading signing keys of '{role:s}'", logger=taf_logger)
def load_signers(
taf_repo: TUFRepository,
Expand All @@ -213,7 +244,6 @@ def load_signers(
num_of_signatures = 0
signers_keystore = []
signers_yubikeys = []
yubikeys = []

# first try to sign using yubikey
# if that is not possible, try to load key from a keystore file
Expand All @@ -229,32 +259,6 @@ def load_signers(

keystore_path = Path(keystore).expanduser().resolve() if keystore else None

def _load_and_append_yubikeys(
key_name, role, retry_on_failure, hide_already_loaded_message
):
inserted_yubikeys = yk.yubikey_prompt(
key_name,
role,
taf_repo,
loaded_yubikeys=loaded_yubikeys,
retry_on_failure=retry_on_failure,
hide_already_loaded_message=hide_already_loaded_message,
)

num_of_loaded_keys = 0
for public_key, serial_num in inserted_yubikeys:
if public_key is not None and public_key.keyid not in yubikeys:
signer = YkSigner(
public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num)
)
signers_yubikeys.append(signer)
yubikeys.append(public_key.keyid)
num_of_loaded_keys += 1
taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey")
return num_of_loaded_keys



keystore_files = []
if keystore is not None:
keystore_files = get_keystore_keys_of_role(keystore, role)
Expand Down Expand Up @@ -285,7 +289,9 @@ def _load_and_append_yubikeys(

# try to load from the inserted YubiKey, without asking the user to insert it
key_name = get_key_name(role, num_of_signatures, signing_keys_num)
num_of_loaded_keys = _load_and_append_yubikeys(key_name, role, False, True)
num_of_loaded_keys = _load_and_append_yubikeys(
key_name, role, False, True, loaded_yubikeys, signers_yubikeys
)

if num_of_loaded_keys:
num_of_signatures += num_of_loaded_keys
Expand All @@ -297,7 +303,9 @@ def _load_and_append_yubikeys(
prompt_for_yubikey = False

if use_yubikey_for_signing_confirmed:
if _load_and_append_yubikeys(key_name, role, True, False):
if _load_and_append_yubikeys(
key_name, role, True, False, loaded_yubikeys, signers_yubikeys
):
num_of_signatures += 1
continue

Expand Down Expand Up @@ -403,12 +411,14 @@ def _setup_yubikey_roles_keys(
)
loaded_keys_num += 1
signer = YkSigner(
public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num)
public_key,
serial_num,
partial(yk.yk_secrets_handler, serial_num=serial_num),
)
signers.append(signer)
keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name

if loaded_keys_num < role.number: #role.threshold:
if loaded_keys_num < role.number: # role.threshold:
print(f"Threshold of role {role.name} is {role.threshold}")
while loaded_keys_num < role.threshold:
loaded_keys = []
Expand Down
1 change: 0 additions & 1 deletion taf/tools/yubikey/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,3 @@ def attach_to_group(group):
group.add_command(list_key_command(), name='list-key')
group.add_command(setup_signing_key_command(), name='setup-signing-key')
group.add_command(setup_test_key_command(), name='setup-test-key')

4 changes: 3 additions & 1 deletion taf/tuf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ class YkSigner(Signer):

_SECRET_PROMPT = "pin"

def __init__(self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler):
def __init__(
self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler
):

self._public_key = public_key
self._pin_handler = pin_handler
Expand Down
8 changes: 5 additions & 3 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ def create(
roles_keys_data: RolesKeysData,
signers: dict,
additional_verification_keys: Optional[dict] = None,
key_name_mappings: Optional[Dict[str, str]] = None
key_name_mappings: Optional[Dict[str, str]] = None,
) -> None:
"""Create a new metadata repository on disk.
Expand Down Expand Up @@ -602,9 +602,11 @@ def create(
signed.version = 0 # `close` will bump to initial valid verison 1
self.close(name, Metadata(signed))


def create_delegated_roles(
self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]], key_name_mappings: Optional[Dict[str, str]]=None
self,
roles_data: List[TargetsRole],
signers: Dict[str, List[CryptoSigner]],
key_name_mappings: Optional[Dict[str, str]] = None,
) -> Tuple[List, List]:
"""
Create a new delegated roles, signes them using the provided signers and
Expand Down
167 changes: 87 additions & 80 deletions taf/yubikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,16 @@ def _yk_piv_ctrl(serial=None):
try:
session = PivSession(connection)
sessions.append((session, info.serial))
devices_info.append((connection, session)) # Store to manage cleanup
devices_info.append(
(connection, session)
) # Store to manage cleanup
if serial is not None:
break
except Exception as e:
connection.close() # Ensure we close connection on error
raise e
if serial is not None:
session, serial = sessions[0]
session, serial = sessions[0]
yield session, serial
else:
yield sessions
Expand All @@ -137,7 +139,6 @@ def _yk_piv_ctrl(serial=None):
connection.close()



def is_inserted():
"""Checks if YubiKey is inserted.
Expand Down Expand Up @@ -244,7 +245,7 @@ def export_piv_pub_key(pub_key_format=serialization.Encoding.PEM, serial=None):


@raise_yubikey_err("Cannot export yk certificate.")
def export_yk_certificate(certs_dir, key: SSlibKey, serial: str) :
def export_yk_certificate(certs_dir, key: SSlibKey, serial: str):
if certs_dir is None:
certs_dir = Path.home()
else:
Expand Down Expand Up @@ -292,6 +293,87 @@ def list_connected_yubikeys():
print(f" Form Factor: {info.form_factor}")


# TODO
# need to pass in multiple key names
def _read_and_check_yubikeys(
key_name,
role,
taf_repo,
registering_new_key,
creating_new_key,
loaded_yubikeys,
pin_confirm,
pin_repeat,
prompt_message,
retrying,
):

if retrying:
if prompt_message is None:
prompt_message = f"Please insert {key_name} YubiKey and press ENTER"
getpass(prompt_message)
# make sure that YubiKey is inserted
try:
serials = get_serial_num()
except Exception:
taf_logger.log("NOTICE", "No YubiKeys inserted")
return [False, None, None]

# check if this key is already loaded as the provided role's key (we can use the same key
# to sign different metadata)
yubikeys = []
already_loaded_keys = []
invalid_keys = []
for serial_num in serials:
if (
loaded_yubikeys is not None
and serial_num in loaded_yubikeys
and role in loaded_yubikeys[serial_num]
):
already_loaded_keys.append(serial_num)
else:
# read the public key, unless a new key needs to be generated on the yubikey
public_key = (
get_piv_public_key_tuf(serial=serial_num)
if not creating_new_key
else None
)
# check if this yubikey is can be used for signing the provided role's metadata
# if the key was already registered as that role's key
if not registering_new_key and role is not None and taf_repo is not None:
if not taf_repo.is_valid_metadata_yubikey(role, public_key):
invalid_keys.append(serial_num)
# print(f"The inserted YubiKey is not a valid {role} key")
continue

if get_key_pin(serial_num) is None:
if creating_new_key:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
else:
pin = get_and_validate_pin(
key_name, pin_confirm, pin_repeat, serial_num
)
add_key_pin(serial_num, pin)

if get_key_public_key(serial_num) is None and public_key is not None:
add_key_public_key(serial_num, public_key)

# when reusing the same yubikey, public key will already be in the public keys dictionary
# but the key name still needs to be added to the key id mapping dictionary
add_key_id_mapping(serial_num, key_name)

if role is not None:
if loaded_yubikeys is None:
loaded_yubikeys = {serial_num: [role]}
else:
loaded_yubikeys.setdefault(serial_num, []).append(role)

yubikeys.append((public_key, serial_num))

# TODO error messages
return yubikeys


@raise_yubikey_err("Cannot sign data.")
def sign_piv_rsa_pkcs1v15(data, pin, serial=None):
"""Sign data with key from YubiKey's piv slot.
Expand Down Expand Up @@ -454,84 +536,9 @@ def yubikey_prompt(
if len(serials) == 1:
break
else:
prompt_message = f"Please insert only one YubiKey and press ENTER"
prompt_message = "Please insert only one YubiKey and press ENTER"
getpass(prompt_message)


# TODO
# need to pass in multiple key names
def _read_and_check_yubikeys(
key_name,
role,
taf_repo,
registering_new_key,
creating_new_key,
loaded_yubikeys,
pin_confirm,
pin_repeat,
prompt_message,
retrying,
):

if retrying:
if prompt_message is None:
prompt_message = f"Please insert {key_name} YubiKey and press ENTER"
getpass(prompt_message)
# make sure that YubiKey is inserted
try:
serials = get_serial_num()
except Exception:
taf_logger.log("NOTICE", "No YubiKeys inserted")
return [False, None, None]

# check if this key is already loaded as the provided role's key (we can use the same key
# to sign different metadata)
yubikeys = []
already_loaded_keys = []
invalid_keys = []
for serial_num in serials:
if (
loaded_yubikeys is not None
and serial_num in loaded_yubikeys
and role in loaded_yubikeys[serial_num]
):
already_loaded_keys.append(serial_num)
else:
# read the public key, unless a new key needs to be generated on the yubikey
public_key = get_piv_public_key_tuf(serial=serial_num) if not creating_new_key else None
# check if this yubikey is can be used for signing the provided role's metadata
# if the key was already registered as that role's key
if not registering_new_key and role is not None and taf_repo is not None:
if not taf_repo.is_valid_metadata_yubikey(role, public_key):
invalid_keys.append(serial_num)
# print(f"The inserted YubiKey is not a valid {role} key")
continue

if get_key_pin(serial_num) is None:
if creating_new_key:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
else:
pin = get_and_validate_pin(key_name, pin_confirm, pin_repeat, serial_num)
add_key_pin(serial_num, pin)

if get_key_public_key(serial_num) is None and public_key is not None:
add_key_public_key(serial_num, public_key)

# when reusing the same yubikey, public key will already be in the public keys dictionary
# but the key name still needs to be added to the key id mapping dictionary
add_key_id_mapping(serial_num, key_name)

if role is not None:
if loaded_yubikeys is None:
loaded_yubikeys = {serial_num: [role]}
else:
loaded_yubikeys.setdefault(serial_num, []).append(role)

yubikeys.append((public_key, serial_num))

#TODO error messages
return yubikeys

retry_counter = 0
while True:

Expand Down

0 comments on commit 64e2e79

Please sign in to comment.