Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved testing for mandatoryFilterSet #560

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/schematools/contrib/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def name_from_schema(cls, schema: DatasetSchema) -> str:
return name

@classmethod
def create_for_schema(cls, schema: DatasetSchema, path: str | None = None) -> Dataset:
def create_for_schema(
cls, schema: DatasetSchema, path: str | None = None, enable_db: bool = True
) -> Dataset:
"""Create the schema based on the Amsterdam Schema JSON input"""
name = cls.name_from_schema(schema)
if path is None:
Expand All @@ -225,6 +227,7 @@ def create_for_schema(cls, schema: DatasetSchema, path: str | None = None) -> Da
version=schema.version,
is_default_version=schema.is_default_version,
enable_api=cls.has_api_enabled(schema),
enable_db=enable_db,
)
obj._dataset_collection = schema.loader # retain collection on saving
obj.save()
Expand Down
10 changes: 8 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ def brk2_simple_schema(schema_loader) -> DatasetSchema:
return schema_loader.get_dataset_from_file("brk2_simple.json")


@pytest.fixture
def brp_schema(schema_loader) -> DatasetSchema:
"""Fixture for the BRP dataset."""
return schema_loader.get_dataset_from_file("brp.json")


@pytest.fixture
def composite_key_schema(schema_loader) -> ProfileSchema:
return schema_loader.get_dataset_from_file("composite_key.json")
Expand Down Expand Up @@ -391,9 +397,9 @@ def profile_loader(here) -> FileSystemProfileLoader:


@pytest.fixture
def brp_r_profile_schema(profile_loader) -> ProfileSchema:
def brp_rname_profile_schema(profile_loader) -> ProfileSchema:
"""A downloaded profile schema definition"""
return profile_loader.get_profile("BRP_R")
return profile_loader.get_profile("BRP_RNAME")


@pytest.fixture
Expand Down
23 changes: 6 additions & 17 deletions tests/django/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations
"""Extra fixtures for Django-based tests."""

import json
from pathlib import Path
from typing import Any
from __future__ import annotations

import pytest

Expand Down Expand Up @@ -45,24 +43,15 @@ def kadastraleobjecten_dataset(kadastraleobjecten_schema: DatasetSchema) -> Data


@pytest.fixture
def brp_r_profile(brp_r_profile_schema: ProfileSchema) -> Profile:
def brp_rname_profile(brp_rname_profile_schema: ProfileSchema) -> Profile:
"""The persistent database profile object based on a downlaoded schema definition."""
return Profile.create_for_schema(brp_r_profile_schema)


@pytest.fixture
def brp_schema_json(here: Path) -> Any:
"""Fixture for the BRP dataset."""
path = here / "files/brp.json"
return json.loads(path.read_text())
return Profile.create_for_schema(brp_rname_profile_schema)


@pytest.fixture
def brp_dataset(brp_schema_json: dict) -> Dataset:
def brp_dataset(brp_schema: DatasetSchema) -> Dataset:
"""Create a remote dataset."""
return Dataset.objects.create(
name="brp", schema_data=brp_schema_json, enable_db=False, path="brp"
)
return Dataset.create_for_schema(brp_schema, path="brp", enable_db=False)


@pytest.fixture
Expand Down
6 changes: 3 additions & 3 deletions tests/django/test_models_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@


@pytest.mark.django_db
def test_profile(brp_r_profile_schema):
def test_profile(brp_rname_profile_schema):
"""Prove that the BRP data is properly stored in the DB"""
brp_r_profile = Profile.create_for_schema(brp_r_profile_schema)
brp_r_profile = Profile.create_for_schema(brp_rname_profile_schema)
assert brp_r_profile.name == "brp_medewerker"
assert brp_r_profile.get_scopes() == {"BRP/R"}
assert brp_r_profile.get_scopes() == {"BRP/RNAME"}

perm = Permission(PermissionLevel.READ)
assert brp_r_profile.schema.datasets["brp"].tables["ingeschrevenpersonen"].permissions == perm
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "brp_medewerker",
"scopes": ["BRP/R"],
"scopes": ["BRP/RNAME"],
"datasets": {
"brp": {
"tables": {
Expand Down
169 changes: 111 additions & 58 deletions tests/test_permissions_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,117 @@
from schematools.types import PermissionLevel


def _active_profiles(
user_scopes: UserScopes, dataset_id: str, table_id: str | None = None
) -> set[str]:
"""Shorthand to get active profile names"""
if table_id is not None:
return {
p.dataset.profile.name
for p in user_scopes.get_active_profile_tables(dataset_id, table_id)
}
else:
return {p.profile.name for p in user_scopes.get_active_profile_datasets(dataset_id)}


def test_profile_scopes(profile_verkeer_medewerker_schema, profile_brk_encoded_schema):
"""Prove that profiles are only applied for the correct scopes."""
user_scopes = UserScopes(
query_params={},
request_scopes=["FP/MD"],
all_profiles=[profile_verkeer_medewerker_schema, profile_brk_encoded_schema],
)

assert _active_profiles(user_scopes, "brk") == set()
assert _active_profiles(user_scopes, "verkeer") == {"verkeer_medewerker"}


def test_mandatory_filters(brp_r_profile_schema):
"""Prove that having a scope + mandatory filters actives a profile."""
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/R"],
all_profiles=[brp_r_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == {"brp_medewerker"}

# Also prove the opposite: not getting access
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
},
request_scopes=["BRP/R"],
all_profiles=[brp_r_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()
def _active_profiles(user_scopes: UserScopes, dataset_id: str, table_id: str) -> set[str]:
"""Tell which profiles are active for a table."""
return {
p.dataset.profile.name for p in user_scopes.get_active_profile_tables(dataset_id, table_id)
}


def _active_profiles_by_dataset(user_scopes: UserScopes, dataset_id: str) -> set[str]:
"""Tell which profiles are active for a dataset."""
return {p.profile.name for p in user_scopes.get_active_profile_datasets(dataset_id)}


class TestProfileActivation:
def test_profile_scopes(self, profile_verkeer_medewerker_schema, profile_brk_encoded_schema):
"""Prove that profiles are only applied for the correct scopes."""
user_scopes = UserScopes(
query_params={},
request_scopes=["FP/MD"],
all_profiles=[profile_verkeer_medewerker_schema, profile_brk_encoded_schema],
)

assert _active_profiles_by_dataset(user_scopes, "brk") == set()
assert _active_profiles_by_dataset(user_scopes, "verkeer") == {"verkeer_medewerker"}

def test_mandatory_filter_match(self, brp_rname_profile_schema):
"""Prove that having a scope + mandatory filters actives a profile."""
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == {"brp_medewerker"}

def test_mandatory_filter_operators(self, brp_rname_profile_schema):
"""Prove that operators can't be used to circumvent list-filter limitations."""
user_scopes = UserScopes(
query_params={
"postcode[like]": "*",
"lastname[like]": "*",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()

def test_mandatory_filter_missing_scope(self, brp_rname_profile_schema):
# Also prove the opposite: not getting access
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/R"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()

def test_mandatory_filter_missing_query(self, brp_rname_profile_schema):
# Also prove the opposite: not getting access
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)

assert _active_profiles(user_scopes, "brp", "ingeschrevenpersonen") == set()


class TestTableAccess:

def test_has_table_fields_access(self, id_auth_schema):
"""Prove that a table with one protected field cannot be accessed with OPENBAAR scope."""

user_scopes = UserScopes(
{},
request_scopes=["OPENBAAR"],
)
table = id_auth_schema.get_table_by_id("base")
assert not user_scopes.has_table_fields_access(table)

def test_table_access_via_mandatory_filters(self, brp_schema, brp_rname_profile_schema):
"""Prove that table access can be granted if the query_params matches a profile."""
table = brp_schema.get_table_by_id("ingeschrevenpersonen")

user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)
assert not user_scopes.has_table_access(table)

# Create the object again, as it caches many results
user_scopes = UserScopes(
query_params={
"postcode": "1234AB",
"lastname": "foobar",
},
request_scopes=["BRP/RNAME"],
all_profiles=[brp_rname_profile_schema],
)
assert user_scopes.has_table_access(table)


class TestFieldAccess:
Expand Down Expand Up @@ -155,16 +218,6 @@ def test_profile_field_inheritance_from_dataset(
assert user_scopes.has_field_access(table.get_field_by_id("identificatie")) == expect
assert user_scopes.has_field_access(table.get_field_by_id("registratiedatum")) == expect

def test_has_table_fields_access(self, id_auth_schema):
"""Prove that a table with one protected field cannot be accessed with OPENBAAR scope."""

user_scopes = UserScopes(
{},
request_scopes=["OPENBAAR"],
)
table = id_auth_schema.get_table_by_id("base")
assert not user_scopes.has_table_fields_access(table)

def test_subfields_have_protection(self, subfield_auth_schema):
"""Prove that the subfields of a protected field are also protected."""

Expand Down
6 changes: 3 additions & 3 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ def test_datasetschema_from_file_not_a_dataset(schema_loader) -> None:
schema_loader.get_dataset_from_file("not_a_json_file.txt")


def test_profile_schema(brp_r_profile_schema: ProfileSchema) -> None:
def test_profile_schema(brp_rname_profile_schema: ProfileSchema) -> None:
"""Prove that the profile files are properly read,
and have their fields access the JSON data.
"""
assert brp_r_profile_schema.scopes == {"BRP/R"}
assert brp_rname_profile_schema.scopes == {"BRP/RNAME"}

brp = brp_r_profile_schema.datasets["brp"]
brp = brp_rname_profile_schema.datasets["brp"]
table = brp.tables["ingeschrevenpersonen"]

assert table.permissions.level is PermissionLevel.READ
Expand Down
Loading