From f4687a23a3d11b9b37b5f8d34bd62e2ae5d66a35 Mon Sep 17 00:00:00 2001 From: Albin Antony Date: Thu, 17 Oct 2024 15:42:15 +0530 Subject: [PATCH] Fix L3-iGrant/api#651: Ensure full support of IETF SD-JWT VC --- sdjwt/pex.py | 32 ++++++++++-------- sdjwt/sdjwt.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 105 insertions(+), 15 deletions(-) diff --git a/sdjwt/pex.py b/sdjwt/pex.py index c640b7a..d1a6640 100644 --- a/sdjwt/pex.py +++ b/sdjwt/pex.py @@ -141,7 +141,7 @@ class PresentationDefinition(BaseModel): "type": "object", "additionalProperties": False, "patternProperties": { - "^(jwt|jwt_vc|jwt_vp|vp\+sd-jwt|vc\+sd-jwt|sd-jwt)$": { + "^(jwt|jwt_vc|jwt_vc_json|jwt_vp|vp\+sd-jwt|vc\+sd-jwt|sd-jwt)$": { "type": "object", "additionalProperties": False, "properties": { @@ -229,6 +229,7 @@ class PresentationDefinitionValidationError(Exception): "vp+sd-jwt", "sd-jwt", "mso_mdoc", + "jwt_vc_json", ], }, }, @@ -524,7 +525,7 @@ def find_all_sd_values(data): # Function to extract relevant disclosure values def extract_disclosure_values(input_descriptor, credential, disclosure): fields = input_descriptor["constraints"]["fields"] - sd_values = find_all_sd_values(credential["credentialSubject"]) + sd_values = find_all_sd_values(credential) matching_disclosures = [] for field in fields: @@ -628,13 +629,11 @@ def match_credentials_for_sd_jwt( credential_subject, key_mapping = ( decode_credential_sd_to_credential_subject_with_key_mapping( disclosure_mapping=disclosure_mapping, - credential_subject=credential_decoded.get("vc").get( - "credentialSubject" - ), + credential_subject=credential_decoded ) ) - credential = credential_decoded.get("vc") - credential["credentialSubject"] = credential_subject + credential = credential_decoded + credential = credential_subject credential = json.dumps(credential) # Iterate through fields specified in the constraints @@ -768,10 +767,10 @@ def validate_vp_token( disclosure_mapping = get_all_disclosures_with_sd_from_token(vc_token) credential_subject = create_credential_subject_for_sdjwt( - credential_subject=vc_claims.get("vc").get("credentialSubject"), + credential_subject=vc_claims, disclosure_mapping=disclosure_mapping, ) - vc_claims["vc"]["credentialSubject"] = credential_subject + vc_claims = credential_subject elif vc_claims and format == "jwt_vc": pass @@ -780,10 +779,17 @@ def validate_vp_token( ) for input_descriptor in input_descriptors: if input_descriptor.get("id") == id: - matches = match_credentials( - json.dumps(input_descriptor), - credentials=[json.dumps(vc_claims["vc"])], - ) + limit_disclosure = input_descriptor.get("constraints").get("limit_disclosure",None) + if limit_disclosure and limit_disclosure == "required": + matches = match_credentials( + json.dumps(input_descriptor), + credentials=[json.dumps(vc_claims)], + ) + else: + matches = match_credentials( + json.dumps(input_descriptor), + credentials=[json.dumps(vc_claims["vc"])], + ) if not matches or not matches[0]: return False else: diff --git a/sdjwt/sdjwt.py b/sdjwt/sdjwt.py index 40906e0..fd4eea5 100644 --- a/sdjwt/sdjwt.py +++ b/sdjwt/sdjwt.py @@ -740,6 +740,90 @@ def _create_disclosure_mapping_from_credential_definition(data): def create_disclosure_mapping_from_credential_definition(credential_definition): data = credential_definition["properties"] - disclosure_mapping = {} - disclosure_mapping["credentialSubject"] = _create_disclosure_mapping_from_credential_definition(data) + disclosure_mapping = _create_disclosure_mapping_from_credential_definition(data) return disclosure_mapping + + +def create_vc_sd_jwt( + jti: str, + iss: str, + sub: str, + kid: str, + key: jwk.JWK, + vct: str, + credential_subject: dict, + disclosure_mapping: typing.Optional[dict] = None, + expiry_in_seconds: typing.Optional[int] = None, +) -> str: + if not expiry_in_seconds: + expiry_in_seconds = 2592000 + issuance_epoch, issuance_8601 = ( + get_current_datetime_in_epoch_seconds_and_iso8601_format() + ) + expiration_epoch, expiration_8601 = ( + get_current_datetime_in_epoch_seconds_and_iso8601_format(expiry_in_seconds) + ) + _credentialSubject = {**credential_subject} + if disclosure_mapping: + disclosures = [] + + def calculate_sd(name, value): + _sd = [] + disclosure_base64 = create_disclosure_base64( + create_random_salt(32), key=name, value=value + ) + sd = create_sd_from_disclosure_base64(disclosure_base64) + disclosures.append(disclosure_base64) + _sd.append(sd) + return _sd + + def update_value(obj, path): + # Construct json path dot notation + dot_notation_path = ".".join(path) + + # Find matches for the json path + jp = parse(dot_notation_path) + matches = jp.find(obj) + + # Iterate through the matches and calculated sd + for match in matches: + sd = calculate_sd(str(match.path), match.value) + if isinstance(match.context.value, dict): + if not match.context.value.get("_sd"): + match.context.value.setdefault("_sd", sd) + del match.context.value[str(match.path)] + else: + match.context.value["_sd"].extend(sd) + del match.context.value[str(match.path)] + + def iterate_mapping(obj, path): + for key, value in obj.items(): + if isinstance(value, dict): + new_path = path + [f"'{key}'"] + # Check if limitDisclosure is present or not + if "limitDisclosure" in value and value["limitDisclosure"]: + update_value(_credentialSubject, new_path) + iterate_mapping(value, new_path) + + # Iterate through disclosure mapping + # and add sd to the corresponding field in the + # credential subject + iterate_mapping(disclosure_mapping, []) + + + jwt_credential = create_jwt( + jti=jti, + sub=sub, + iss=iss, + kid=kid, + key=key, + iat=issuance_epoch, + exp=expiration_epoch, + vct=vct, + **_credentialSubject, + ) + sd_disclosures = "" + if disclosure_mapping: + sd_disclosures = "~" + "~".join(disclosures) + + return jwt_credential + sd_disclosures