From 43ec149a34d84aa0f66283271e156020ebfa94cd Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:10:29 +1100 Subject: [PATCH 01/11] Move types to type module --- superdesk/core/elastic/__init__.py | 6 +- superdesk/core/elastic/base_client.py | 4 +- superdesk/core/elastic/reindex.py | 2 +- superdesk/core/elastic/resources.py | 8 ++- superdesk/core/elastic/utils.py | 6 ++ superdesk/core/mongo/__init__.py | 7 +++ superdesk/core/resources/__init__.py | 9 +-- superdesk/core/types/__init__.py | 10 ++++ superdesk/core/types/elastic.py | 68 ++++++++++++++++++++++ superdesk/core/types/mongo.py | 83 +++++++++++++++++++++++++++ 10 files changed, 189 insertions(+), 14 deletions(-) create mode 100644 superdesk/core/elastic/utils.py create mode 100644 superdesk/core/mongo/__init__.py create mode 100644 superdesk/core/types/elastic.py create mode 100644 superdesk/core/types/mongo.py diff --git a/superdesk/core/elastic/__init__.py b/superdesk/core/elastic/__init__.py index ebc83ef06f..df96b80765 100644 --- a/superdesk/core/elastic/__init__.py +++ b/superdesk/core/elastic/__init__.py @@ -8,10 +8,6 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from .common import ElasticResourceConfig from .resources import ElasticResources -__all__ = [ - "ElasticResources", - "ElasticResourceConfig", -] +__all__ = ["ElasticResources"] diff --git a/superdesk/core/elastic/base_client.py b/superdesk/core/elastic/base_client.py index 9d3c6a8521..d0de426fa6 100644 --- a/superdesk/core/elastic/base_client.py +++ b/superdesk/core/elastic/base_client.py @@ -15,11 +15,9 @@ from eve.io.mongo.parser import parse from superdesk.errors import SuperdeskApiError -from superdesk.core.types import ProjectedFieldArg, SearchRequest, SortParam +from superdesk.core.types import SearchRequest, SortParam, ElasticResourceConfig, ElasticClientConfig from superdesk.core.resources import get_projection_from_request -from .common import ElasticClientConfig, ElasticResourceConfig - class InvalidSearchString(Exception): """Exception thrown when search string has invalid value""" diff --git a/superdesk/core/elastic/reindex.py b/superdesk/core/elastic/reindex.py index 6bb1b7425e..d62f695300 100644 --- a/superdesk/core/elastic/reindex.py +++ b/superdesk/core/elastic/reindex.py @@ -14,7 +14,7 @@ from elasticsearch.exceptions import NotFoundError from click import progressbar -from .common import generate_index_name +from .utils import generate_index_name from .sync_client import ElasticResourceClient diff --git a/superdesk/core/elastic/resources.py b/superdesk/core/elastic/resources.py index 73dcfbfb23..92f4c552bc 100644 --- a/superdesk/core/elastic/resources.py +++ b/superdesk/core/elastic/resources.py @@ -19,9 +19,10 @@ from elasticsearch.exceptions import NotFoundError, RequestError from superdesk.core.errors import ElasticNotConfiguredForResource +from superdesk.core.types import ElasticResourceConfig, ElasticClientConfig from .mapping import get_elastic_mapping_from_model -from .common import ElasticResourceConfig, ElasticClientConfig, generate_index_name +from .utils import generate_index_name from .sync_client import ElasticResourceClient from .async_client import ElasticResourceAsyncClient from .reindex import reindex @@ -73,6 +74,11 @@ def __init__(self, app: "SuperdeskAsyncApp"): self.app = app + # Import the module from here so we aren't importing from ``core.resources`` module in ``core.elastic`` + from .signals import on_resource_registered + + 
self.app.resources.on_resource_registered.connect(on_resource_registered) + def register_resource_config( self, resource_name: str, diff --git a/superdesk/core/elastic/utils.py b/superdesk/core/elastic/utils.py new file mode 100644 index 0000000000..5b464a56ef --- /dev/null +++ b/superdesk/core/elastic/utils.py @@ -0,0 +1,6 @@ +from uuid import uuid4 + + +def generate_index_name(alias: str): + random = str(uuid4()).split("-")[0] + return "{}_{}".format(alias, random) diff --git a/superdesk/core/mongo/__init__.py b/superdesk/core/mongo/__init__.py new file mode 100644 index 0000000000..9523430fa2 --- /dev/null +++ b/superdesk/core/mongo/__init__.py @@ -0,0 +1,7 @@ +from .mongo_manager import MongoResources +from .utils import get_mongo_client_config + +__all__ = [ + "MongoResources", + "get_mongo_client_config", +] diff --git a/superdesk/core/resources/__init__.py b/superdesk/core/resources/__init__.py index 7fb9fe3405..c5ece4bb4d 100644 --- a/superdesk/core/resources/__init__.py +++ b/superdesk/core/resources/__init__.py @@ -8,22 +8,22 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license +from superdesk.core.types import MongoResourceConfig, MongoIndexOptions, ElasticResourceConfig from .utils import get_projection_from_request from .model import ( BaseModel, - Resources, ResourceModel, ResourceModelWithObjectId, ModelWithVersions, - ResourceConfig, dataclass, Dataclass, default_model_config, ) +from .resource_config import ResourceConfig +from .resource_manager import Resources from .resource_rest_endpoints import RestEndpointConfig, RestParentLink, get_id_url_type from .service import AsyncResourceService, AsyncCacheableService -from ..mongo import MongoResourceConfig, MongoIndexOptions -from ..elastic.resources import ElasticResourceConfig +from .resource_signals import global_signals __all__ = [ "get_projection_from_request", @@ -43,4 +43,5 @@ "MongoResourceConfig", "MongoIndexOptions", "ElasticResourceConfig", + "global_signals", ] diff --git a/superdesk/core/types/__init__.py b/superdesk/core/types/__init__.py index a0fa3ccc83..c17f702760 100644 --- a/superdesk/core/types/__init__.py +++ b/superdesk/core/types/__init__.py @@ -26,6 +26,8 @@ RestResponseMeta, RestGetResponse, ) +from .mongo import MongoIndexCollation, MongoIndexOptions, MongoResourceConfig, MongoClientConfig +from .elastic import ElasticResourceConfig, ElasticClientConfig __all__ = [ @@ -59,4 +61,12 @@ # system "NotificationClientProtocol", "WSGIApp", + # MongoDB: + "MongoIndexCollation", + "MongoIndexOptions", + "MongoResourceConfig", + "MongoClientConfig", + # ElasticSearch: + "ElasticResourceConfig", + "ElasticClientConfig", ] diff --git a/superdesk/core/types/elastic.py b/superdesk/core/types/elastic.py new file mode 100644 index 0000000000..89fbe35490 --- /dev/null +++ b/superdesk/core/types/elastic.py @@ -0,0 +1,68 @@ +from typing import Any, Callable +from dataclasses import dataclass + +from ..config import ConfigModel +from .search import SearchRequest, SortParam + + +@dataclass +class ElasticResourceConfig: + """Resource config for use with Elasticsearch, to be included with the ResourceConfig""" + + #: Config prefix to be used + prefix: str = "ELASTICSEARCH" + + #: The default sort + default_sort: SortParam | None = None + + #: The default maximum number of documents to be returned + default_max_results: int = 25 + + #: An optional filter to be applied to all searches + filter: dict[str, Any] | None = None + + #: An optional callback used to 
construct a filter dynamically, to be applied to all searches + filter_callback: Callable[[SearchRequest | None], dict[str, Any]] | None = None + + #: An optional dictionary of field aggregations + aggregations: dict[str, Any] | None = None + + #: An optional dictionary of highlights to be applied + highlight: Callable[[str], dict[str, Any] | None] | None = None + + #: An optional list of facets to be applied (Will this be required in new version?) + facets: dict[str, Any] | None = None + + +class ElasticClientConfig(ConfigModel): + """Dataclass for storing an Elastic config for a specific resource""" + + #: The index prefix to use for the resource + index: str = "superdesk" + + #: The URL of the Elasticsearch instance to connect to + url: str = "http://localhost:9200" + + #: Refresh the Elasticsearch index after uploading documents to the index + force_refresh: bool = True + + #: If ``True``, automatically requests aggregations on search. + auto_aggregations: bool = True + + #: Set the default ``track_total_hits`` for search requests. See https://www.elastic.co/guide/en/elasticsearch/reference/master/search-your-data.html#track-total-hits + track_total_hits: int = 10000 + + #: Number of retries when timing out + retry_on_timeout: bool = True + + #: Maximum number of retries + max_retries: int = 3 + + #: Number of retries on update if there is a conflict + retry_on_conflict: int = 5 + + #: Optional dict to use when connecting to an Elasticsearch instance + options: dict[str, Any] | None = None + + #: Settings to be placed on the Elasticsearch index when creating it + settings: dict[str, Any] | None = None diff --git a/superdesk/core/types/mongo.py b/superdesk/core/types/mongo.py new file mode 100644 index 0000000000..0df9fc9fd7 --- /dev/null +++ b/superdesk/core/types/mongo.py @@ -0,0 +1,83 @@ +from typing import Dict, List, Optional, Any, TypedDict +from dataclasses import dataclass + +from .search import SortListParam +from ..config import ConfigModel + + +class MongoIndexCollation(TypedDict): + """TypedDict class for ``collation`` config + + See https://www.mongodb.com/docs/manual/core/index-case-insensitive + """ + + #: Specifies language rules + locale: str + + #: Determines comparison rules. A strength value of 1 or 2 indicates case-insensitive collation + strength: int + + +@dataclass +class MongoIndexOptions: + """Dataclass for easy construction of Mongo Index options + + See https://mongodb.com/docs/manual/reference/method/db.collection.createIndex + """ + + #: Name of the MongoDB Index + name: str + + #: List of keys to be used for the MongoDB Index + keys: SortListParam + + #: Ensures that the indexed fields do not store duplicate values + unique: bool = True + + #: Create index in the background, allowing read and write operations to the database while the index builds + background: bool = True + + #: If True, the index only references documents with the specified field. 
+ sparse: bool = True + + #: allows users to specify language-specific rules for string comparison + collation: Optional[MongoIndexCollation] = None + + #: allows to filter documents for this index + partialFilterExpression: Optional[Dict[str, Any]] = None + + +@dataclass +class MongoResourceConfig: + """Resource config for use with MongoDB, to be included with the ResourceConfig""" + + #: Config prefix to be used + prefix: str = "MONGO" + + #: Optional list of mongo indexes to be created for this resource + indexes: Optional[List[MongoIndexOptions]] = None + + #: Optional list of mongo indexes to be created for the versioning resource + version_indexes: Optional[List[MongoIndexOptions]] = None + + #: Boolean determining if this resource supports versioning + versioning: bool = False + + +class MongoClientConfig(ConfigModel): + host: str = "localhost" + port: int = 27017 + appname: str = "superdesk" + dbname: str = "superdesk" + connect: bool = True + tz_aware: bool = True + write_concern: Optional[Dict[str, Any]] = {"w": 1} + replicaSet: Optional[str] = None + uri: Optional[str] = None + document_class: Optional[type] = None + username: Optional[str] = None + password: Optional[str] = None + options: Optional[Dict[str, Any]] = None + auth_mechanism: Optional[str] = None + auth_source: Optional[str] = None + auth_mechanism_properties: Optional[str] = None From 627750180c74b9df5cf4ddaf9a3a040e18e39937 Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:10:41 +1100 Subject: [PATCH 02/11] Improve resources code layout --- superdesk/core/resources/model.py | 148 ++----------------- superdesk/core/resources/resource_config.py | 59 ++++++++ superdesk/core/resources/resource_manager.py | 84 +++++++++++ superdesk/core/resources/types.py | 5 + 4 files changed, 159 insertions(+), 137 deletions(-) create mode 100644 superdesk/core/resources/resource_config.py create mode 100644 superdesk/core/resources/resource_manager.py create mode 100644 superdesk/core/resources/types.py diff --git a/superdesk/core/resources/model.py b/superdesk/core/resources/model.py index a73677b4ee..242389af13 100644 --- a/superdesk/core/resources/model.py +++ b/superdesk/core/resources/model.py @@ -13,14 +13,12 @@ Annotated, Optional, List, - Dict, - Type, Any, ClassVar, cast, ) from typing_extensions import dataclass_transform, Self -from dataclasses import dataclass as python_dataclass, field as dataclass_field +from dataclasses import field as dataclass_field from copy import deepcopy from inspect import get_annotations from datetime import datetime @@ -37,7 +35,7 @@ from pydantic_core import InitErrorDetails, PydanticCustomError, from_json from pydantic.dataclasses import dataclass as pydataclass -from superdesk.core.types import SortListParam, ProjectedFieldArg, BaseModel +from superdesk.core.types import BaseModel from superdesk.core.utils import generate_guid, GUID_NEWSML from .utils import get_model_aliased_fields @@ -160,6 +158,12 @@ def uses_objectid_for_id(cls) -> bool: except KeyError: return False + @classmethod + def get_signals(cls) -> "ResourceSignals[Self]": + from .resource_signals import get_resource_signals + + return get_resource_signals(cls) + def to_dict(self, **kwargs) -> dict[str, Any]: """ Convert the model instance to a dictionary representation with non-JSON-serializable Python objects. 
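A minimal sketch of how the ``get_signals()`` classmethod added in the hunk above might be wired up from application code. The ``users`` module and ``UserResourceModel`` are hypothetical names (borrowed from the docstring examples elsewhere in this series); ``get_signals()``, ``ResourceSignals`` and its ``data.on_created`` signal are the APIs this patch series introduces::

    from users import UserResourceModel

    async def on_user_created(user: UserResourceModel) -> None:
        # react to a user document that has just been stored
        ...

    # get_signals() returns the ResourceSignals group registered for this model,
    # so data-layer listeners can be attached without importing the service class
    UserResourceModel.get_signals().data.on_created.connect(on_user_created)
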
@@ -229,6 +233,8 @@ async def get_user_email(user_id: ObjectId) -> str | None: async def _run_async_validators_from_model_class( model_instance: Any, root_item: ResourceModel, field_name_stack: Optional[List[str]] = None ): + from .validators import AsyncValidator + if field_name_stack is None: field_name_stack = [] @@ -317,137 +323,5 @@ def get_versioned_model(model: ResourceModel) -> ModelWithVersions | None: return None if not model_has_versions(model) else cast(ModelWithVersions, model) -@python_dataclass -class ResourceConfig: - """A config for a Resource to be registered""" - - #: Name of the resource (must be unique in the system) - name: str - - #: The ResourceModel class for this resource (used to generate the Elasticsearch mapping) - data_class: type[ResourceModel] - - #: Optional title used in HATEOAS (and docs), will fallback to the class name - title: str | None = None - - #: The config used for MongoDB - mongo: Optional["MongoResourceConfig"] = None - - #: The config used for Elasticsearch, if `None` then this resource will not be available in Elasticsearch - elastic: Optional["ElasticResourceConfig"] = None - - #: Optional ResourceService class, if not provided the system will create a generic one, with no resource type - service: Optional[Type["AsyncResourceService"]] = None - - #: Optional config to be used for REST endpoints. If not provided, REST will not be available for this resource - rest_endpoints: Optional["RestEndpointConfig"] = None - - #: Optional config to query and store ObjectIds as strings in MongoDB - query_objectid_as_string: bool = False - - #: Boolean to indicate if etag concurrency control should be used (defaults to ``True``) - uses_etag: bool = True - - #: Optional list of resource fields to ignore when generating the etag - etag_ignore_fields: Optional[list[str]] = None - - #: Boolean to indicate if this resource provides a version resource as well - versioning: bool = False - - #: Optional list of fields not to store in the versioning resource - ignore_fields_in_versions: list[str] | None = None - - #: Optional sorting for this resource - default_sort: SortListParam | None = None - - #: Optionally override the name used for the MongoDB/Elastic sources - datasource_name: str | None = None - - #: Optional projection to be used to include/exclude fields - projection: ProjectedFieldArg | None = None - - -class Resources: - """A high level resource class used to manage all resources in the system""" - - _resource_configs: Dict[str, ResourceConfig] - - _resource_services: Dict[str, "AsyncResourceService"] - - #: A reference back to the parent app, for configuration purposes - app: "SuperdeskAsyncApp" - - def __init__(self, app: "SuperdeskAsyncApp"): - self._resource_configs = {} - self._resource_services = {} - self.app = app - - def register(self, config: ResourceConfig): - """Register a new resource in the system - - This will also register the resource with Mongo and optionally Elasticsearch - - :param config: A ResourceConfig of the resource to be registered - :raises KeyError: If the resource has already been registered - """ - - if config.name in self._resource_configs: - raise KeyError(f"Resource '{config.name}' already registered") - - self._resource_configs[config.name] = config - - config.data_class.model_resource_name = config.name - if not config.datasource_name: - config.datasource_name = config.name - - mongo_config = config.mongo or MongoResourceConfig() - if config.versioning: - mongo_config.versioning = True - - 
self.app.mongo.register_resource_config(config.name, mongo_config) - - if config.elastic is not None: - if config.default_sort: - config.elastic.default_sort = config.default_sort - self.app.elastic.register_resource_config( - config.name, - config.elastic, - ) - - if config.service is None: - - class GenericResourceService(AsyncResourceService): - pass - - GenericResourceService.resource_name = config.name - GenericResourceService.config = config - config.service = GenericResourceService - - config.service.resource_name = config.name - self._resource_services[config.name] = config.service() - - def get_config(self, name: str) -> ResourceConfig: - """Get the config for a registered resource - - :param name: The name of the registered resource - :return: A copy of the ResourceConfig of the registered resource - :raises KeyError: If the resource is not registered - """ - - return deepcopy(self._resource_configs[name]) - - def get_all_configs(self) -> List[ResourceConfig]: - """Get a copy of the configs for all the registered resources in the system""" - - return deepcopy(list(self._resource_configs.values())) - - def get_resource_service(self, resource_name: str) -> "AsyncResourceService": - return self._resource_services[resource_name] - - -from ..app import SuperdeskAsyncApp # noqa: E402 from .service import AsyncResourceService # noqa: E402 -from ..mongo import MongoResourceConfig # noqa: E402 -from ..elastic import ElasticResourceConfig # noqa: E402 -from .validators import AsyncValidator # noqa: E402 -from .resource_rest_endpoints import RestEndpointConfig # noqa: E402 +from .resource_signals import ResourceSignals # noqa: E402 diff --git a/superdesk/core/resources/resource_config.py b/superdesk/core/resources/resource_config.py new file mode 100644 index 0000000000..e3e9940949 --- /dev/null +++ b/superdesk/core/resources/resource_config.py @@ -0,0 +1,59 @@ +from typing import Union +from dataclasses import dataclass + +from superdesk.core.types import SortListParam, ProjectedFieldArg, MongoResourceConfig, ElasticResourceConfig + + +@dataclass +class ResourceConfig: + """A config for a Resource to be registered""" + + #: Name of the resource (must be unique in the system) + name: str + + #: The ResourceModel class for this resource (used to generate the Elasticsearch mapping) + data_class: type["ResourceModel"] + + #: Optional title used in HATEOAS (and docs), will fallback to the class name + title: str | None = None + + #: The config used for MongoDB + mongo: MongoResourceConfig | None = None + + #: The config used for Elasticsearch, if `None` then this resource will not be available in Elasticsearch + elastic: ElasticResourceConfig | None = None + + #: Optional ResourceService class, if not provided the system will create a generic one, with no resource type + service: type["AsyncResourceService"] | None = None + + #: Optional config to be used for REST endpoints. 
If not provided, REST will not be available for this resource + rest_endpoints: Union["RestEndpointConfig", None] = None + + #: Optional config to query and store ObjectIds as strings in MongoDB + query_objectid_as_string: bool = False + + #: Boolean to indicate if etag concurrency control should be used (defaults to ``True``) + uses_etag: bool = True + + #: Optional list of resource fields to ignore when generating the etag + etag_ignore_fields: list[str] | None = None + + #: Boolean to indicate if this resource provides a version resource as well + versioning: bool = False + + #: Optional list of fields not to store in the versioning resource + ignore_fields_in_versions: list[str] | None = None + + #: Optional sorting for this resource + default_sort: SortListParam | None = None + + #: Optionally override the name used for the MongoDB/Elastic sources + datasource_name: str | None = None + + #: Optional projection to be used to include/exclude fields + projection: ProjectedFieldArg | None = None + + +from .resource_rest_endpoints import RestEndpointConfig # noqa: E402 +from .model import ResourceModel # noqa: E402 +from .service import AsyncResourceService # noqa: E402 diff --git a/superdesk/core/resources/resource_manager.py b/superdesk/core/resources/resource_manager.py new file mode 100644 index 0000000000..b2a802d60d --- /dev/null +++ b/superdesk/core/resources/resource_manager.py @@ -0,0 +1,84 @@ +from copy import deepcopy + +from superdesk.core.app import SuperdeskAsyncApp +from superdesk.core.signals import SignalGroup, Signal + +from .service import AsyncResourceService +from .resource_config import ResourceConfig + + +class Resources(SignalGroup): + """A high level resource class used to manage all resources in the system""" + + _resource_configs: dict[str, ResourceConfig] + + _resource_services: dict[str, AsyncResourceService] + + signal_name_prefix = "resources:" + + #: Signal fired when a new resource was just registered with the system + on_resource_registered: Signal[SuperdeskAsyncApp, ResourceConfig] + + #: A reference back to the parent app, for configuration purposes + app: SuperdeskAsyncApp + + def __init__(self, app: SuperdeskAsyncApp): + # TODO-ASYNC: Do we need to manually initialise this signal??? 
+ self.on_resource_registered = Signal("resources:on_resource_registered") + + super().__init__() + self._resource_configs = {} + self._resource_services = {} + self.app = app + + def register(self, config: ResourceConfig): + """Register a new resource in the system + + This will also register the resource with Mongo and optionally Elasticsearch + + :param config: A ResourceConfig of the resource to be registered + :raises KeyError: If the resource has already been registered + """ + + if config.name in self._resource_configs: + raise KeyError(f"Resource '{config.name}' already registered") + + self._resource_configs[config.name] = config + + config.data_class.model_resource_name = config.name + if not config.datasource_name: + config.datasource_name = config.name + + self.register_service(config) + self.on_resource_registered.send(self.app, config) + + def register_service(self, config: ResourceConfig): + if config.service is None: + + class GenericResourceService(AsyncResourceService): + pass + + GenericResourceService.resource_name = config.name + GenericResourceService.config = config + config.service = GenericResourceService + + config.service.resource_name = config.name + self._resource_services[config.name] = config.service() + + def get_config(self, name: str) -> ResourceConfig: + """Get the config for a registered resource + + :param name: The name of the registered resource + :return: A copy of the ResourceConfig of the registered resource + :raises KeyError: If the resource is not registered + """ + + return deepcopy(self._resource_configs[name]) + + def get_all_configs(self) -> list[ResourceConfig]: + """Get a copy of the configs for all the registered resources in the system""" + + return deepcopy(list(self._resource_configs.values())) + + def get_resource_service(self, resource_name: str) -> AsyncResourceService: + return self._resource_services[resource_name] diff --git a/superdesk/core/resources/types.py b/superdesk/core/resources/types.py new file mode 100644 index 0000000000..42d73f68ff --- /dev/null +++ b/superdesk/core/resources/types.py @@ -0,0 +1,5 @@ +from typing import TypeVar +from .model import ResourceModel + + +ResourceModelType = TypeVar("ResourceModelType", bound=ResourceModel) From f6ce71ba7a0b379773d1dd0c4857930d1c8a9ebd Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:10:57 +1100 Subject: [PATCH 03/11] Improve mongo code layout --- .../core/{mongo.py => mongo/mongo_manager.py} | 173 ++---------------- superdesk/core/mongo/utils.py | 74 ++++++++ 2 files changed, 90 insertions(+), 157 deletions(-) rename superdesk/core/{mongo.py => mongo/mongo_manager.py} (67%) create mode 100644 superdesk/core/mongo/utils.py diff --git a/superdesk/core/mongo.py b/superdesk/core/mongo/mongo_manager.py similarity index 67% rename from superdesk/core/mongo.py rename to superdesk/core/mongo/mongo_manager.py index 63bbabf2fb..2db8346b75 100644 --- a/superdesk/core/mongo.py +++ b/superdesk/core/mongo/mongo_manager.py @@ -8,170 +8,22 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Dict, List, Optional, Tuple, Any, TypedDict -from dataclasses import dataclass, asdict +from typing import Dict, List, Tuple +from dataclasses import asdict from copy import deepcopy import logging -from pymongo import MongoClient, uri_parser +from pymongo import MongoClient from pymongo.database import Database, Collection from pymongo.errors import OperationFailure, DuplicateKeyError from 
motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection -from superdesk.core.types import SortListParam from superdesk.resource_fields import VERSION_ID_FIELD, CURRENT_VERSION -from .config import ConfigModel +from superdesk.core.types import MongoIndexOptions, MongoResourceConfig -logger = logging.getLogger(__name__) - - -class MongoIndexCollation(TypedDict): - """TypedDict class for ``collation`` config - - See https://www.mongodb.com/docs/manual/core/index-case-insensitive - """ - - #: Specifies language rules - locale: str - - #: Determines comparison rules. A strength value of 1 or 2 indicates case-insensitive collation - strength: int - - -@dataclass -class MongoIndexOptions: - """Dataclass for easy construction of Mongo Index options - - See https://mongodb.com/docs/manual/reference/method/db.collection.createIndex - """ - - #: Name of the MongoDB Index - name: str - - #: List of keys to be used for the MongoDB Index - keys: SortListParam - - #: Ensures that the indexed fields do not store duplicate values - unique: bool = True - - #: Create index in the background, allowing read and write operations to the database while the index builds - background: bool = True - - #: If True, the index only references documents with the specified field. - sparse: bool = True - - #: allows users to specify language-specific rules for string comparison - collation: Optional[MongoIndexCollation] = None - - #: allows to filter documents for this index - partialFilterExpression: Optional[Dict[str, Any]] = None - - -@dataclass -class MongoResourceConfig: - """Resource config for use with MongoDB, to be included with the ResourceConfig""" - - #: Config prefix to be used - prefix: str = "MONGO" +from .utils import get_mongo_client_config - #: Optional list of mongo indexes to be created for this resource - indexes: Optional[List[MongoIndexOptions]] = None - - #: Optional list of mongo indexes to be created for the versioning resource - version_indexes: Optional[List[MongoIndexOptions]] = None - - #: Boolean determining if this resource supports versioning - versioning: bool = False - - -class MongoClientConfig(ConfigModel): - host: str = "localhost" - port: int = 27017 - appname: str = "superdesk" - dbname: str = "superdesk" - connect: bool = True - tz_aware: bool = True - write_concern: Optional[Dict[str, Any]] = {"w": 1} - replicaSet: Optional[str] = None - uri: Optional[str] = None - document_class: Optional[type] = None - username: Optional[str] = None - password: Optional[str] = None - options: Optional[Dict[str, Any]] = None - auth_mechanism: Optional[str] = None - auth_source: Optional[str] = None - auth_mechanism_properties: Optional[str] = None - - -def get_mongo_client_config(app_config: Dict[str, Any], prefix: str = "MONGO") -> Tuple[Dict[str, Any], str]: - config = MongoClientConfig.create_from_dict(app_config, prefix) - - client_kwargs: Dict[str, Any] = { - "appname": config.appname, - "connect": config.connect, - "tz_aware": config.tz_aware, - } - - if config.options is not None: - client_kwargs.update(config.options) - - if config.write_concern is not None: - client_kwargs.update(config.write_concern) - - if config.replicaSet is not None: - client_kwargs["replicaset"] = config.replicaSet - - uri_parser.validate_options(client_kwargs) - - if config.uri is not None: - host = config.uri - # raises an exception if uri is invalid - mongo_settings = uri_parser.parse_uri(host) - - # extract username and password from uri - if mongo_settings.get("username"): - 
client_kwargs["username"] = mongo_settings["username"] - client_kwargs["password"] = mongo_settings["password"] - - # extract default database from uri - dbname = mongo_settings.get("database") - if not dbname: - dbname = config.dbname - - # extract auth source from uri - auth_source = mongo_settings["options"].get("authSource") - if not auth_source: - auth_source = dbname - else: - dbname = config.dbname - auth_source = dbname - host = config.host - client_kwargs["port"] = config.port - - client_kwargs["host"] = host - client_kwargs["authSource"] = auth_source - - if config.document_class is not None: - client_kwargs["document_class"] = config.document_class - - auth_kwargs: Dict[str, Any] = {} - if config.username is not None: - username = config.username - password = config.password - auth = (username, password) - if any(auth) and not all(auth): - raise Exception("Must set both USERNAME and PASSWORD or neither") - client_kwargs["username"] = username - client_kwargs["password"] = password - if any(auth): - if config.auth_mechanism is not None: - auth_kwargs["authMechanism"] = config.auth_mechanism - if config.auth_source is not None: - auth_kwargs["authSource"] = config.auth_source - if config.auth_mechanism_properties is not None: - auth_kwargs["authMechanismProperties"] = config.auth_mechanism_properties - - return {**client_kwargs, **auth_kwargs}, dbname +logger = logging.getLogger(__name__) class MongoResources: @@ -188,6 +40,11 @@ def __init__(self, app: "SuperdeskAsyncApp"): self._mongo_clients_async = {} self.app = app + # Import the module from here so we aren't importing from ``core.resources`` module in ``core.mongo`` + from .signals import on_resource_registered + + self.app.resources.on_resource_registered.connect(on_resource_registered) + def register_resource_config(self, name: str, config: MongoResourceConfig): """Register a Mongo resource config @@ -367,11 +224,13 @@ def create_collection_indexes( else: raise - def create_indexes_for_all_resources(self): + def create_indexes_for_all_resources(self) -> set[str]: """Creates indexes for all registered resources""" - for resource_name in self.get_all_resource_configs().keys(): + resource_names = set(self.get_all_resource_configs().keys()) + for resource_name in resource_names: self.create_resource_indexes(resource_name) + return resource_names # Async access def get_client_async( @@ -422,4 +281,4 @@ def get_collection_async(self, resource_name: str, versioning: bool = False) -> ) -from .app import SuperdeskAsyncApp # noqa: E402 +from ..app import SuperdeskAsyncApp # noqa: E402 diff --git a/superdesk/core/mongo/utils.py b/superdesk/core/mongo/utils.py new file mode 100644 index 0000000000..b88d5a3070 --- /dev/null +++ b/superdesk/core/mongo/utils.py @@ -0,0 +1,74 @@ +from pymongo import uri_parser + +from superdesk.core.types import MongoClientConfig + + +def get_mongo_client_config(app_config: dict, prefix: str = "MONGO") -> tuple[dict, str]: + config = MongoClientConfig.create_from_dict(app_config, prefix) + + client_kwargs: dict = { + "appname": config.appname, + "connect": config.connect, + "tz_aware": config.tz_aware, + } + + if config.options is not None: + client_kwargs.update(config.options) + + if config.write_concern is not None: + client_kwargs.update(config.write_concern) + + if config.replicaSet is not None: + client_kwargs["replicaset"] = config.replicaSet + + uri_parser.validate_options(client_kwargs) + + if config.uri is not None: + host = config.uri + # raises an exception if uri is invalid + mongo_settings = 
uri_parser.parse_uri(host) + + # extract username and password from uri + if mongo_settings.get("username"): + client_kwargs["username"] = mongo_settings["username"] + client_kwargs["password"] = mongo_settings["password"] + + # extract default database from uri + dbname = mongo_settings.get("database") + if not dbname: + dbname = config.dbname + + # extract auth source from uri + auth_source = mongo_settings["options"].get("authSource") + if not auth_source: + auth_source = dbname + else: + dbname = config.dbname + auth_source = dbname + host = config.host + client_kwargs["port"] = config.port + + client_kwargs["host"] = host + client_kwargs["authSource"] = auth_source + + if config.document_class is not None: + client_kwargs["document_class"] = config.document_class + + auth_kwargs: dict = {} + if config.username is not None: + username = config.username + password = config.password + auth = (username, password) + if any(auth) and not all(auth): + raise Exception("Must set both USERNAME and PASSWORD or neither") + client_kwargs["username"] = username + client_kwargs["password"] = password + if any(auth): + if config.auth_mechanism is not None: + auth_kwargs["authMechanism"] = config.auth_mechanism + if config.auth_source is not None: + auth_kwargs["authSource"] = config.auth_source + if config.auth_mechanism_properties is not None: + auth_kwargs["authMechanismProperties"] = config.auth_mechanism_properties + + return {**client_kwargs, **auth_kwargs}, dbname From 86c2307ee6534877a4f8b864ab1f7bf834534ab6 Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:11:10 +1100 Subject: [PATCH 04/11] Add new functionality to signal module --- superdesk/core/signals.py | 186 ++++++++++++++++++++++++-------------- 1 file changed, 117 insertions(+), 69 deletions(-) diff --git a/superdesk/core/signals.py b/superdesk/core/signals.py index 55237e548f..df70520161 100644 --- a/superdesk/core/signals.py +++ b/superdesk/core/signals.py @@ -1,84 +1,37 @@ -from typing import Generic, Callable, Awaitable -from typing_extensions import TypeVarTuple, Unpack -from inspect import isawaitable +from typing import Generic, Callable, Awaitable, Iterable, ClassVar +from typing_extensions import TypeVarTuple, Unpack, Self +from inspect import isawaitable, iscoroutinefunction, getmembers, get_annotations SignalFunctionSignature = TypeVarTuple("SignalFunctionSignature") +SignalListener = Callable[[Unpack[SignalFunctionSignature]], None] +AsyncSignalListener = Callable[[Unpack[SignalFunctionSignature]], Awaitable[None] | None] | SignalListener -class AsyncSignal(Generic[Unpack[SignalFunctionSignature]]): - """Strictly typed event signal system - - An event signalling system that is strictly typed. - This means that we're able to run ``mypy`` to check the types connected to, or executing, signal callbacks, - potentially raising type errors before they're able to get to an instance and cause a bug. - - You're able to define a signal with variable number of arguments, without an upper limit on the number of arguments. - - You create a signal by constructing an instance of one, providing the function signature types and name of the signal. - - AsyncSignal[````](````): - - * ``function signature``: A variable list of argument types - * ``signal name``: The name of the signal - - The signal supports both sync and async listeners to be added to its listeners. 
- - Example signal:: - - # file: users/signals.py - from superdesk.core import AsyncSignal - from .types import UserResourceModel - - before_user_created = AsyncSignal[UserResourceModel]( - "user:before_created" - ) - on_user_created = AsyncSignal[UserResourceModel]( - "user:on_created" - ) - - async create_user(user: UserResourceModel): - await before_user_created.send(user) - await UserResourceModel.get_service().create([user]) - await on_user_created.send(user) - - Example connecting:: - - # file: my/module.py - from users import UserResourceModel, signals as user_signals - - async def before_user_created(user: UserResourceModel) -> None: - # Do something before the user is created - ... - - async def on_user_created(user: UserResourceModel) -> None: - # Do something with the user that was just created - ... - - async def invalid_signal_callback(arg1: bool) -> str: - return "True" if arg1 else "False" - - def init_module(): - user_signals.before_user_created += before_user_created - user_signals.on_user_created += on_user_created - - # This next line will cause ``mypy`` checks to fail - # as the signature is invalid for the signal - user_signals.on_user_created += invalid_signal_callback +class Signal(Generic[Unpack[SignalFunctionSignature]]): + """ + A synchronous signal """ #: Name of the signal - name: str + _name: str - _listeners: set[Callable[[Unpack[SignalFunctionSignature]], Awaitable[None] | None]] + _async = False + _default_listeners: set[AsyncSignalListener] + _listeners: set[AsyncSignalListener] - def __init__(self, name: str): - self.name = name - self._listeners = set() + def __init__(self, name: str, listeners: Iterable[AsyncSignalListener] | None = None): + self._name = name + self._default_listeners = set(listeners) if listeners else set() + self._listeners = self._default_listeners.copy() def __repr__(self): return f"signal '{self.name}'" + @property + def name(self) -> str: + return self._name + def connect(self, callback: Callable[[Unpack[SignalFunctionSignature]], Awaitable[None] | None]) -> None: """Connect a function to this signal, to be called when fired @@ -92,6 +45,9 @@ def connect(self, callback: Callable[[Unpack[SignalFunctionSignature]], Awaitabl :param callback: function to register with this signal """ + if not self._async and iscoroutinefunction(callback): + raise RuntimeError(f"Async listener attempting to be added to sync signal: {self}") + self._listeners.add(callback) def disconnect(self, callback: Callable[[Unpack[SignalFunctionSignature]], Awaitable[None] | None]) -> None: @@ -109,6 +65,10 @@ def disconnect(self, callback: Callable[[Unpack[SignalFunctionSignature]], Await self._listeners.remove(callback) + def clear_listeners(self) -> None: + self._listeners = set() + # self._listeners = self._default_listeners.copy() + def __iadd__(self, callback: Callable[[Unpack[SignalFunctionSignature]], Awaitable[None] | None]): """Connect a function to this signal, to be called when fired @@ -130,10 +90,37 @@ def __isub__(self, callback: Callable[[Unpack[SignalFunctionSignature]], Awaitab def __len__(self): return len(self._listeners) - async def __call__(self, *args: Unpack[SignalFunctionSignature]) -> None: + def __call__(self, *args: Unpack[SignalFunctionSignature]) -> None: + self.send(*args) + + def send(self, *args: Unpack[SignalFunctionSignature]) -> None: + """Call all the registered callbacks connected to this signal + + You can also use the call operator to call registered callbacks. 
+ Example:: + + signal.send(args) + # is the same as + signal(args) + + :param args: The arguments to send to each registered function + """ + + for callback in self._listeners: + callback(*args) + + +class AsyncSignal(Signal[Unpack[SignalFunctionSignature]], Generic[Unpack[SignalFunctionSignature]]): + """ + An asynchronous signal + """ + + _async = True + + async def __call__(self, *args: Unpack[SignalFunctionSignature]) -> None: # type: ignore[override] await self.send(*args) - async def send(self, *args: Unpack[SignalFunctionSignature]) -> None: + async def send(self, *args: Unpack[SignalFunctionSignature]) -> None: # type: ignore[override] """Call all the registered callbacks connected to this signal You can also use the call operator to call registered callbacks. @@ -150,3 +137,64 @@ async def send(self, *args: Unpack[SignalFunctionSignature]) -> None: response = callback(*args) if isawaitable(response): await response + + +class SignalGroup: + """ + A mixin for supporting class signal attributes + + * Fields that start with `on_` and have no value, will be auto-initialised + * Util methods to: + * get instance attributes that are Signal, AsyncSignal, or SignalGroup + * clear all listeners + * connect another group to common signals in this group + """ + + #: Optional prefix string used when auto-initialising class signal attributes + signal_name_prefix: ClassVar[str] = "" + + def __init__(self): + # Initialise all fields that are + # * not already defined + # * it's type is a sub-class of ``Signal``, ``AsyncSignal`` or ``SignalGroup`` + + for field_name, annotation in get_annotations(self.__class__).items(): + if not field_name.startswith("on_") or getattr(self, field_name, None) is not None: + continue + + try: + annotation_instance = annotation() + except TypeError: + annotation_instance = annotation(f"{self.signal_name_prefix}{field_name}") + + if isinstance(annotation_instance, (Signal, AsyncSignal, SignalGroup)): + setattr(self, field_name, annotation_instance) + + def __repr__(self): + return f"signal group '{self.__class__.__name__}'" + + def get_all_signals(self) -> list[tuple[str, Signal | AsyncSignal]]: + """Get all attributes of this class that are a ``Signal`` or ``AsyncSignal`` instance""" + + return getmembers(self, lambda x: isinstance(x, (Signal, AsyncSignal))) + + def get_all_groups(self) -> list[tuple[str, Self]]: + """Get all attributes of this class that are an instance of ``SignalGroup``""" + + return getmembers(self, lambda x: isinstance(x, SignalGroup)) + + def clear_listeners(self) -> None: + """Clear all listeners for all group and signal attributes of this class""" + + for key, group in self.get_all_groups(): + group.clear_listeners() + for key, signal in self.get_all_signals(): + signal.clear_listeners() + + def connect_group(self, group: Self): + """Connect another group to common signals in this group""" + + for key, signal in self.get_all_signals(): + listener = getattr(group, key, None) + if listener is not None: + signal.connect(listener) From b4baad927c303457293fddf780c3a6604f05b33c Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:11:28 +1100 Subject: [PATCH 05/11] Add on_resource_registered signal --- superdesk/core/app.py | 2 +- superdesk/core/elastic/signals.py | 15 +++++++++++++++ superdesk/core/mongo/signals.py | 12 ++++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 superdesk/core/elastic/signals.py create mode 100644 superdesk/core/mongo/signals.py diff --git a/superdesk/core/app.py 
b/superdesk/core/app.py index 6d7d617422..7cdc29a4d4 100644 --- a/superdesk/core/app.py +++ b/superdesk/core/app.py @@ -52,9 +52,9 @@ def __init__(self, wsgi: WSGIApp): self._imported_modules = {} self._module_configs = {} self.wsgi = wsgi + self.resources = Resources(self) self.mongo = MongoResources(self) self.elastic = ElasticResources(self) - self.resources = Resources(self) self.auth = self.load_auth_module() self._store_app() diff --git a/superdesk/core/elastic/signals.py b/superdesk/core/elastic/signals.py new file mode 100644 index 0000000000..b734c867fc --- /dev/null +++ b/superdesk/core/elastic/signals.py @@ -0,0 +1,15 @@ +from superdesk.core.app import SuperdeskAsyncApp + +from ..resources import ResourceConfig + + +def on_resource_registered(app: SuperdeskAsyncApp, config: ResourceConfig) -> None: + if not config.elastic: + return + elif config.default_sort: + config.elastic.default_sort = config.default_sort + + app.elastic.register_resource_config( + config.name, + config.elastic, + ) diff --git a/superdesk/core/mongo/signals.py b/superdesk/core/mongo/signals.py new file mode 100644 index 0000000000..812f2aa27d --- /dev/null +++ b/superdesk/core/mongo/signals.py @@ -0,0 +1,12 @@ +from superdesk.core.types import MongoResourceConfig +from superdesk.core.app import SuperdeskAsyncApp + +from ..resources import ResourceConfig + + +def on_resource_registered(app: SuperdeskAsyncApp, config: ResourceConfig) -> None: + mongo_config = config.mongo or MongoResourceConfig() + if config.versioning: + mongo_config.versioning = True + + app.mongo.register_resource_config(config.name, mongo_config) From 2999d9497bb30bb7bc74e7a2cd0ffb5195d01251 Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:11:39 +1100 Subject: [PATCH 06/11] Add resource data & web signals --- .../core/resources/resource_rest_endpoints.py | 65 ++++-- superdesk/core/resources/resource_signals.py | 197 ++++++++++++++++++ superdesk/core/resources/service.py | 38 +++- 3 files changed, 272 insertions(+), 28 deletions(-) create mode 100644 superdesk/core/resources/resource_signals.py diff --git a/superdesk/core/resources/resource_rest_endpoints.py b/superdesk/core/resources/resource_rest_endpoints.py index 14a335fc49..a41ad13120 100644 --- a/superdesk/core/resources/resource_rest_endpoints.py +++ b/superdesk/core/resources/resource_rest_endpoints.py @@ -35,7 +35,8 @@ from superdesk.core.web import RestEndpoints, ItemRequestViewArgs -from .model import ResourceConfig, ResourceModel +from .model import ResourceModel +from .resource_config import ResourceConfig from .validators import convert_pydantic_validation_error_for_response @@ -233,11 +234,12 @@ async def get_item( """Processes a get single item request""" await self.get_parent_items(request) - service = self.service + signals = self.resource_config.data_class.get_signals() + await signals.web.on_get.send(request) if params.version == "all": items, count = await self.service.get_all_item_versions(args.item_id, params.max_results, params.page) - response = RestGetResponse( + response_data = RestGetResponse( _items=items, _meta=dict( page=params.page, @@ -254,20 +256,24 @@ async def get_item( request, ), ) - return Response(response, 200, [("X-Total-Count", count)]) + response = Response(response_data, 200, [("X-Total-Count", count)]) + await signals.web.on_get_response.send(request, response) + return response elif self.endpoint_config.parent_links: lookup = self.construct_parent_item_lookup(request) - lookup["_id"] = args.item_id if not 
service.id_uses_objectid() else ObjectId(args.item_id) - item = await service.find_one_raw(use_mongo=True, version=params.version, **lookup) + lookup["_id"] = args.item_id if not self.service.id_uses_objectid() else ObjectId(args.item_id) + item = await self.service.find_one_raw(use_mongo=True, version=params.version, **lookup) else: - item = await service.find_by_id_raw(args.item_id, params.version) + item = await self.service.find_by_id_raw(args.item_id, params.version) if not item: raise SuperdeskApiError.notFoundError( f"{self.resource_config.name} resource with ID '{args.item_id}' not found" ) - return Response(item) + response = Response(item) + await signals.web.on_get_response.send(request, response) + return response async def create_item(self, request: Request) -> Response: """Processes a create item request""" @@ -296,11 +302,13 @@ async def create_item(self, request: Request) -> Response: except ValidationError as validation_error: return Response(convert_pydantic_validation_error_for_response(validation_error), 403) + signals = self.resource_config.data_class.get_signals() + await signals.web.on_create.send(request, model_instances) ids = await service.create(model_instances) if len(ids) == 1: - return Response(self._populate_item_hateoas(request, model_instances[0].to_dict()), 201) + response = Response(self._populate_item_hateoas(request, model_instances[0].to_dict()), 201) else: - return Response( + response = Response( { STATUS: STATUS_OK, ITEMS: [self._populate_item_hateoas(request, item.to_dict()) for item in model_instances], @@ -308,6 +316,9 @@ async def create_item(self, request: Request) -> Response: 201, ) + await signals.web.on_create_response.send(request, response) + return response + async def update_item( self, args: ItemRequestViewArgs, @@ -328,8 +339,18 @@ async def update_item( if payload is None: raise SuperdeskApiError.badRequestError("Empty payload") + signals = self.resource_config.data_class.get_signals() + original = await self.service.find_by_id(args.item_id) + if original is None: + raise SuperdeskApiError.notFoundError( + f"{self.resource_config.name} resource with ID '{args.item_id}' not found" + ) + + await signals.web.on_update.send(request, original, payload) + payload = payload.copy() + try: - await self.service.update(args.item_id, payload, if_match) + await self.service.update(args.item_id, payload, if_match, original) except ValidationError as validation_error: return Response(convert_pydantic_validation_error_for_response(validation_error), 403) @@ -341,7 +362,9 @@ async def update_item( ) self._populate_item_hateoas(request, payload) - return Response(payload) + response = Response(payload) + await signals.web.on_update_response.send(request, response) + return response async def delete_item(self, args: ItemRequestViewArgs, params: None, request: Request) -> Response: """Processes a delete item request""" @@ -361,8 +384,12 @@ async def delete_item(self, args: ItemRequestViewArgs, params: None, request: Re "To edit a document its etag must be provided using the If-Match header" ) + signals = self.resource_config.data_class.get_signals() + await signals.web.on_delete.send(request, original) await service.delete(original, if_match) - return Response({}, 204) + response = Response({}, 204) + await signals.web.on_delete_response.send(request, response) + return response async def search_items( self, @@ -385,10 +412,12 @@ async def search_items( params.where.update(lookup) params.args = cast(SearchArgs, params.model_extra) + signals = 
self.resource_config.data_class.get_signals() + await signals.web.on_search.send(request, params) cursor = await self.service.find(params) count = await cursor.count() - response = RestGetResponse( + response_data = RestGetResponse( _items=await cursor.to_list_raw(), _meta=dict( page=params.page, @@ -399,11 +428,13 @@ async def search_items( status = 200 headers = [("X-Total-Count", count)] - response["_links"] = self._build_resource_hateoas(params, count, request) + response_data["_links"] = self._build_resource_hateoas(params, count, request) if hasattr(cursor, "extra"): - getattr(cursor, "extra")(response) + getattr(cursor, "extra")(response_data) - return Response(response, status, headers) + response = Response(response_data, status, headers) + await signals.web.on_search_response.send(request, response) + return response def _build_resource_hateoas(self, req: SearchRequest, doc_count: Optional[int], request: Request) -> Dict[str, Any]: links = { diff --git a/superdesk/core/resources/resource_signals.py b/superdesk/core/resources/resource_signals.py new file mode 100644 index 0000000000..4771c7bdb6 --- /dev/null +++ b/superdesk/core/resources/resource_signals.py @@ -0,0 +1,197 @@ +from typing import Generic, Any + +from superdesk.core.types import Request, Response, SearchRequest +from superdesk.core.signals import AsyncSignal, SignalGroup + +from .types import ResourceModelType + +__all__ = [ + "global_signals", + "ResourceDataSignals", + "ResourceWebSignals", + "ResourceSignals", + "get_resource_signals", + "clear_all_resource_signal_listeners", +] + + +#: Global signals, used for listening to all resource data/web signals +global_signals: "ResourceSignals" + + +class ResourceDataSignals(SignalGroup, Generic[ResourceModelType]): + """ + A group of signals to be used on a resource data layer + """ + + signal_name_prefix = "resource:data:" + + #: Signal fired before a resource item is saved to the DB + #: + #: Params: + #: - item :class:`ResourceModel `: The item to be created + on_create: AsyncSignal[ResourceModelType] + + #: Signal fired after a resource item has been saved to the DB + #: + #: * item :class:`ResourceModel `: The item that was created + on_created: AsyncSignal[ResourceModelType] + + #: Signal fired before a resource item is updated in the DB + #: + #: Params: + #: - original :class:`ResourceModel `: The original item that is to be updated + #: - updates :class:`dict`: A dictionary of key/value pairs to update + on_update: AsyncSignal[ResourceModelType, dict[str, Any]] + + #: Signal fired after a resource item has been updated in the DB + #: + #: Params: + #: - original :class:`ResourceModel `: The original item that was updated (without changes applied) + #: - updates :class:`dict`: A dictionary of key/value that was updated + on_updated: AsyncSignal[ResourceModelType, dict[str, Any]] + + #: Signal fired before a resource item is to be deleted from the DB + #: + #: Params: + #: - item :class:`ResourceModel `: The item is to be deleted + on_delete: AsyncSignal[ResourceModelType] + + #: Signal fired after a resource item has been deleted from the DB + #: + #: Params: + #: - item :class:`ResourceModel `: The item that was deleted + on_deleted: AsyncSignal[ResourceModelType] + + +class ResourceWebSignals(SignalGroup, Generic[ResourceModelType]): + """ + A group of signals to be used on a resource web api layer + """ + + signal_name_prefix = "resource:web:" + + #: Signal fired before processing a Web request to create a new resource item + #: + #: Params: + #: - request 
:class:`Request `: The Web request instance + #: - items :class:`list[ResourceModel] `: A list of items to be created + on_create: AsyncSignal[Request, list[ResourceModelType]] + + #: Signal fired before sending Web response from a new resource request + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - response :class:`Response `: The response to be returned to the client + on_create_response: AsyncSignal[Request, Response] + + #: Signal fired before processing a Web request to update an existing resource item + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - original :class:`ResourceModel `: The original item that is to be updated + #: - updates :class:`dict`: A dictionary of key/value pairs to update + on_update: AsyncSignal[Request, ResourceModelType, dict[str, Any]] + + #: Signal fired before sending Web response from an update resource request + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - response :class:`Response `: The response to be returned to the client + on_update_response: AsyncSignal[Request, Response] + + #: Signal fired before processing a Web request to delete a resource item + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - item :class:`ResourceModel `: The resource item to be deleted + on_delete: AsyncSignal[Request, ResourceModelType] + + #: Signal fired before sending Web response from a delete resource request + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - response :class:`Response `: The response to be returned to the client + on_delete_response: AsyncSignal[Request, Response] + + #: Signal fired before processing a Web request to get a resource item + #: + #: Params: + #: - request :class:`Request `: The web request instance + on_get: AsyncSignal[Request] + + #: Signal fired before sending Web response from a get resource item request + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - response :class:`Response `: The response to be returned to the client + on_get_response: AsyncSignal[Request, Response] + + #: Signal fired before processing a Web request to search for resource items + #: + #: Params: + #: - request :class:`Request `: The Web request instance + #: - search :class:`SearchRequest `: The search request instance + on_search: AsyncSignal[Request, SearchRequest] + + #: Signal fired before sending Web response from a search resource items request + #: + #: Params: + #: - request :class:`Request `: The web request instance + #: - response :class:`Response `: The response to be returned to the client + on_search_response: AsyncSignal[Request, Response] + + +class ResourceSignals(SignalGroup, Generic[ResourceModelType]): + """ + A group of resource signals, combining both data and web api signals + """ + + #: Data signals sent when interacting with the DB + data: ResourceDataSignals[ResourceModelType] + + #: Web signals sent when receiving REST request through API + web: ResourceWebSignals[ResourceModelType] + + #: Flag to indicate if this resource should connect to global resource signal, defaults to ``True`` + connect_to_global: bool + + def __init__(self, connect_to_global: bool = True): + super().__init__() + self.connect_to_global = connect_to_global + self.data = ResourceDataSignals() + self.web = ResourceWebSignals() + + if self.connect_to_global: + global global_signals + self.data.connect_group(global_signals.data) + 
self.web.connect_group(global_signals.web) + + def clear_listeners(self) -> None: + super().clear_listeners() + if self.connect_to_global: + global global_signals + self.data.connect_group(global_signals.data) + self.web.connect_group(global_signals.web) + + +global_signals = ResourceSignals(False) +_resource_signal_store: dict[str, ResourceSignals] = {} + + +def get_resource_signals(resource_model_class: type[ResourceModelType]) -> ResourceSignals[ResourceModelType]: + """Get the ``ResourceSignals`` instance for the provided resource type""" + + try: + return _resource_signal_store[resource_model_class.model_resource_name] + except KeyError: + _resource_signal_store[resource_model_class.model_resource_name] = ResourceSignals[ResourceModelType]() + return _resource_signal_store[resource_model_class.model_resource_name] + + +def clear_all_resource_signal_listeners() -> None: + """Clear all listeners from all registered resource signals""" + + for resource_signals in _resource_signal_store.values(): + resource_signals.clear_listeners() + global_signals.clear_listeners() diff --git a/superdesk/core/resources/service.py b/superdesk/core/resources/service.py index f1faabf44e..d482a299d9 100644 --- a/superdesk/core/resources/service.py +++ b/superdesk/core/resources/service.py @@ -13,7 +13,6 @@ Optional, Generic, Sequence, - TypeVar, ClassVar, List, Dict, @@ -46,13 +45,11 @@ from ..app import SuperdeskAsyncApp, get_current_async_app from .cursor import ElasticsearchResourceCursorAsync, MongoResourceCursorAsync, ResourceCursorAsync from .utils import get_projection_from_request +from .types import ResourceModelType logger = logging.getLogger(__name__) -ResourceModelType = TypeVar("ResourceModelType", bound="ResourceModel") - - class AsyncResourceService(Generic[ResourceModelType]): resource_name: ClassVar[str] config: "ResourceConfig" @@ -108,6 +105,10 @@ def elastic(self): return self.app.elastic.get_client_async(self.resource_name) + @property + def signals(self): + return self.config.data_class.get_signals() + def get_model_instance_from_dict(self, data: Dict[str, Any]) -> ResourceModelType: """Converts a dictionary to an instance of ``ResourceModel`` for this resource @@ -293,6 +294,8 @@ async def on_create(self, docs: List[ResourceModelType]) -> None: if doc.updated is None: doc.updated = doc.created + await self.signals.data.on_create.send(doc) + async def validate_create(self, doc: ResourceModelType): """Validate the provided doc for creation @@ -408,7 +411,8 @@ async def on_created(self, docs: List[ResourceModelType]) -> None: :param docs: List of resources that were created """ - pass + for doc in docs: + await self.signals.data.on_created.send(doc) async def on_update(self, updates: Dict[str, Any], original: ResourceModelType) -> None: """Hook to run before updating a resource @@ -423,7 +427,15 @@ async def on_update(self, updates: Dict[str, Any], original: ResourceModelType) if versioned_original: updates["_current_version"] = (versioned_original.current_version or 0) + 1 - async def update(self, item_id: Union[str, ObjectId], updates: Dict[str, Any], etag: str | None = None) -> None: + await self.signals.data.on_update.send(original, updates) + + async def update( + self, + item_id: Union[str, ObjectId], + updates: Dict[str, Any], + etag: str | None = None, + original: ResourceModelType | None = None, + ) -> None: """Updates an existing resource Will automatically update the resource in both Elasticsearch (if configured for this resource) @@ -432,11 +444,14 @@ async def update(self, 
item_id: Union[str, ObjectId], updates: Dict[str, Any], e :param item_id: ID of item to update :param updates: Dictionary to update :param etag: Optional etag, if provided will check etag against original item + :param original: Optional original item, if not provided the service will retrieve it :raises SuperdeskApiError.notFoundError: If original item not found """ item_id = ObjectId(item_id) if self.id_uses_objectid() else item_id - original = await self.find_by_id(item_id) + if original is None: + original = await self.find_by_id(item_id) + if original is None: raise SuperdeskApiError.notFoundError() @@ -469,7 +484,7 @@ async def on_updated(self, updates: Dict[str, Any], original: ResourceModelType) :param original: Instance of ``ResourceModel`` for the original resource """ - pass + await self.signals.data.on_updated.send(original, updates) async def on_delete(self, doc: ResourceModelType): """Hook to run before deleting a resource @@ -477,7 +492,7 @@ async def on_delete(self, doc: ResourceModelType): :param doc: Instance of ``ResourceModel`` for the resource to delete """ - pass + await self.signals.data.on_delete.send(doc) async def delete(self, doc: ResourceModelType, etag: str | None = None): """Deletes a resource @@ -528,7 +543,7 @@ async def on_deleted(self, doc: ResourceModelType): :param doc: Instance of ``ResourceModel`` for the resource that was deleted """ - pass + await self.signals.data.on_deleted.send(doc) async def get_all(self) -> AsyncIterable[ResourceModelType]: """Helper function to get all items from this resource @@ -920,4 +935,5 @@ async def get_cached_by_id(self, _id: str): return await self.find_by_id(_id) -from .model import ResourceConfig, ResourceModel, get_versioned_model, model_has_versions # noqa: E402 +from .resource_config import ResourceConfig # noqa: E402 +from .model import get_versioned_model, model_has_versions # noqa: E402 From c6a772799cddd9f865c371e8d3d5a3a9b76dd963 Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:11:52 +1100 Subject: [PATCH 07/11] Fix some behave tests --- superdesk/tests/steps.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/superdesk/tests/steps.py b/superdesk/tests/steps.py index ba4211bd7d..43eca05ccd 100644 --- a/superdesk/tests/steps.py +++ b/superdesk/tests/steps.py @@ -1037,6 +1037,10 @@ async def when_we_find_for_resource_the_id_as_name_by_search_criteria(context, r @when('we delete "{url}"') @async_run_until_complete +async def _step_impl_when_delete_url(context, url): + await step_impl_when_delete_url(context, url) + + async def step_impl_when_delete_url(context, url): with context.app.mail.record_messages() as outbox: url = apply_placeholders(context, url) @@ -1084,6 +1088,10 @@ async def when_we_delete_it(context): @when('we patch "{url}"') @async_run_until_complete +async def _step_impl_when_patch_url(context, url): + await step_impl_when_patch_url(context, url) + + async def step_impl_when_patch_url(context, url): with context.app.mail.record_messages() as outbox: url = apply_placeholders(context, url) @@ -1281,6 +1289,10 @@ async def step_impl_then_get_new(context): @then("we get error {code}") @async_run_until_complete +async def _step_impl_then_get_error(context, code): + await step_impl_then_get_error(context, code) + + async def step_impl_then_get_error(context, code): await expect_status(context.response, int(code)) if context.text: @@ -2191,6 +2203,10 @@ async def we_get_null_stage(context): @given('we have sessions "{url}"') @async_run_until_complete +async def 
_we_have_sessions_get_id(context, url): + await we_have_sessions_get_id(context, url) + + async def we_have_sessions_get_id(context, url): await when_we_get_url(context, url) item = await get_json_data(context.response) From 202eeb19de2700a3afc0dce54bfe7548ef1bf6ac Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:12:00 +1100 Subject: [PATCH 08/11] Update comments.on_created to async --- apps/comments/comments.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/comments/comments.py b/apps/comments/comments.py index 0d254f1591..e6fcb0237e 100644 --- a/apps/comments/comments.py +++ b/apps/comments/comments.py @@ -93,7 +93,7 @@ def on_fetched(self, doc): decode_keys(item, "mentioned_users") decode_keys(item, "mentioned_desks") - def on_created(self, docs): + async def on_created(self, docs): for doc in docs: if self.notifications: push_notification(self.notification_key, item=str(doc.get("item"))) @@ -102,7 +102,7 @@ def on_created(self, docs): if self.notifications: # TODO-ASYNC: Support async (see superdesk.tests.markers.requires_eve_resource_async_event) - notify_mentioned_users(docs, get_app_config("CLIENT_URL", "").rstrip("/")) + await notify_mentioned_users(docs, get_app_config("CLIENT_URL", "").rstrip("/")) notify_mentioned_desks(docs) def on_updated(self, updates, original): From 7baa6143f4a3b81477ee473cd7eb4ca81032731b Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:12:29 +1100 Subject: [PATCH 09/11] Add tests --- tests/core/signals.py | 34 -- tests/core/signals_test.py | 431 +++++++++++++++++++++ tests/storage/modules/assests/resources.py | 3 +- 3 files changed, 432 insertions(+), 36 deletions(-) delete mode 100644 tests/core/signals.py create mode 100644 tests/core/signals_test.py diff --git a/tests/core/signals.py b/tests/core/signals.py deleted file mode 100644 index 61ed11405c..0000000000 --- a/tests/core/signals.py +++ /dev/null @@ -1,34 +0,0 @@ -from unittest import IsolatedAsyncioTestCase -from unittest.mock import Mock, AsyncMock -from superdesk.core import AsyncSignal - - -class AsyncSignalsTestCase(IsolatedAsyncioTestCase): - async def test_signals(self): - signal = AsyncSignal[str, bool]("on_some_event") - - signal_cb1 = Mock() - signal_cb2 = AsyncMock() - signal_cb3 = AsyncMock() - - signal += signal_cb1 - signal.connect(signal_cb2) - signal.connect(signal_cb3) - - await signal.send("monkeys", True) - signal_cb1.assert_called_once_with("monkeys", True) - signal_cb2.assert_called_once_with("monkeys", True) - signal_cb2.assert_awaited() - signal_cb3.assert_called_once_with("monkeys", True) - signal_cb3.assert_awaited() - - signal_cb1.reset_mock() - signal_cb2.reset_mock() - signal_cb3.reset_mock() - - signal -= signal_cb2 - signal.disconnect(signal_cb3) - await signal.send("space", False) - signal_cb1.assert_called_once_with("space", False) - signal_cb2.assert_not_called() - signal_cb3.assert_not_called() diff --git a/tests/core/signals_test.py b/tests/core/signals_test.py new file mode 100644 index 0000000000..f9602805e2 --- /dev/null +++ b/tests/core/signals_test.py @@ -0,0 +1,431 @@ +from typing import Any +from unittest import IsolatedAsyncioTestCase, mock +from unittest.mock import Mock, AsyncMock, ANY + +from superdesk.core import AsyncSignal +from superdesk.core.resources import AsyncResourceService, global_signals +from superdesk.core.resources.resource_signals import clear_all_resource_signal_listeners +from superdesk.core.types import Request, Response, SearchRequest +from superdesk.utc import 
utcnow +from superdesk.utils import format_time +from superdesk.factory.app import HttpFlaskRequest +from superdesk.errors import SuperdeskApiError + +from superdesk.tests import AsyncTestCase, AsyncFlaskTestCase + +from .modules.users import User +from .fixtures.users import john_doe + +NOW = utcnow() + + +class AsyncSignalsTestCase(IsolatedAsyncioTestCase): + async def test_signals(self): + signal = AsyncSignal[str, bool]("on_some_event") + + signal_cb1 = Mock() + signal_cb2 = AsyncMock() + signal_cb3 = AsyncMock() + + signal += signal_cb1 + signal.connect(signal_cb2) + signal.connect(signal_cb3) + + await signal.send("monkeys", True) + signal_cb1.assert_called_once_with("monkeys", True) + signal_cb2.assert_called_once_with("monkeys", True) + signal_cb2.assert_awaited() + signal_cb3.assert_called_once_with("monkeys", True) + signal_cb3.assert_awaited() + + signal_cb1.reset_mock() + signal_cb2.reset_mock() + signal_cb3.reset_mock() + + signal -= signal_cb2 + signal.disconnect(signal_cb3) + await signal.send("space", False) + signal_cb1.assert_called_once_with("space", False) + signal_cb2.assert_not_called() + signal_cb3.assert_not_called() + + +class ResourceDataSignalsTestCase(AsyncTestCase): + app_config = {"MODULES": ["tests.core.modules.users"]} + service: AsyncResourceService[User] + + async def asyncSetUp(self): + await super().asyncSetUp() + self.service = User.get_service() + + async def asyncTearDown(self): + await super().asyncTearDown() + clear_all_resource_signal_listeners() + + async def test_clear_signal_listeners(self): + signals = User.get_signals() + self.assertEqual(1, len(signals.data.on_create)) + + cb = AsyncMock() + signals.data.on_create.connect(cb) + self.assertEqual(2, len(signals.data.on_create)) + + signals.clear_listeners() + self.assertEqual(1, len(signals.data.on_create)) + + async def test_data_signals(self): + signals = User.get_signals() + + callbacks: dict[str, list[AsyncMock]] = {} + for event in ["create", "update", "delete"]: + name = f"on_{event}" + callbacks[name] = [AsyncMock(name=f"resource:{name}"), AsyncMock(name=f"global:{name}")] + + getattr(signals.data, name).connect(callbacks[name][0]) + getattr(global_signals.data, name).connect(callbacks[name][1]) + + name = f"on_{event}d" + callbacks[name] = [AsyncMock(name=f"resource:{name}"), AsyncMock(name=f"global:{name}")] + getattr(signals.data, name).connect(callbacks[name][0]) + getattr(global_signals.data, name).connect(callbacks[name][1]) + + test_user = john_doe() + + def assert_mocks_called(action: str, *test_args): + callbacks[f"on_{action}"][0].assert_called_once_with(*test_args) + callbacks[f"on_{action}"][1].assert_called_once_with(*test_args) + callbacks[f"on_{action}d"][0].assert_called_once_with(*test_args) + callbacks[f"on_{action}d"][1].assert_called_once_with(*test_args) + + for callback_name, callback in callbacks.items(): + if action not in callback_name: + callback[0].assert_not_called() + callback[1].assert_not_called() + + call_count = 0 + for callback in callbacks.values(): + call_count += callback[0].call_count + call_count += callback[1].call_count + callback[0].reset_mock() + callback[1].reset_mock() + + assert call_count == 4 + + # Test create signals + await self.service.create([test_user]) + assert_mocks_called("create", test_user) + + # Test update signals + updates = dict(first_name="Foo", last_name="Bar") + await self.service.update(test_user.id, updates) + assert_mocks_called("update", test_user, updates) + + # Test delete signals + await 
self.service.delete(test_user) + assert_mocks_called("delete", test_user) + + async def test_modifying_data_from_data_signal(self): + test_user = john_doe() + + def modify_on_create(user: User) -> None: + # Test modifying the item before it's inserted into the DB + user.code = "test_created" + + def modify_on_update(original: User, updates: dict[str, Any]) -> None: + # Test modifying the item before it's updated in the DB + updates["code"] = "test_updated" + + def raise_error_on_delete(user: User) -> None: + # Test raising an error under certain conditions + if user.code == "test_created": + raise AttributeError("Invalid user state") + + signals = User.get_signals() + signals.data.on_create.connect(modify_on_create) + signals.data.on_update.connect(modify_on_update) + signals.data.on_delete.connect(raise_error_on_delete) + + # Test modify in create signal + await self.service.create([test_user]) + user_in_db = await self.service.find_by_id(test_user.id) + self.assertEqual(user_in_db.code, "test_created") + + # Test exception is raised from connected signal, due to value of code field + with self.assertRaises(AttributeError) as e: + await self.service.delete(user_in_db) + self.assertEqual(str(e.exception), "Invalid user state") + + # Test modify in update signal + await self.service.update(test_user.id, dict(first_name="Foo", last_name="Bar")) + user_in_db = await self.service.find_by_id(test_user.id) + self.assertEqual(user_in_db.code, "test_updated") + + # Test exception IS NOT raised from connected signal, due to value of code field + await self.service.delete(user_in_db) + self.assertIsNone(await self.service.find_by_id(test_user.id)) + + # Disconnect resource specific signals, and re-connect to global resource signals + signals.data.on_create.disconnect(modify_on_create) + signals.data.on_update.disconnect(modify_on_update) + signals.data.on_delete.disconnect(raise_error_on_delete) + + global_signals.data.on_create.connect(modify_on_create) + global_signals.data.on_update.connect(modify_on_update) + global_signals.data.on_delete.connect(raise_error_on_delete) + + # Test modify in create signal + await self.service.create([test_user]) + user_in_db = await self.service.find_by_id(test_user.id) + self.assertEqual(user_in_db.code, "test_created") + + # Test exception is raised from connected signal, due to value of code field + with self.assertRaises(AttributeError) as e: + await self.service.delete(user_in_db) + self.assertEqual(str(e.exception), "Invalid user state") + + # Test modify in update signal + await self.service.update(test_user.id, dict(first_name="Foo", last_name="Bar")) + user_in_db = await self.service.find_by_id(test_user.id) + self.assertEqual(user_in_db.code, "test_updated") + + # Test exception IS NOT raised from connected signal, due to value of code field + await self.service.delete(user_in_db) + self.assertIsNone(await self.service.find_by_id(test_user.id)) + + +class ResourceWebSignalsTestCase(AsyncFlaskTestCase): + app_config = {"MODULES": ["tests.core.modules.users"]} + service: AsyncResourceService[User] + + async def asyncSetUp(self): + await super().asyncSetUp() + self.service = User.get_service() + + async def asyncTearDown(self): + await super().asyncTearDown() + clear_all_resource_signal_listeners() + # global_signals.clear_listeners() + # User.get_signals().clear_listeners() + + @mock.patch("superdesk.core.resources.service.utcnow", return_value=NOW) + async def test_web_signals(self, mock_utcnow): + signals = User.get_signals() + + callbacks: dict[str, 
list[AsyncMock]] = {} + for event in ["create", "update", "delete", "get", "search"]: + name = f"on_{event}" + callbacks[name] = [AsyncMock(), AsyncMock()] + getattr(signals.web, name).connect(callbacks[name][0]) + getattr(global_signals.web, name).connect(callbacks[name][1]) + + name = f"on_{event}_response" + callbacks[name] = [AsyncMock(), AsyncMock()] + getattr(signals.web, name).connect(callbacks[name][0]) + getattr(global_signals.web, name).connect(callbacks[name][1]) + + def assert_mocks_called(action: str, pre_args: list[Any], post_args: list[Any]): + callbacks[f"on_{action}"][0].assert_called_once_with(*pre_args) + callbacks[f"on_{action}"][1].assert_called_once_with(*pre_args) + callbacks[f"on_{action}_response"][0].assert_called_once_with(*post_args) + callbacks[f"on_{action}_response"][1].assert_called_once_with(*post_args) + + # Assert the first argument is a request instance + self.assertIsInstance(callbacks[f"on_{action}"][0].call_args[0][0], HttpFlaskRequest) + self.assertIsInstance(callbacks[f"on_{action}"][1].call_args[0][0], HttpFlaskRequest) + self.assertIsInstance(callbacks[f"on_{action}_response"][0].call_args[0][0], HttpFlaskRequest) + self.assertIsInstance(callbacks[f"on_{action}_response"][1].call_args[0][0], HttpFlaskRequest) + + for callback_name, callback in callbacks.items(): + if action not in callback_name: + callback[0].assert_not_called() + callback[1].assert_not_called() + + call_count = 0 + for callback in callbacks.values(): + call_count += callback[0].call_count + call_count += callback[1].call_count + callback[0].reset_mock() + callback[1].reset_mock() + + assert call_count == 4 + + test_user = john_doe() + + # Test create signals + response = await self.test_client.post("/api/users_async", json=test_user) + self.assertEqual(response.status_code, 201) + response_data = await response.get_json() + test_user.etag = response_data["_etag"] + test_user.created = NOW + test_user.updated = NOW + assert_mocks_called( + "create", + [ANY, [test_user]], + [ + ANY, + Response( + { + **test_user.to_dict(), + "_links": {"self": {"title": "User", "href": "users_async/user_1"}}, + }, + 201, + (), + ), + ], + ) + + # Test get signals + test_user = await self.service.find_by_id(test_user.id) + response = await self.test_client.get(f"/api/users_async/{test_user.id}") + self.assertEqual(response.status_code, 200) + assert_mocks_called( + "get", + [ANY], + [ + ANY, + Response( + { + **test_user.to_dict(), + "_created": format_time(NOW) + "+00:00", + "_updated": format_time(NOW) + "+00:00", + }, + 200, + (), + ), + ], + ) + + # Test search signals + response = await self.test_client.get("""/api/users_async?source={"query":{"match":{"first_name":"John"}}}""") + self.assertEqual(response.status_code, 200) + response_data = await response.get_json() + from pprint import pprint + + pprint(response_data) + assert_mocks_called( + "search", + [ + ANY, + SearchRequest( + args={"source": '{"query":{"match":{"first_name":"John"}}}'}, + source='{"query":{"match":{"first_name":"John"}}}', + ), + ], + [ + ANY, + Response( + { + "_items": [ + { + **test_user.to_dict(), + "_created": format_time(NOW) + "+00:00", + "_updated": format_time(NOW) + "+00:00", + } + ], + "_meta": { + "page": 1, + "max_results": 25, + "total": 1, + }, + "_links": ANY, + }, + 200, + [("X-Total-Count", 1)], + ), + ], + ) + + # Test update signals + test_user = await self.service.find_by_id(test_user.id) + response = await self.test_client.patch( + f"/api/users_async/{test_user.id}", + json={ + "first_name": "Foo", 
+ "last_name": "Bar", + }, + headers={"If-Match": test_user.etag}, + ) + self.assertEqual(response.status_code, 200) + response_data = await response.get_json() + assert_mocks_called( + "update", + [ANY, test_user, {"first_name": "Foo", "last_name": "Bar"}], + [ + ANY, + Response( + { + "_id": test_user.id, + "_updated": NOW, + "_etag": response_data["_etag"], + "first_name": "Foo", + "last_name": "Bar", + "_status": "OK", + "_links": {"self": {"title": "User", "href": "users_async/user_1"}}, + }, + 200, + (), + ), + ], + ) + + # Test delete signals + test_user = await self.service.find_by_id(test_user.id) + response = await self.test_client.delete( + f"/api/users_async/{test_user.id}", headers={"If-Match": test_user.etag} + ) + self.assertEqual(response.status_code, 204) + assert_mocks_called("delete", [ANY, test_user], [ANY, Response({}, 204, ())]) + + async def test_modifying_data_from_web_signal(self): + test_user = john_doe() + + def modify_on_create(request: Request, users: list[User]) -> None: + # Test modifying the item before it's inserted into the DB + users[0].code = "test_created" + + def modify_on_update(request: Request, original: User, updates: dict[str, Any]) -> None: + # Test modifying the item before it's updated in the DB + updates["code"] = "test_updated" + + def raise_error_on_delete(request: Request, user: User) -> None: + # Test raising an error under certain conditions + if user.code == "test_created": + raise SuperdeskApiError.badRequestError("Invalid user state") + + signals = User.get_signals() + signals.web.on_create.connect(modify_on_create) + signals.web.on_update.connect(modify_on_update) + signals.web.on_delete.connect(raise_error_on_delete) + + # Test modify in create signal + response = await self.test_client.post("/api/users_async", json=test_user) + self.assertEqual(response.status_code, 201) + user_in_db = await self.service.find_by_id(test_user.id) + self.assertEqual(user_in_db.code, "test_created") + + # Test exception is raised from connected signal, due to value of code field + response = await self.test_client.delete( + f"/api/users_async/{test_user.id}", headers={"If-Match": user_in_db.etag} + ) + self.assertEqual(response.status_code, 400) + self.assertTrue("Invalid user state" in await response.get_data(as_text=True)) + + # Test modify in update signal + response = await self.test_client.patch( + f"/api/users_async/{user_in_db.id}", + json={ + "first_name": "Foo", + "last_name": "Bar", + }, + headers={"If-Match": user_in_db.etag}, + ) + self.assertEqual(response.status_code, 200) + user_in_db = await self.service.find_by_id(test_user.id) + self.assertEqual(user_in_db.code, "test_updated") + + # Test exception IS NOT raised from connected signal, due to value of code field + response = await self.test_client.delete( + f"/api/users_async/{test_user.id}", headers={"If-Match": user_in_db.etag} + ) + self.assertEqual(response.status_code, 204) + self.assertIsNone(await self.service.find_by_id(test_user.id)) diff --git a/tests/storage/modules/assests/resources.py b/tests/storage/modules/assests/resources.py index 733f0995b4..67e5c7df43 100644 --- a/tests/storage/modules/assests/resources.py +++ b/tests/storage/modules/assests/resources.py @@ -1,6 +1,5 @@ -from superdesk.core.elastic.common import ElasticResourceConfig from superdesk.core.module import Module -from superdesk.core.resources import ResourceModel, ResourceConfig +from superdesk.core.resources import ResourceModel, ResourceConfig, ElasticResourceConfig class Upload(ResourceModel): From 
109a9318a1899aa1db12cd5cba2e940240d7e507 Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:12:33 +1100 Subject: [PATCH 10/11] add/update docs --- docs/core/elastic.rst | 2 +- docs/core/mongo.rst | 6 +- docs/core/resources.rst | 8 +- docs/core/signals.rst | 317 ++++++++++++++++++++++++++++++++++++++++ docs/core/web.rst | 20 +-- 5 files changed, 335 insertions(+), 18 deletions(-) diff --git a/docs/core/elastic.rst b/docs/core/elastic.rst index d19daa84bc..2789257c22 100644 --- a/docs/core/elastic.rst +++ b/docs/core/elastic.rst @@ -38,7 +38,7 @@ The :attr:`get_client_async ` method retu Elastic References ------------------ -.. autoclass:: superdesk.core.elastic.common.ElasticResourceConfig +.. autoclass:: superdesk.core.types.ElasticResourceConfig :member-order: bysource :members: diff --git a/docs/core/mongo.rst b/docs/core/mongo.rst index fe1d973c7f..82aa1888f6 100644 --- a/docs/core/mongo.rst +++ b/docs/core/mongo.rst @@ -35,15 +35,15 @@ The :attr:`get_db_async ` method returns an instanc Mongo References ---------------- -.. autoclass:: superdesk.core.mongo.MongoResourceConfig +.. autoclass:: superdesk.core.types.MongoResourceConfig :member-order: bysource :members: -.. autoclass:: superdesk.core.mongo.MongoIndexOptions +.. autoclass:: superdesk.core.types.MongoIndexOptions :member-order: bysource :members: -.. autoclass:: superdesk.core.mongo.MongoIndexCollation +.. autoclass:: superdesk.core.types.MongoIndexCollation :member-order: bysource :members: diff --git a/docs/core/resources.rst b/docs/core/resources.rst index 85b2cda82b..fc66716ddb 100644 --- a/docs/core/resources.rst +++ b/docs/core/resources.rst @@ -542,8 +542,8 @@ The :meth:`Resources.register ` method provides a way using the :class:`ResourceConfig ` class to provide the resource config. This will register the resource with MongoDB and optionally the Elasticsearch system. See -:class:`MongoResourceConfig ` and -:class:`ElasticResourceConfig ` for MongoDB and Elastic config options. +:class:`MongoResourceConfig ` and +:class:`ElasticResourceConfig ` for MongoDB and Elastic config options. Example module:: @@ -599,7 +599,7 @@ You can also use the ``resources`` config from a Module to automatically registe API References -------------- -.. autoclass:: superdesk.core.resources.model.Resources +.. autoclass:: superdesk.core.resources.resource_manager.Resources :member-order: bysource :members: @@ -614,7 +614,7 @@ Resource Model :member-order: bysource :members: id -.. autoclass:: superdesk.core.resources.model.ResourceConfig +.. autoclass:: superdesk.core.resources.resource_config.ResourceConfig :member-order: bysource :members: diff --git a/docs/core/signals.rst b/docs/core/signals.rst index daf89affa5..09b01e5c32 100644 --- a/docs/core/signals.rst +++ b/docs/core/signals.rst @@ -1,10 +1,327 @@ .. core_signals: +====================== Signals / Event System ====================== .. module:: superdesk.core.signals +The signal mechanism provides a way to subscribe to events that can be raised by the system. + +Signals are strictly typed, + +Types Of Signals +---------------- + +At the core is the :class:`Signal` class, which provides the base functionality for all signal types. +There are also :class:`AsyncSignal` and :class:`SignalGroup` types, each with their own use cases. 
+
+Example definition and subscription of signals::
+
+    from superdesk.core.signals import (
+        Signal,
+        AsyncSignal,
+        SignalGroup
+    )
+
+    # Signal defined on a module level scope
+    on_start = Signal("on_app_start")
+
+    class MyClass(SignalGroup):
+        # Signals defined on a class instance scope
+        on_event: Signal[bool]
+        on_event_async: AsyncSignal[str]
+
+    # Define some callbacks
+    def event_cb(arg1: bool):
+        print(f"Value: {arg1}")
+
+    async def event_cb_async(arg1: str):
+        print(f"Value: {arg1}")
+
+    async def main():
+        # Example connecting, disconnecting and sending signals
+        my_inst = MyClass()
+        my_inst.on_event.connect(event_cb)
+        my_inst.on_event_async.connect(event_cb_async)
+        on_start.send()
+        my_inst.on_event.send(True)
+        await my_inst.on_event_async.send("Some Data")
+
+Signal
+******
+
+Example signals::
+
+    # Creating a module level signal
+    on_create: Signal[dict]
+
+    # Creating a class instance scoped signal
+    class MyContext(SignalGroup):
+        on_exit: Signal[bool]
+
+There are two methods for managing subscriptions to a signal:
+
+* :meth:`connect() ` - connect a function to this signal
+* :meth:`disconnect() ` - disconnect a function from this signal
+
+When you :meth:`send() ` a signal, it runs all of its connected functions with the provided arguments.
+
+Example::
+
+    def on_create_cb(arg1: dict):
+        pass
+
+    def on_exit_cb(arg1: bool):
+        pass
+
+    def main():
+        my_inst = MyContext()
+        my_inst.on_exit.connect(on_exit_cb)
+
+        on_create.connect(on_create_cb)
+        on_create.send({"some": "data"})
+        on_create.disconnect(on_create_cb)
+
+        my_inst.on_exit.send(True)
+
+.. autoclass:: superdesk.core.signals.Signal
+    :member-order: bysource
+    :members:
+
+
+Async Signals
+*************
+
+Example signals::
+
+    from superdesk.core.signals import AsyncSignal, SignalGroup
+
+    # Creating a module level signal
+    on_create: AsyncSignal[dict]
+
+    # Creating a class instance scoped signal
+    class MyContext(SignalGroup):
+        on_exit: AsyncSignal[bool]
+
 .. autoclass:: superdesk.core.signals.AsyncSignal
     :member-order: bysource
     :members:
+
+Signal Groups
+*************
+
+As can be seen from the examples above, a SignalGroup is one way to easily scope signals to a class instance.
+Any attribute on the class that starts with ``on_`` and has no value assigned after ``__init__`` will have a Signal, AsyncSignal or SignalGroup constructed for you.
+This means signals can be declared on the class through annotations alone, without constructing them explicitly.
+
+A SignalGroup can also be connected to another SignalGroup::
+
+    from superdesk.core.signals import Signal, SignalGroup
+
+    class GroupA(SignalGroup):
+        on_a1: Signal[bool]
+        on_a2: Signal[str]
+
+    class GroupB(SignalGroup):
+        on_a: SignalGroup[GroupA]
+
+
+.. autoclass:: superdesk.core.signals.SignalGroup
+    :member-order: bysource
+    :members:
+
+
+Resource Signals
+----------------
+
+Resource signals are split up into two contexts: data (DB operations) and web (API requests).
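+
+For example, a minimal sketch of accessing a resource's signals (this reuses the example ``User`` model from ``tests.core.modules.users``, as the other examples on this page do; any registered resource model works the same way)::
+
+    from superdesk.core.resources.resource_signals import get_resource_signals
+    from tests.core.modules.users import User
+
+    async def log_created(user: User) -> None:
+        # Runs after a user item has been written to the DB
+        print(f"user created: {user.id}")
+
+    # The signals group can be retrieved from the model class itself...
+    signals = User.get_signals()
+    # ...or looked up by passing the resource model class
+    signals = get_resource_signals(User)
+
+    # ``data`` signals fire around DB operations, ``web`` signals around API requests
+    signals.data.on_created.connect(log_created)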
+
+Below is a short diagram showing when the resource signals are fired,
+using a Web request that creates a new resource item as the example.
+
+.. uml::
+
+    title Create Resource Data Flow
+
+    cloud "HTTP Request" as client
+    cloud "HTTP Response" as client_response
+
+    rectangle "Web: Process request" as web_process_request
+    rectangle "Data: Process request" as data_process_request
+    rectangle "Data: Process response" as data_process_response
+    rectangle "Web: Process response" as web_process_response
+
+    queue "web.on_create" as signal_web_create
+    queue "web.on_create_response" as signal_web_created
+    queue "data.on_create" as signal_data_create
+    queue "data.on_created" as signal_data_created
+
+    database "Mongo/Elastic" as db
+
+    client --> web_process_request: Send create request
+    web_process_request -> signal_web_create: Send signal
+    web_process_request --> data_process_request: Send to data layer
+    data_process_request -> signal_data_create: Send signal
+    data_process_request --> db: Save data
+    db --> data_process_response: New item
+    data_process_response -> signal_data_created: Send signal
+    data_process_response --> web_process_response: Send new item
+    web_process_response -> signal_web_created: Send signal
+    web_process_response --> client_response: Send response
+
+.. autoclass:: superdesk.core.resources.resource_signals.ResourceSignals
+    :member-order: bysource
+    :members:
+
+
+Data Signals
+************
+
+The ResourceDataSignals class is used for signals around writing resource data to the DB.
+Signals are sent before and after each of the following DB actions: create, update and delete.
+
+Example::
+
+    from tests.core.modules.users import User
+
+    def on_user_create(user: User) -> None:
+        user.code = "test_user_code"
+
+    def init_app():
+        User.get_signals().data.on_create.connect(on_user_create)
+
+Below are the available resource data signals, along with their types.
+
+.. autoclass:: superdesk.core.resources.resource_signals.ResourceDataSignals
+    :member-order: bysource
+    :members:
+    :exclude-members: signal_name_prefix
+
+Web Signals
+***********
+
+The ResourceWebSignals class is used for signals around API requests received for a resource.
+Signals are sent before and after each of the following request actions: create, update, delete, get and search.
+
+Example::
+
+    from superdesk.core.types import Request, Response
+    from tests.core.modules.users import User
+
+    def on_user_create(request: Request, users: list[User]) -> None:
+        # Use the request object to apply some logic
+        if not request.get_url_arg("add_code"):
+            return
+
+        # Update the resource data before it's saved in the DB
+        for user in users:
+            user.code = "test_user_code"
+
+    def on_user_create_response(
+        request: Request,
+        response: Response
+    ) -> None:
+        # Add a custom header to the response
+        response.headers += (("X-Custom-Attribute", "SomeValidData"),)
+
+    def init_app():
+        # Connect our resource listeners to the resource web signals
+        signals = User.get_signals().web
+        signals.on_create.connect(on_user_create)
+        signals.on_create_response.connect(on_user_create_response)
+
+Below are the available resource web signals, along with their types.
+
+.. autoclass:: superdesk.core.resources.resource_signals.ResourceWebSignals
+    :member-order: bysource
+    :members:
+    :exclude-members: signal_name_prefix
+
+Global Resource Signals
+-----------------------
+
+Global resource signals let you hook code into the signals of every registered resource.
+``global_signals`` is an instance of :class:`ResourceSignals ` that each resource's own signals are connected to by default.
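+
+For example, a minimal sketch that logs every resource search received through the API, regardless of which resource it targets (the logger setup is illustrative and not part of Superdesk)::
+
+    import logging
+
+    from superdesk.core.resources import global_signals
+    from superdesk.core.types import Request, SearchRequest
+
+    logger = logging.getLogger(__name__)
+
+    async def log_any_search(request: Request, search: SearchRequest) -> None:
+        # ``page`` and ``max_results`` come from the SearchRequest parameters
+        logger.info("resource search: page=%s max_results=%s", search.page, search.max_results)
+
+    # Connect once, e.g. when a module initialises
+    global_signals.web.on_search.connect(log_any_search)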
+
+``global_signals`` is importable from ``superdesk.core.resources``. For example, to run some code whenever an item of any resource is created::
+
+    from superdesk.core.resources import (
+        ResourceModel,
+        global_signals,
+    )
+
+    async def on_resource_created(doc: ResourceModel) -> None:
+        print(f"{doc.type} - {doc.id}: created")
+
+    global_signals.data.on_create.connect(on_resource_created)
+
+
+Global Functions
+----------------
+
+.. autofunction:: superdesk.core.resources.resource_signals.get_resource_signals
+
+.. autofunction:: superdesk.core.resources.resource_signals.clear_all_resource_signal_listeners
+
+Example Signals
+---------------
+
+On Resource Registered
+**********************
+
+This example shows how a signal can be defined and subscribed to in practice.
+
+Originally, when registering a resource, the Resources module was in charge of registering that resource with Mongo & Elastic.
+This was hard coded::
+
+    class Resources:
+        ...
+
+        def register(self, config: ResourceConfig):
+            ...
+            mongo_config = config.mongo or MongoResourceConfig()
+            if config.versioning:
+                mongo_config.versioning = True
+
+            self.app.mongo.register_resource_config(
+                config.name, mongo_config
+            )
+            ...
+
+With the help of SignalGroups, modules no longer need to add their code to the Resources module,
+but instead connect to the :meth:`on_resource_registered ` signal and add the registration code there. ::
+
+    from superdesk.core.signals import Signal, SignalGroup
+
+    class Resources(SignalGroup):
+        on_resource_registered: Signal[
+            SuperdeskAsyncApp,
+            ResourceConfig
+        ]
+
+        def register(self, config: ResourceConfig):
+            ...
+            self.on_resource_registered.send(self.app, config)
+            ...
+
+And then in a mongo module::
+
+    from superdesk.core.types import MongoResourceConfig
+    from superdesk.core.app import SuperdeskAsyncApp
+    from superdesk.core.resources import ResourceConfig
+
+    def on_resource_registered(
+        app: SuperdeskAsyncApp,
+        config: ResourceConfig
+    ) -> None:
+        mongo_config = config.mongo or MongoResourceConfig()
+        if config.versioning:
+            mongo_config.versioning = True
+
+        app.mongo.register_resource_config(config.name, mongo_config)
+
+This way the Resources class does not need to know how to register resources for other modules.
diff --git a/docs/core/web.rst b/docs/core/web.rst
index e4fdff9ebf..41e18e7722 100644
--- a/docs/core/web.rst
+++ b/docs/core/web.rst
@@ -108,7 +108,7 @@ Resource REST Endpoints
 -----------------------
 
 REST endpoints can be enabled for a resource by defining the
-:attr:`rest_endpoints ` attribute on the ResourceConfig.
+:attr:`rest_endpoints ` attribute on the ResourceConfig.
 
 See :class:`RestEndpointConfig ` for config options.
 
@@ -326,39 +326,39 @@ For example::
 API References
 --------------
 
-.. autoclass:: superdesk.core.web.types.Response
+.. autoclass:: superdesk.core.types.Response
     :member-order: bysource
     :members:
     :undoc-members:
 
-.. autodata:: superdesk.core.web.types.EndpointFunction
+.. autodata:: superdesk.core.types.EndpointFunction
 
-.. autoclass:: superdesk.core.web.types.Endpoint
+.. autoclass:: superdesk.core.types.Endpoint
     :member-order: bysource
     :members:
    :undoc-members:
 
-.. autoclass:: superdesk.core.web.types.Request
+.. autoclass:: superdesk.core.types.Request
     :member-order: bysource
     :members:
     :undoc-members:
 
-.. autoclass:: superdesk.core.web.types.EndpointGroup
+.. autoclass:: superdesk.core.types.EndpointGroup
     :member-order: bysource
     :members:
     :undoc-members:
 
-.. autoclass:: superdesk.core.web.types.RestResponseMeta
+.. 
autoclass:: superdesk.core.types.RestResponseMeta :member-order: bysource :members: :undoc-members: -.. autoclass:: superdesk.core.web.types.RestGetResponse +.. autoclass:: superdesk.core.types.RestGetResponse :member-order: bysource :members: :undoc-members: -.. autofunction:: superdesk.core.web.types.endpoint +.. autofunction:: superdesk.core.web.endpoint .. autoclass:: superdesk.core.web.rest_endpoints.RestEndpoints :member-order: bysource @@ -381,7 +381,7 @@ API References :members: :undoc-members: -.. autoclass:: superdesk.core.web.types.WSGIApp +.. autoclass:: superdesk.core.types.WSGIApp :member-order: bysource :members: :undoc-members: From 1469bd7a18a49fd0d7012a6667d9e1e3086eff4d Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Wed, 15 Jan 2025 16:12:38 +1100 Subject: [PATCH 11/11] fix: add deleted file to commits --- superdesk/core/elastic/common.py | 85 -------------------------------- 1 file changed, 85 deletions(-) delete mode 100644 superdesk/core/elastic/common.py diff --git a/superdesk/core/elastic/common.py b/superdesk/core/elastic/common.py deleted file mode 100644 index 83c9a5a103..0000000000 --- a/superdesk/core/elastic/common.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8; -*- -# -# This file is part of Superdesk. -# -# Copyright 2024 Sourcefabric z.u. and contributors. -# -# For the full copyright and license information, please see the -# AUTHORS and LICENSE files distributed with this source code, or -# at https://www.sourcefabric.org/superdesk/license - -from typing import Dict, Any, Optional, Callable -from dataclasses import dataclass - -from uuid import uuid4 - -from ..config import ConfigModel -from superdesk.core.types import SearchRequest, SortParam - - -@dataclass -class ElasticResourceConfig: - """Resource config for use with Elasticsearch, to be included with the ResourceConfig""" - - #: Config prefix to be used - prefix: str = "ELASTICSEARCH" - - #: The default sort - default_sort: SortParam | None = None - - #: The default maximum number of documents to be returned - default_max_results: int = 25 - - #: An optional filter to be applied to all searches - filter: Optional[Dict[str, Any]] = None - - #: An optional callback used to construct a filter dynamically, to be applied to all searches - filter_callback: Optional[Callable[[Optional[SearchRequest]], Dict[str, Any]]] = None - - #: An optional dictionary of field aggregations - aggregations: Optional[Dict[str, Any]] = None - - #: An optional dictionary of highlights to be applied - highlight: Optional[Callable[[str], Optional[Dict[str, Any]]]] = None - - #: An optional list of facets to be applied (Will this be required in new version?) - facets: Optional[Dict[str, Any]] = None - - -class ElasticClientConfig(ConfigModel): - """Dataclass for storing an Elastic config for a specific resource""" - - #: The index prefix to use for the resource - index: str = "superdesk" - - #: The URL of the Elasticsearch instance to connect to - url: str = "http://localhost:9200" - - #: Refresh the Elasticsearch index after uploading documents to the index - force_refresh: bool = True - - #: If ``True``, automatically requests aggregations on search. - auto_aggregations: bool = True - - #: Set the default ``track_total_hits`` for search requests. 
See https://www.elastic.co/guide/en/elasticsearch/reference/master/search-your-data.html#track-total-hits - track_total_hits: int = 10000 - - #: Number of retries when timing out - retry_on_timeout: bool = True - - #: Maximum number of retries - max_retries: int = 3 - - #: Number of retries on update if there is a conflict - retry_on_conflict: int = 5 - - #: Optional dict to use when connecting to an Elasticsearch instance - options: Optional[Dict[str, Any]] = None - - #: Settings to be placed on the Elasticsearch index when creating it - settings: Optional[Dict[str, Any]] = None - - -def generate_index_name(alias: str): - random = str(uuid4()).split("-")[0] - return "{}_{}".format(alias, random)