Skip to content

Commit

Permalink
Merge branch 'main' into cwa/close-64-rejig-pydantic-models
Browse files Browse the repository at this point in the history
  • Loading branch information
CasperWA authored Feb 14, 2024
2 parents c431f73 + 3a980c5 commit 4f36b15
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 62 deletions.
10 changes: 8 additions & 2 deletions entities_service/cli/_utils/generics.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def initialize_oauth2(
openid_config_url = OPENID_CONFIG_URL

try:
AnyHttpUrl(openid_config_url)
openid_url: AnyHttpUrl = AnyHttpUrl(openid_config_url)
except ValidationError as exc:
raise ValueError(
f"Invalid OpenID configuration URL: {openid_config_url}."
Expand All @@ -114,7 +114,7 @@ def initialize_oauth2(

try:
with httpx.Client() as client:
response = client.get(openid_config_url).json()
response: dict[str, Any] = client.get(openid_config_url).json()
except (httpx.HTTPError, JSONDecodeError) as exc:
raise ValueError(
f"Could not retrieve OpenID configuration from {openid_config_url}."
Expand All @@ -129,6 +129,12 @@ def initialize_oauth2(
" Please check that the URL is correct."
) from exc

if openid_config.code_challenge_methods_supported is None:
# If omitted, the authorization server does not support PKCE.
raise ValueError(
f"{openid_url.unicode_host} does not support the PKCE Auth flow."
)

response_type = "code"
if response_type not in openid_config.response_types_supported:
raise ValueError(
Expand Down
17 changes: 15 additions & 2 deletions entities_service/cli/_utils/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
) from exc

from entities_service import __version__
from entities_service.cli._utils.generics import print
from entities_service.cli._utils.generics import CACHE_DIRECTORY, ERROR_CONSOLE, print
from entities_service.service.config import CONFIG

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -69,6 +69,19 @@ def global_options(
rich_help_panel="Global options",
),
) -> None:
"""Global options for the CLI."""
"""Global options for the CLI.
This function is also used to run initial setup for the CLI.
"""
if dotenv_path:
CONTEXT["dotenv_path"] = dotenv_path

# Initialize the cache directory
try:
CACHE_DIRECTORY.mkdir(parents=True, exist_ok=True)
except PermissionError as exc:
ERROR_CONSOLE.print(
f"[bold red]Error[/bold red]: {CACHE_DIRECTORY} is not writable. "
"Please check your permissions."
)
raise typer.Exit(1) from exc
254 changes: 199 additions & 55 deletions entities_service/models/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,220 @@
from typing import Annotated

from pydantic import (
AnyHttpUrl,
BaseModel,
Field,
)
from pydantic.networks import Url, UrlConstraints

AnyHttpsUrl = Annotated[Url, UrlConstraints(allowed_schemes=["https"])]


class OpenIDConfiguration(BaseModel):
"""OpenID configuration for Code flow with PKCE."""
"""OpenID configuration for Code flow with PKCE.
This is defined in the OpenID Connect Discovery specification.
Reference: https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata
As well as the OAuth 2.0 Authorization Server Metadata [RFC8414].
Reference: https://www.rfc-editor.org/rfc/rfc8414.html#section-2
issuer: AnyHttpUrl
authorization_endpoint: AnyHttpUrl
token_endpoint: AnyHttpUrl
userinfo_endpoint: AnyHttpUrl | None = None
jwks_uri: AnyHttpUrl
registration_endpoint: AnyHttpUrl | None = None
scopes_supported: list[str] | None = None
response_types_supported: list[str]
response_modes_supported: list[str] | None = None
grant_types_supported: list[str] | None = None
acr_values_supported: list[str] | None = None
subject_types_supported: list[str]
id_token_signing_alg_values_supported: list[str]
id_token_encryption_alg_values_supported: list[str] | None = None
id_token_encryption_enc_values_supported: list[str] | None = None
userinfo_signing_alg_values_supported: list[str] | None = None
userinfo_encryption_alg_values_supported: list[str] | None = None
userinfo_encryption_enc_values_supported: list[str] | None = None
request_object_signing_alg_values_supported: list[str] | None = None
request_object_encryption_alg_values_supported: list[str] | None = None
request_object_encryption_enc_values_supported: list[str] | None = None
token_endpoint_auth_methods_supported: list[str] | None = None
token_endpoint_auth_signing_alg_values_supported: list[str] | None = None
display_values_supported: list[str] | None = None
claim_types_supported: list[str] | None = None
claims_supported: list[str] | None = None
service_documentation: AnyHttpUrl | None = None
claims_locales_supported: list[str] | None = None
ui_locals_supported: list[str] | None = None
claims_parameter_supported: bool = False
request_parameter_supported: bool = False
request_uri_parameter_supported: bool = True
require_request_uri_registration: bool = False
op_policy_uri: AnyHttpUrl | None = None
op_tos_uri: AnyHttpUrl | None = None
code_challenge_methods_supported: list[str]
Note, the model only includes the fields required by the OpenID specification
as well as those necessary for the Code flow with PKCE.
"""

# Extras
revocation_endpoint: AnyHttpUrl | None = None
introspection_endpoint: AnyHttpUrl | None = None
issuer: Annotated[
AnyHttpsUrl,
Field(
description=(
"URL using the `https` scheme with no query or fragment components "
"that the OP asserts as its Issuer Identifier. If Issuer discovery is "
"supported (see [Section 2](https://openid.net/specs/openid-connect"
"-discovery-1_0.html#IssuerDiscovery)), this value MUST be identical "
"to the issuer value returned by WebFinger. This also MUST be "
"identical to the `iss` Claim value in ID Tokens issued from this "
"Issuer."
),
),
]
authorization_endpoint: Annotated[
AnyHttpsUrl,
Field(
description=(
"URL of the OP's OAuth 2.0 Authorization Endpoint [[OpenID.Core]]("
"https://openid.net/specs/openid-connect-discovery-1_0.html#OpenID.Core"
"). This URL MUST use the `https` scheme and MAY contain port, path, "
"and query parameter components."
),
),
]
token_endpoint: Annotated[
AnyHttpsUrl,
Field(
description=(
"URL of the OP's OAuth 2.0 Token Endpoint [[OpenID.Core]]("
"https://openid.net/specs/openid-connect-discovery-1_0.html#OpenID.Core"
"). This URL MUST use the `https` scheme and MAY contain port, path, "
"and query parameter components."
),
),
]
userinfo_endpoint: Annotated[
AnyHttpsUrl | None,
Field(
description=(
"URL of the OP's UserInfo Endpoint [[OpenID.Core]]("
"https://openid.net/specs/openid-connect-discovery-1_0.html#OpenID.Core"
"). This URL MUST use the `https` scheme and MAY contain port, path, "
"and query parameter components."
),
),
] = None
jwks_uri: Annotated[
AnyHttpsUrl,
Field(
description=(
"URL of the OP's JWK Set [[JWK]]("
"https://openid.net/specs/openid-connect-discovery-1_0.html#JWK) "
"document, which MUST use the `https` scheme. This contains the "
"signing key(s) the RP uses to validate signatures from the OP. The "
"JWK Set MAY also contain the Server's encryption key(s), which are "
"used by RPs to encrypt requests to the Server. When both signing and "
"encryption keys are made available, a `use` (public key use) "
"parameter value is REQUIRED for all keys in the referenced JWK Set to "
"indicate each key's intended usage. Although some algorithms allow "
"the same key to be used for both signatures and encryption, doing so "
"is NOT RECOMMENDED, as it is less secure. The JWK `x5c` parameter MAY "
"be used to provide X.509 representations of keys provided. When used, "
"the bare key values MUST still be present and MUST match those in the "
"certificate. The JWK Set MUST NOT contain private or symmetric key "
"values."
),
),
]
response_types_supported: Annotated[
list[str],
Field(
description=(
"JSON array containing a list of the OAuth 2.0 `response_type` values "
"that this OP supports. Dynamic OpenID Providers MUST support the "
"`code`, `id_token`, and the `id_token` `token` Response Type values."
),
),
]
subject_types_supported: Annotated[
list[str],
Field(
description=(
"JSON array containing a list of the Subject Identifier types that "
"this OP supports. Valid types include `pairwise` and `public`."
),
),
]
id_token_signing_alg_values_supported: Annotated[
list[str],
Field(
description=(
"JSON array containing a list of the JWS signing algorithms (`alg` "
"values) supported by the OP for the ID Token to encode the Claims in "
"a JWT [[JWT]](https://openid.net/specs/openid-connect-discovery-1_0"
".html#JWT). The algorithm `RS256` MUST be included. The value `none` "
"MAY be supported but MUST NOT be used unless the Response Type used "
"returns no ID Token from the Authorization Endpoint (such as when "
"using the Authorization Code Flow)."
),
),
]
code_challenge_methods_supported: Annotated[
list[str] | None,
Field(
description=(
"JSON array containing a list of Proof Key for Code Exchange (PKCE) "
"[[RFC7636](https://www.rfc-editor.org/rfc/rfc7636)] code challenge "
"methods supported by this authorization server. Code challenge "
"method values are used in the 'code_challenge_method' parameter "
"defined in [Section 4.3 of [RFC7636]](https://www.rfc-editor.org/"
"rfc/rfc7636#section-4.3). The valid code challenge method values are "
"those registered in the IANA 'PKCE Code Challenge Methods' registry "
"[[IANA.OAuth.Parameters](https://www.rfc-editor.org/rfc/rfc8414.html"
"#ref-IANA.OAuth.Parameters)]. If omitted, the authorization server "
"does not support PKCE."
),
),
] = None


class GitLabUserInfo(BaseModel):
"""OpenID userinfo response from GitLab."""
"""OpenID userinfo response from GitLab.
sub: str
name: str
nickname: str
preferred_username: str
email: str | None = None
email_verified: bool | None = None
website: AnyHttpUrl | str
profile: AnyHttpUrl
picture: AnyHttpUrl
groups: list[str]
This is defined in the OpenID Connect specification.
Reference: https://openid.net/specs/openid-connect-core-1_0.html#UserInfo
Claims not defined in the OpenID Connect specification are prefixed with
`https://gitlab.org/claims/`.
As well as the `groups` claim, which is a list of groups the user is a member of.
"""

sub: Annotated[
str, Field(description="Subject - Identifier for the End-User at the Issuer.")
]
name: Annotated[
str | None,
Field(
description=(
"End-User's full name in displayable form including all name parts, "
"possibly including titles and suffixes, ordered according to the "
"End-User's locale and preferences."
),
),
] = None
preferred_username: Annotated[
str | None,
Field(
description=(
"Shorthand name by which the End-User wishes to be referred to at the "
"RP, such as `janedoe` or `j.doe`. This value MAY be any valid JSON "
"string including special characters such as `@`, `/`, or whitespace. "
"The RP MUST NOT rely upon this value being unique, as discussed in "
"[Section 5.7](https://openid.net/specs/openid-connect-core-1_0.html"
"#ClaimStability)."
),
),
] = None
groups: Annotated[
list[str],
Field(
description=(
"Paths for the groups the user is a member of, either directly or "
"through an ancestor group."
),
),
] = []
groups_owner: Annotated[
list[str], Field(alias="https://gitlab.org/claims/groups/owner")
list[str],
Field(
alias="https://gitlab.org/claims/groups/owner",
description=(
"Names of the groups the user is a direct member of with Owner role."
),
),
] = []
groups_maintainer: Annotated[
list[str], Field(alias="https://gitlab.org/claims/groups/maintainer")
list[str],
Field(
alias="https://gitlab.org/claims/groups/maintainer",
description=(
"Names of the groups the user is a direct member of with Maintainer "
"role."
),
),
] = []
groups_developer: Annotated[
list[str], Field(alias="https://gitlab.org/claims/groups/developer")
list[str],
Field(
alias="https://gitlab.org/claims/groups/developer",
description=(
"Names of the groups the user is a direct member of with Developer "
"role."
),
),
] = []
49 changes: 49 additions & 0 deletions tests/cli/_utils/test_global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,52 @@ def test_dotenv_path(
assert not result.stderr

assert CONTEXT["dotenv_path"] == dotenv_path


def test_cache_dir_creation(cli: CliRunner, tmp_cache_dir: Path) -> None:
"""Ensure the cache directiory is created if it does not already exist.
Note, the callback to `global_settings` is not done if invoking the CLI without any
arguments. It will instead go straight to outputting the help.
However, when calling any command, `global_settings` is called.
Hence, `upload` is called here, which will simply return the help for that command.
"""
from entities_service.cli.main import APP

# tmp_cache_dir should not yet exist
assert not tmp_cache_dir.exists()

result = cli.invoke(APP, "upload")
assert result.exit_code == 0, CLI_RESULT_FAIL_MESSAGE.format(
stdout=result.stdout, stderr=result.stderr
)
assert not result.stderr

# tmp_cache_dir should now exist
assert tmp_cache_dir.exists()


def test_cache_dir_permissionerror(cli: CliRunner, tmp_path: Path) -> None:
"""Ensure a PermissionError is raised and handled if the cache dir cannot be
created.
Note, the callback to `global_settings` is not done if invoking the CLI without any
arguments. It will instead go straight to outputting the help.
However, when calling any command, `global_settings` is called.
Hence, `upload` is called here, which will simply return the help for that command.
"""
from entities_service.cli.main import APP

org_mode = tmp_path.stat().st_mode
tmp_path.chmod(0x555)

result = cli.invoke(APP, "upload")
assert result.exit_code != 0, CLI_RESULT_FAIL_MESSAGE.format(
stdout=result.stdout, stderr=result.stderr
)
assert not result.stdout

assert "Error: " in result.stderr
assert "is not writable. Please check your permissions." in result.stderr

tmp_path.chmod(org_mode)
Loading

0 comments on commit 4f36b15

Please sign in to comment.