diff --git a/content_api/app/settings.py b/content_api/app/settings.py index 8c192ebd8b..5a996d35ca 100644 --- a/content_api/app/settings.py +++ b/content_api/app/settings.py @@ -52,6 +52,10 @@ "content_api.api_audit", ] +MODULES = [ + "content_api.items.module", +] + CONTENTAPI_DOMAIN = {} # NOTE: no trailing slash for the CONTENTAPI_URL setting! diff --git a/content_api/items/async_service.py b/content_api/items/async_service.py new file mode 100644 index 0000000000..494b7af42d --- /dev/null +++ b/content_api/items/async_service.py @@ -0,0 +1,7 @@ +from superdesk.core.resources import AsyncResourceService + +from .model import ContentAPIItem + + +class ContentAPIItemService(AsyncResourceService[ContentAPIItem]): + pass diff --git a/content_api/items/model.py b/content_api/items/model.py new file mode 100644 index 0000000000..532ad6e975 --- /dev/null +++ b/content_api/items/model.py @@ -0,0 +1,180 @@ +from typing import Annotated, Any +from datetime import datetime +from enum import Enum, unique + +from pydantic import Field, field_validator + +from superdesk.core.resources import ResourceModel, dataclass, fields, validators, ModelWithVersions + + +ContentAssociation = Annotated[ + dict[str, Any], + fields.elastic_mapping( + { + "dynamic": False, + "properties": { + "featuremedia": { + "dynamic": False, + "properties": { + "_id": {"type": "keyword"}, + "guid": {"type": "keyword"}, + "unique_id": {"type": "integer"}, + }, + }, + }, + } + ), +] + + +@dataclass +class CVItem: + qcode: fields.Keyword + name: fields.Keyword + schema: fields.Keyword | None = None + + +@dataclass +class CVItemWithCode: + code: fields.Keyword + name: fields.Keyword + schema: fields.Keyword | None = None + scheme: fields.Keyword | None = None + + +@dataclass +class Place: + scheme: fields.Keyword | None = None + qcode: fields.Keyword | None = None + code: fields.Keyword | None = None + name: fields.Keyword | None = None + locality: fields.Keyword | None = None + state: fields.Keyword | 
None = None + country: fields.Keyword | None = None + world_region: fields.Keyword | None = None + locality_code: fields.Keyword | None = None + state_code: fields.Keyword | None = None + country_code: fields.Keyword | None = None + world_region_code: fields.Keyword | None = None + feature_class: fields.Keyword | None = None + location: fields.Geopoint | None = None + rel: fields.Keyword | None = None + + +@dataclass +class Annotation: + id: int + type: fields.Keyword + body: fields.Keyword + + +@dataclass +class ContentAuthor: + uri: fields.Keyword | None = None + parent: fields.Keyword | None = None + name: fields.TextWithKeyword | None = None + role: fields.Keyword | None = None + jobtitle: dict | None = None + sub_label: fields.TextWithKeyword | None = None + biography: str | None = None + code: fields.Keyword | None = None + + +@dataclass +class ContentReference: + id: Annotated[fields.Keyword, Field(alias="_id")] + key: fields.Keyword | None = None + uri: fields.Keyword | None = None + guid: fields.Keyword | None = None + type: fields.Keyword | None = None + source: fields.Keyword | None = None + + +@unique +class PubStatusType(str, Enum): + USABLE = "usable" + WITHHELD = "withheld" + CANCELLED = "canceled" + + +@unique +class ContentType(str, Enum): + TEXT = "text" + PREFORMATTED = "preformatted" + AUDIO = "audio" + VIDEO = "video" + PICTURE = "picture" + GRAPHIC = "graphic" + COMPOSITE = "composite" + + +class ContentAPIItem(ResourceModel, ModelWithVersions): + id: Annotated[str, Field(alias="_id")] + associations: ContentAssociation | None = None + anpa_category: list[CVItem] = Field(default_factory=list) + body_html: fields.HTML | None = None + body_text: str | None = None + byline: str | None = None + copyrightnotice: Annotated[str | None, fields.not_indexed()] = None + copyrightholder: str | None = None + description_html: str | None = None + description_text: str | None = None + headline: fields.HTML | None = None + language: fields.Keyword | None = 
None + located: str | None = None + mimetype: fields.Keyword | None = None + organization: list[dict] = Field(default_factory=list) + person: list[dict] = Field(default_factory=list) + place: list[Place] = Field(default_factory=list) + profile: str | None = None + pubstatus: Annotated[PubStatusType | None, fields.keyword_mapping()] = None + renditions: dict | None = None + service: list[CVItemWithCode] = Field(default_factory=list) + slugline: str | None = None + source: fields.Keyword | None = None + subject: list[CVItemWithCode] = Field(default_factory=list) + keywords: list[fields.HTML] = Field(default_factory=list) + anpa_take_key: str | None = None + + content_type: Annotated[ContentType, fields.keyword_mapping(), Field(alias="type")] = ContentType.TEXT + + urgency: int | None = None + priority: int | None = None + uri: Annotated[fields.Keyword | None, validators.validate_iunique_value_async("items", "uri")] = None + usageterms: str | None = None + version: str | None = None + versioncreated: datetime = Field(default_factory=datetime.now) + firstcreated: datetime = Field(default_factory=datetime.now) + firstpublished: datetime = Field(default_factory=datetime.now) + embargoed: datetime | None = None + evolvedfrom: fields.Keyword | None = None + nextversion: fields.Keyword | None = None + original_id: fields.Keyword | None = None + subscribers: Annotated[list[fields.Keyword], fields.keyword_mapping(), Field(default_factory=list)] + ednote: str | None = None + signal: list[CVItemWithCode] = Field(default_factory=list) + genre: list[CVItemWithCode] = Field(default_factory=list) + ancestors: Annotated[list[fields.Keyword], fields.keyword_mapping(), Field(default_factory=list)] + attachments: list[dict] = Field(default_factory=list) + annotations: list[Annotation] = Field(default_factory=list) + + extra: dict | None = None + extra_items: dict | None = None + authors: list[ContentAuthor] = Field(default_factory=list) + wordcount: int | None = None + charcount: int | 
None = None + readtime: int | None = None + + # These are for linking to Planning module resources + event_id: fields.Keyword | None = None + planning_id: fields.Keyword | None = None + coverage_id: fields.Keyword | None = None + agenda_id: fields.Keyword | None = None + agenda_href: fields.Keyword | None = None + + refs: list[ContentReference] = Field(default_factory=list) + expiry: datetime = Field(default_factory=datetime.now) + + @field_validator("version", mode="before") + def parse_version(cls, value: int | str | None) -> str | None: + return str(value) if value is not None else None diff --git a/content_api/items/module.py b/content_api/items/module.py new file mode 100644 index 0000000000..4f9571e2be --- /dev/null +++ b/content_api/items/module.py @@ -0,0 +1,44 @@ +from superdesk.core.module import Module +from superdesk.core.resources import ( + ResourceConfig, + MongoResourceConfig, + MongoIndexOptions, + ElasticResourceConfig, +) +from content_api import MONGO_PREFIX, ELASTIC_PREFIX + +from .model import ContentAPIItem +from .async_service import ContentAPIItemService + + +content_api_item_resource_config = ResourceConfig( + name="items", + data_class=ContentAPIItem, + service=ContentAPIItemService, + default_sort=[("versioncreated", -1)], + versioning=True, + mongo=MongoResourceConfig( + prefix=MONGO_PREFIX, + indexes=[ + MongoIndexOptions( + name="_ancestors_", + keys=[("ancestors", 1)], + ), + MongoIndexOptions( + name="expiry_1", + keys=[("expiry", 1)], + ), + ], + ), + elastic=ElasticResourceConfig( + prefix=ELASTIC_PREFIX, + filter={"bool": {"must_not": {"term": {"type": "composite"}}}}, + ), + # TODO-ASYNC: Implement the GET & Search endpoints for this resource +) + + +module = Module( + "content_api.items", + resources=[content_api_item_resource_config], +) diff --git a/content_api/publish/utils.py b/content_api/publish/utils.py new file mode 100644 index 0000000000..a25cee470e --- /dev/null +++ b/content_api/publish/utils.py @@ -0,0 +1,103 @@ 
+from superdesk import get_resource_service +from superdesk.resource_fields import ID_FIELD, GUID_FIELD + +from content_api.items.model import ContentAPIItem, PubStatusType +from content_api.items.async_service import ContentAPIItemService + + +async def publish_docs_to_content_api(docs: list[dict]) -> list[str]: + ids = [] + for doc in docs: + item_id = doc.pop(GUID_FIELD) + doc[ID_FIELD] = item_id + ids.append(await publish_doc_to_content_api(doc)) + return ids + + +async def publish_doc_to_content_api(item_dict: dict) -> str: + item = ContentAPIItem.from_dict(item_dict) + service = ContentAPIItemService() + + original = await service.find_by_id(item.id) + if original: + item.subscribers = list(set(original.subscribers or []) | set(item.subscribers or [])) + + process_associations(item, original) + create_version_doc(item) + + if original: + await service.update(original.id, item.to_dict(context={"use_objectid": True})) + return original.id + else: + return (await service.create([item]))[0] + + +def process_associations(updates: ContentAPIItem, original: ContentAPIItem | None) -> None: + """Update associations using existing published item and ensure that associated item subscribers + are equal or subset of the parent subscribers. 
+ :param updates: + :param original: + :return: + """ + + subscribers = updates.subscribers or [] + for assoc, update_assoc in (updates.associations or {}).items(): + if not update_assoc: + continue + + if original: + original_assoc = (original.associations or {}).get(assoc) + + if original_assoc: + if original_assoc.get(ID_FIELD) == update_assoc.get(ID_FIELD): + update_assoc["subscribers"] = list( + set(original_assoc.get("subscribers") or []) | set(update_assoc.get("subscribers") or []) + ) + + if original_assoc.get("renditions"): + update_assoc.setdefault("renditions", {}) + for rend in original_assoc["renditions"]: + update_assoc["renditions"].setdefault(rend, None) + + update_assoc["subscribers"] = list(set(update_assoc["subscribers"]) & set(subscribers)) + + # remove associations which were there previously + # but are missing now + if original and original.associations: + if not updates.associations: + updates.associations = {} + for assoc in original.associations: + updates.associations.setdefault(assoc, None) + + # If there are no associations anymore, then set the entire associations field to None + if updates.associations is not None and not any([assoc for assoc in updates.associations.values()]): + updates.associations = None + + +# TODO-ASYNC: Use new versioning system +def create_version_doc(item: ContentAPIItem) -> None: + """ + Store the item in the item version collection + :param item: + :return: + """ + version_item = item.to_dict(context={"use_objectid": True}) + version_item["_id_document"] = version_item.pop("_id") + get_resource_service("items_versions").create([version_item]) + # if the update is a cancel we need to cancel all versions + if item.pubstatus == PubStatusType.CANCELLED: + _cancel_versions(item.id) + + +# TODO-ASYNC: Use new versioning system +def _cancel_versions(doc_id: str) -> None: + """ + Given an id of a document set the pubstatus to canceled for all versions + :param doc_id: + :return: + """ + query = {"_id_document": 
doc_id} + update = {"pubstatus": "canceled"} + for item in get_resource_service("items_versions").get_from_mongo(req=None, lookup=query): + if item.get("pubstatus") != "canceled": + get_resource_service("items_versions").update(item["_id"], update, item) diff --git a/superdesk/core/elastic/async_client.py b/superdesk/core/elastic/async_client.py index 641f784561..0df9246b91 100644 --- a/superdesk/core/elastic/async_client.py +++ b/superdesk/core/elastic/async_client.py @@ -8,14 +8,18 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Optional, List, Dict, Any, Tuple, cast +from typing import Optional, List, Dict, Any, Tuple, cast, overload +import logging from elasticsearch import AsyncElasticsearch from elasticsearch.exceptions import NotFoundError, TransportError, RequestError from elasticsearch.helpers import async_bulk from superdesk.core.types import SearchRequest -from .base_client import BaseElasticResourceClient, ElasticCursor, InvalidSearchString +from .base_client import BaseElasticResourceClient, ElasticCursor, InvalidSearchString, ProjectedFieldSources + + +logger = logging.getLogger(__name__) class ElasticResourceAsyncClient(BaseElasticResourceClient): @@ -110,7 +114,7 @@ async def search(self, query: Dict[str, Any], indexes: Optional[List[str]] = Non return await self.elastic.search(**self._get_search_args(query, indexes)) - async def find_by_id(self, item_id: str) -> Optional[Dict[str, Any]]: + async def find_by_id(self, item_id: str, projection: ProjectedFieldSources | None = None) -> dict[str, Any] | None: """Find a single document in Elasticsearch based on its ID :param item_id: ID of the document to find. 
@@ -118,7 +122,7 @@ async def find_by_id(self, item_id: str) -> Optional[Dict[str, Any]]: """ try: - response = await self.elastic.get(index=self.config.index, id=item_id) + response = await self.elastic.get(index=self.config.index, id=item_id, **(projection or {})) if "exists" in response: response["found"] = response["exists"] @@ -145,21 +149,28 @@ async def find_by_id(self, item_id: str) -> Optional[Dict[str, Any]]: return None return None - async def find_one(self, **lookup) -> Optional[Dict[str, Any]]: + @overload + async def find_one(self, req: SearchRequest) -> dict[str, Any] | None: + ... + + @overload + async def find_one(self, req: dict) -> dict[str, Any] | None: + ... + + async def find_one(self, req: SearchRequest | dict) -> Optional[Dict[str, Any]]: """Find a single document in Elasticsearch based on the provided search query :param lookup: kwargs providing the filters used to search for an item :return: The document found or None if no document was found. """ - if "_id" in lookup: - return await self.find_by_id(lookup["_id"]) + search_request = req if isinstance(req, SearchRequest) else SearchRequest(where=req) - filters = [{"term": {key: val}} for key, val in lookup.items()] - query = {"query": {"bool": {"must": filters}}} + if isinstance(search_request.where, dict) and set(search_request.where.keys()) == {"_id"}: + return await self.find_by_id(search_request.where["_id"], self._get_projected_fields(search_request) or {}) try: - response = await self.elastic.search(index=self.config.index, body=query, size=1) + response = await self.elastic.search(**self._get_find_one_args(search_request)) docs = self._parse_hits(response) return docs.first() @@ -188,13 +199,24 @@ async def find(self, req: SearchRequest, sub_resource_lookup: Optional[Dict[str, :return: A tuple containing an ElasticCursor instance and the number of documents found """ + args = self._get_find_args(req, sub_resource_lookup) + try: - response = await 
self.elastic.search(**self._get_find_args(req, sub_resource_lookup)) + response = await self.elastic.search(**args) except RequestError as e: - if e.status_code == 400 and "No mapping found for" in e.error: - response = {} - elif e.status_code == 400 and "SearchParseException" in e.error: - raise InvalidSearchString + if e.status_code == 400: + if "No mapping found for" in e.error: + response = {} + elif ( + "SearchParseException" in e.error + or "x_content_parse_exception" in e.error + or "parsing_exception" in e.error + ): + query = args.get("body") + logger.warning(f"Invalid search string: {query}") + raise InvalidSearchString + else: + raise else: raise diff --git a/superdesk/core/elastic/base_client.py b/superdesk/core/elastic/base_client.py index 06968df4cb..f79a6c9ec5 100644 --- a/superdesk/core/elastic/base_client.py +++ b/superdesk/core/elastic/base_client.py @@ -151,8 +151,9 @@ def _get_find_args( self, req: SearchRequest, sub_resource_lookup: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: args = req.args or {} + if args.get("source"): - query: Dict[str, Any] = json.loads(args["source"]) + query: dict[str, Any] = json.loads(args["source"]) if isinstance(args["source"], str) else args["source"] query.setdefault("query", {}) must = [] for key, val in query["query"].items(): @@ -163,6 +164,8 @@ def _get_find_args( else: query = {"query": {"bool": {}}} + req.elastic.generate_query_dict(query) + if args.get("q"): query["query"]["bool"].setdefault("must", []).append( _build_query_string( @@ -178,11 +181,10 @@ def _get_find_args( elif self.resource_config.default_sort: _set_sort(query, self.resource_config.default_sort) - if not req.max_results and self.resource_config.default_max_results: - req.max_results = self.resource_config.default_max_results + if req.max_results is None: + req.max_results = self.resource_config.default_max_results or 25 - if req.max_results: - query.setdefault("size", req.max_results) + query.setdefault("size", req.max_results) if 
req.page > 1 and req.max_results: query.setdefault("from", (req.page - 1) * req.max_results) @@ -198,7 +200,7 @@ def _get_find_args( filters.append({"bool": {"must": [{"term": {key: val}} for key, val in sub_resource_lookup.items()]}}) if "filter" in args: - filters.append(json.loads(args["filter"])) + filters.append(json.loads(args["filter"]) if isinstance(args["filter"], str) else args["filter"]) if "filters" in args: filters.extend(args["filters"]) @@ -218,7 +220,9 @@ def _get_find_args( if self.resource_config.facets: query["facets"] = self.resource_config.facets - if self.resource_config.aggregations and (self.config.auto_aggregations or req.aggregations): + if req.elastic.aggs: + query["aggs"] = req.elastic.aggs + elif self.resource_config.aggregations and (self.config.auto_aggregations or req.aggregations): query["aggs"] = self.resource_config.aggregations if self.resource_config.highlight and req.highlight: @@ -237,6 +241,20 @@ def _get_find_args( body=query, ) + def _get_find_one_args(self, req: SearchRequest) -> dict[str, Any]: + filters = [{"term": {key: val}} for key, val in req.where.items()] if isinstance(req.where, dict) else [] + query = { + "query": {"bool": {"must": filters}}, + "size": 1, + } + + return dict( + index=self.config.index, + track_total_hits=self.config.track_total_hits, + **(self._get_projected_fields(req) or {}), + body=query, + ) + def _get_projected_fields(self, req: SearchRequest) -> ProjectedFieldSources | None: projection_include, projection_fields = get_projection_from_request(req) diff --git a/superdesk/core/elastic/common.py b/superdesk/core/elastic/common.py index b37f489e5f..83c9a5a103 100644 --- a/superdesk/core/elastic/common.py +++ b/superdesk/core/elastic/common.py @@ -28,7 +28,7 @@ class ElasticResourceConfig: default_sort: SortParam | None = None #: The default maximum number of documents to be returned - default_max_results: Optional[int] = None + default_max_results: int = 25 #: An optional filter to be applied to 
all searches filter: Optional[Dict[str, Any]] = None diff --git a/superdesk/core/elastic/mapping.py b/superdesk/core/elastic/mapping.py index 2148dc2a94..d9416c9727 100644 --- a/superdesk/core/elastic/mapping.py +++ b/superdesk/core/elastic/mapping.py @@ -76,6 +76,10 @@ def _get_field_type_from_json_schema( return None elif props.get("nested") or (parent_props is not None and parent_props.get("nested")): mapping["type"] = "nested" + if props.get("include_in_parent") or ( + parent_props is not None and parent_props.get("include_in_parent") + ): + mapping["include_in_parent"] = True return mapping except KeyError: # If ``items`` is not defined, we cannot determine the type diff --git a/superdesk/core/elastic/sync_client.py b/superdesk/core/elastic/sync_client.py index b49cc89311..d9b7e36195 100644 --- a/superdesk/core/elastic/sync_client.py +++ b/superdesk/core/elastic/sync_client.py @@ -8,14 +8,14 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Optional, List, Dict, Any, Tuple, cast +from typing import Optional, List, Dict, Any, Tuple, cast, overload from elasticsearch import Elasticsearch from elasticsearch.exceptions import NotFoundError, TransportError, RequestError from elasticsearch.helpers import bulk from superdesk.core.types import SearchRequest -from .base_client import BaseElasticResourceClient, ElasticCursor, InvalidSearchString +from .base_client import BaseElasticResourceClient, ElasticCursor, InvalidSearchString, ProjectedFieldSources class ElasticResourceClient(BaseElasticResourceClient): @@ -110,7 +110,7 @@ def search(self, query: Dict[str, Any], indexes: Optional[List[str]] = None) -> return self.elastic.search(**self._get_search_args(query, indexes)) - def find_by_id(self, item_id: str) -> Optional[Dict[str, Any]]: + def find_by_id(self, item_id: str, projection: ProjectedFieldSources | None = None) -> dict[str, Any] | None: """Find a single document in 
Elasticsearch based on its ID :param item_id: ID of the document to find. @@ -118,7 +118,7 @@ def find_by_id(self, item_id: str) -> Optional[Dict[str, Any]]: """ try: - response = self.elastic.get(index=self.config.index, id=item_id) + response = self.elastic.get(index=self.config.index, id=item_id, **(projection or {})) if "exists" in response: response["found"] = response["exists"] @@ -144,21 +144,28 @@ def find_by_id(self, item_id: str) -> Optional[Dict[str, Any]]: return None return None - def find_one(self, **lookup) -> Optional[Dict[str, Any]]: + @overload + def find_one(self, req: SearchRequest) -> dict[str, Any] | None: + ... + + @overload + def find_one(self, req: dict) -> dict[str, Any] | None: + ... + + def find_one(self, req: SearchRequest | dict) -> dict[str, Any] | None: """Find a single document in Elasticsearch based on the provided search query :param lookup: kwargs providing the filters used to search for an item :return: The document found or None if no document was found. 
""" - if "_id" in lookup: - return self.find_by_id(lookup["_id"]) + search_request = req if isinstance(req, SearchRequest) else SearchRequest(where=req) - filters = [{"term": {key: val}} for key, val in lookup.items()] - query = {"query": {"bool": {"must": filters}}} + if isinstance(search_request.where, dict) and set(search_request.where.keys()) == {"_id"}: + return self.find_by_id(search_request.where["_id"], self._get_projected_fields(search_request) or {}) try: - response = self.elastic.search(index=self.config.index, body=query, size=1) + response = self.elastic.search(**self._get_find_one_args(search_request)) docs = self._parse_hits(response) return docs.first() diff --git a/superdesk/core/resources/__init__.py b/superdesk/core/resources/__init__.py index 7e8ff1eee5..9a4568ebcf 100644 --- a/superdesk/core/resources/__init__.py +++ b/superdesk/core/resources/__init__.py @@ -9,7 +9,15 @@ # at https://www.sourcefabric.org/superdesk/license from .utils import get_projection_from_request -from .model import Resources, ResourceModel, ResourceModelWithObjectId, ModelWithVersions, ResourceConfig, dataclass +from .model import ( + BaseModel, + Resources, + ResourceModel, + ResourceModelWithObjectId, + ModelWithVersions, + ResourceConfig, + dataclass, +) from .resource_rest_endpoints import RestEndpointConfig, RestParentLink, get_id_url_type from .service import AsyncResourceService, AsyncCacheableService from ..mongo import MongoResourceConfig, MongoIndexOptions @@ -17,6 +25,7 @@ __all__ = [ "get_projection_from_request", + "BaseModel", "Resources", "ResourceModel", "ResourceModelWithObjectId", diff --git a/superdesk/core/resources/cursor.py b/superdesk/core/resources/cursor.py index dc43fa7c48..2d88694506 100644 --- a/superdesk/core/resources/cursor.py +++ b/superdesk/core/resources/cursor.py @@ -8,7 +8,7 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Dict, Any, Generic, 
TypeVar, Type, Optional, List +from typing import Any, Generic, TypeVar, Type from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorCursor @@ -28,19 +28,22 @@ def __aiter__(self): async def __anext__(self) -> ResourceModelType: raise NotImplementedError() - async def next_raw(self) -> Optional[Dict[str, Any]]: + async def next(self) -> ResourceModelType | None: raise NotImplementedError() - async def to_list(self) -> List[ResourceModelType]: - items: List[ResourceModelType] = [] + async def next_raw(self) -> dict[str, Any] | None: + raise NotImplementedError() + + async def to_list(self) -> list[ResourceModelType]: + items: list[ResourceModelType] = [] item = await self.next_raw() while item is not None: items.append(self.get_model_instance(item)) item = await self.next_raw() return items - async def to_list_raw(self) -> List[Dict[str, Any]]: - items: List[Dict[str, Any]] = [] + async def to_list_raw(self) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] item = await self.next_raw() while item is not None: item["_type"] = self.data_class.model_resource_name @@ -51,7 +54,7 @@ async def to_list_raw(self) -> List[Dict[str, Any]]: async def count(self): raise NotImplementedError() - def get_model_instance(self, data: Dict[str, Any]): + def get_model_instance(self, data: dict[str, Any]): """Get a model instance from a dictionary of values :param data: Dictionary containing values to get a model instance from @@ -78,25 +81,25 @@ def __init__(self, data_class: Type[ResourceModelType], hits=None): self.hits = hits if hits else self.no_hits async def __anext__(self) -> ResourceModelType: - try: - data = self.hits["hits"]["hits"][self._index] - source = data["_source"] - source["_id"] = data["_id"] - source["_type"] = source.pop("_resource", None) - self._index += 1 - return self.get_model_instance(source) - except (IndexError, KeyError, TypeError): - raise StopAsyncIteration + item = await self.next() + if item is not None: + return item + raise 
StopAsyncIteration + + async def next(self) -> ResourceModelType | None: + item_dict = await self.next_raw() + return None if item_dict is None else self.get_model_instance(item_dict) - async def next_raw(self) -> Optional[Dict[str, Any]]: + async def next_raw(self) -> dict[str, Any] | None: try: data = self.hits["hits"]["hits"][self._index] source = data["_source"] source["_id"] = data["_id"] - source.pop("_resource", None) + source["_type"] = source.pop("_resource", None) self._index += 1 return source except (IndexError, KeyError, TypeError): + self._index = 0 return None async def count(self): @@ -109,7 +112,7 @@ async def count(self): return int(total["value"]) return 0 - def extra(self, response: Dict[str, Any]): + def extra(self, response: dict[str, Any]): """Add extra info to response""" if "facets" in self.hits: response["_facets"] = self.hits["facets"] @@ -123,20 +126,31 @@ def __init__( data_class: Type[ResourceModelType], collection: AsyncIOMotorCollection, cursor: AsyncIOMotorCursor, - lookup: Dict[str, Any], + lookup: dict[str, Any], ): super().__init__(data_class) self.collection = collection self.cursor = cursor self.lookup = lookup - async def __anext__(self): - return self.get_model_instance(await self.cursor.next()) + async def __anext__(self) -> ResourceModelType: + item = await self.next() + if item is not None: + return item + raise StopAsyncIteration + + async def next(self) -> ResourceModelType | None: + try: + return self.get_model_instance(dict(await self.cursor.next())) + except StopAsyncIteration: + self.cursor.rewind() + return None - async def next_raw(self) -> Optional[Dict[str, Any]]: + async def next_raw(self) -> dict[str, Any] | None: try: return dict(await self.cursor.next()) except StopAsyncIteration: + self.cursor.rewind() return None async def count(self): diff --git a/superdesk/core/resources/fields.py b/superdesk/core/resources/fields.py index 34fd1ea7b5..df4b822ace 100644 --- a/superdesk/core/resources/fields.py +++ 
b/superdesk/core/resources/fields.py @@ -9,6 +9,7 @@ # at https://www.sourcefabric.org/superdesk/license from typing import TYPE_CHECKING, Annotated + from typing_extensions import TypeVar, Generic, ClassVar, Dict, Any, cast, Type, Callable from datetime import datetime @@ -128,11 +129,16 @@ class Keyword(CustomStringField, str): elastic_mapping = {"type": "keyword"} class TextWithKeyword(CustomStringField): - """Elasticsearch text field with a keyword sub-field""" + """Elasticsearch text field with a keyword sub-field + + Additionally, adds ``html_field_analyzer`` analyzer, as to keep with the same config + defined in original superdesk.resource.text_with_keyword mapping + """ elastic_mapping = { "type": "text", "fields": {"keyword": {"type": "keyword"}}, + "analyzer": "html_field_analyzer", } class HTML(str, CustomStringField): @@ -157,31 +163,43 @@ def serialise_value(cls, value: Any, info: core_schema.FieldSerializationInfo): return str(value) if not (info.context or {}).get("use_objectid") else BsonObjectId(value) +def elastic_mapping(mapping: dict[str, Any]) -> WithJsonSchema: + return Field(json_schema_extra={"elastic_mapping": mapping}) + + def keyword_mapping() -> WithJsonSchema: return Field(json_schema_extra={"elastic_mapping": {"type": "keyword"}}) -def nested_list() -> WithJsonSchema: +def mapping_disabled(data_type: str) -> WithJsonSchema: + return Field(json_schema_extra={"elastic_mapping": {"type": data_type, "enabled": False}}) + + +def nested_list(include_in_parent: bool = False) -> WithJsonSchema: """Field modifier, to enabled nested in Elasticsearch for the field Example usage:: - from typing import List - from typing_extensions import Annotated, TypedDict + from typing import Annotated + from typing_extensions import TypedDict from superdesk.core.resources import ResourceModel, fields, dataclass @dataclass class Subjects: qcode: str name: str - scheme: Optional[str] = None + scheme: str | None = None class Content(ResourceModel): ... 
- subjects: Annotated[List[Subjects], fields.nested_list()] + subjects: Annotated[list[Subjects], fields.nested_list()] """ - return Field(json_schema_extra={"nested": True}) + return Field(json_schema_extra={"nested": True, "include_in_parent": include_in_parent}) + + +def not_indexed() -> WithJsonSchema: + return Field(json_schema_extra={"elastic_mapping": {"type": "text", "index": False}}) @dataclass(config=dict(validate_assignment=True)) diff --git a/superdesk/core/resources/model.py b/superdesk/core/resources/model.py index 9c8ba5b6df..ca56cb8ca1 100644 --- a/superdesk/core/resources/model.py +++ b/superdesk/core/resources/model.py @@ -19,17 +19,17 @@ ClassVar, cast, ) -from typing_extensions import dataclass_transform, override, Self +from typing_extensions import dataclass_transform from dataclasses import dataclass as python_dataclass, field as dataclass_field from copy import deepcopy from inspect import get_annotations from datetime import datetime -from pydantic import BaseModel, ConfigDict, Field, computed_field, ValidationError +from pydantic import ConfigDict, Field, computed_field, ValidationError from pydantic_core import InitErrorDetails, PydanticCustomError from pydantic.dataclasses import dataclass as pydataclass -from superdesk.core.types import SortListParam, ProjectedFieldArg +from superdesk.core.types import SortListParam, ProjectedFieldArg, BaseModel from superdesk.core.utils import generate_guid, GUID_NEWSML from .fields import ObjectId @@ -80,10 +80,6 @@ def type(self) -> str: async def validate_async(self): await _run_async_validators_from_model_class(self, self) - @classmethod - def get_field_names(cls) -> list[str]: - return [info.alias or field for field, info in cls.model_fields.items()] - @classmethod def uses_objectid_for_id(cls) -> bool: try: @@ -91,63 +87,6 @@ def uses_objectid_for_id(cls) -> bool: except KeyError: return False - @override - @classmethod - def model_validate( - cls, - obj: dict[str, Any], - *, - strict: bool | 
None = None, - from_attributes: bool | None = None, - context: dict[str, Any] | None = None, - include_unknown: bool = False, - ) -> Self: - """Construct a model instance from the provided dictionary, and validate its values - - :param obj: Dictionary of values used to construct the model instance - :param strict: Whether to enforce types strictly - :param from_attributes: Whether to extract data from object attributes - :param context: Additional context to pass to the validator - :param include_unknown: Whether to include fields not defined in the ResourceModel - :raises Pydantic.ValidationError: If validation fails - :rtype: ResourceModel - :returns: The validated model instance - """ - - if not include_unknown: - data = {field: value for field, value in obj.items() if field in cls.get_field_names()} - else: - data = obj.copy() - data.pop("_type", None) - - instance = super().model_validate( - data, - strict=strict, - from_attributes=from_attributes, - context=context, - ) - - return instance - - @classmethod - def from_dict( - cls, - values: dict[str, Any], - context: dict[str, Any] | None = None, - include_unknown: bool = False, - ) -> Self: - """Construct a model instance from the provided dictionary, and validate its values - - :param values: Dictionary of values used to construct the model instance - :param context: Additional context to pass to the validator - :param include_unknown: Whether to include fields not defined in the ResourceModel - :raises Pydantic.ValidationError: If validation fails - :rtype: ResourceModel - :returns: The validated model instance - """ - - return cls.model_validate(values, context=context, include_unknown=include_unknown) - def to_dict(self, **kwargs) -> dict[str, Any]: """ Convert the model instance to a dictionary representation with non-JSON-serializable Python objects. @@ -157,30 +96,13 @@ def to_dict(self, **kwargs) -> dict[str, Any]: :returns: A dictionary representation of the model instance with field aliases. 
Only fields that are set (non-default) will be included. """ - default_params: dict[str, Any] = {"by_alias": True, "exclude_unset": True} - default_params.update(kwargs) - model_dict = self.model_dump(**default_params) - + model_dict = super().to_dict(**kwargs) if not model_dict.get("_id"): # Make sure to include `id`, in case a default one was provided # as exclude_unset will not include it when serialising model_dict["_id"] = self.id - return model_dict - def to_json(self, **kwargs) -> str: - """ - Convert the model instance to a JSON serializable dictionary. - - :param kwargs: Optional keyword arguments to override the default parameters of model_dump_json. - :rtype: str - :return: A JSON-compatible dictionary representation of the model instance with field aliases. - Only fields that are set (non-default) will be included. - """ - default_params: dict[str, Any] = {"by_alias": True, "exclude_unset": True} - default_params.update(kwargs) - return self.model_dump_json(**default_params) - async def _run_async_validators_from_model_class( model_instance: Any, root_item: ResourceModel, field_name_stack: Optional[List[str]] = None diff --git a/superdesk/core/resources/resource_rest_endpoints.py b/superdesk/core/resources/resource_rest_endpoints.py index 0fd7a30aad..14a335fc49 100644 --- a/superdesk/core/resources/resource_rest_endpoints.py +++ b/superdesk/core/resources/resource_rest_endpoints.py @@ -188,7 +188,9 @@ async def get_parent_items(self, request: Request) -> dict[str, dict]: raise SuperdeskApiError.badRequestError("Parent resource ID not provided in URL") elif service.id_uses_objectid(): item_id = ObjectId(item_id) - item = await service.find_one_raw(use_mongo=True, version=None, **{parent_link.parent_id_field: item_id}) + + # MyPy complains about this next call, but the args are correct + item = await service.find_one_raw(use_mongo=True, version=None, **{parent_link.parent_id_field: item_id}) # type: ignore[call-overload] if not item: raise 
SuperdeskApiError.notFoundError( f"Parent resource {parent_link.resource_name} with ID '{item_id}' not found" @@ -390,7 +392,7 @@ async def search_items( _items=await cursor.to_list_raw(), _meta=dict( page=params.page, - max_results=params.max_results, + max_results=params.max_results if params.max_results is not None else 25, total=count, ), ) @@ -398,11 +400,6 @@ async def search_items( status = 200 headers = [("X-Total-Count", count)] response["_links"] = self._build_resource_hateoas(params, count, request) - response["_meta"] = dict( - page=params.page, - max_results=params.max_results, - total=count, - ) if hasattr(cursor, "extra"): getattr(cursor, "extra")(response) diff --git a/superdesk/core/resources/service.py b/superdesk/core/resources/service.py index 81469e2dcb..4f8a81c4fb 100644 --- a/superdesk/core/resources/service.py +++ b/superdesk/core/resources/service.py @@ -118,54 +118,115 @@ def get_model_instance_from_dict(self, data: Dict[str, Any]) -> ResourceModelTyp # because nested models are not being converted to model instances return cast(ResourceModelType, self.config.data_class.from_dict(data)) - async def find_one_raw(self, use_mongo: bool = False, version: int | None = None, **lookup) -> dict | None: - """Find a resource by ID + @overload + async def find_one_raw(self, req: SearchRequest) -> dict | None: + ... - :param use_mongo: If ``True`` will force use mongo, else will attempt elastic first - :param version: Optional version to get - :param lookup: Dictionary of key/value pairs used to find the document - :return: ``None`` if resource not found, otherwise an instance of ``ResourceModel`` for this resource - """ + @overload + async def find_one_raw( + self, + req: None = None, + *, + projection: ProjectedFieldArg | None = None, + use_mongo: bool = False, + version: int | None = None, + **lookup, + ) -> dict | None: + ... 
+ + async def find_one_raw( + self, + req: SearchRequest | None = None, + *, + projection: ProjectedFieldArg | None = None, + use_mongo: bool = False, + version: int | None = None, + **lookup, + ) -> dict | None: + search_request = ( + req + if req is not None + else SearchRequest( + where=lookup, + page=1, + max_results=1, + projection=projection, + use_mongo=use_mongo, + version=version, + ) + ) + + if not search_request.projection and self.config.projection: + search_request.projection = self.config.projection item = None try: - if not use_mongo: - item = await self.elastic.find_one(**lookup) + if not search_request.use_mongo: + item = await self.elastic.find_one(search_request) except KeyError: pass - if use_mongo or item is None: - item = await self.mongo_async.find_one(lookup) + if search_request.use_mongo or item is None: + kwargs = dict( + filter=json.loads(search_request.where or "{}") + if isinstance(search_request.where, str) + else search_request.where or {} + ) + projection_include, projection_fields = get_projection_from_request(search_request) + if projection_fields: + kwargs["projection"] = ( + projection_fields if projection_include else {field: False for field in projection_fields} + ) + mongo = self.mongo_async if not search_request.version else self.mongo_versioned_async + item = await mongo.find_one(**kwargs) if item is None: return None - elif version is not None: - item = await self.get_item_version(item, version) + elif search_request.version is not None: + item = await self.get_item_version(item, search_request.version) return item - async def find_one( - self, use_mongo: bool = False, version: int | None = None, **lookup - ) -> Optional[ResourceModelType]: - """Find a resource by ID + @overload + async def find_one(self, req: SearchRequest) -> ResourceModelType | None: + ... 
- :param use_mongo: If ``True`` will force use mongo, else will attempt elastic first - :param version: Optional version to get - :param lookup: Dictionary of key/value pairs used to find the document - :return: ``None`` if resource not found, otherwise an instance of ``ResourceModel`` for this resource - """ + @overload + async def find_one( + self, + req: None = None, + projection: ProjectedFieldArg | None = None, + use_mongo: bool = False, + version: int | None = None, + **lookup, + ) -> ResourceModelType | None: + ... - item = await self.find_one_raw(use_mongo=use_mongo, version=version, **lookup) + async def find_one( + self, + req: SearchRequest | None = None, + projection: ProjectedFieldArg | None = None, + use_mongo: bool = False, + version: int | None = None, + **lookup, + ) -> ResourceModelType | None: + if req is None: + item = await self.find_one_raw(projection=projection, use_mongo=use_mongo, version=version, **lookup) + else: + item = await self.find_one_raw(req) return None if not item else self.get_model_instance_from_dict(item) - async def find_by_id(self, item_id: Union[str, ObjectId]) -> Optional[ResourceModelType]: + async def find_by_id( + self, item_id: Union[str, ObjectId], version: int | None = None + ) -> Optional[ResourceModelType]: """Find a resource by ID :param item_id: ID of item to find + :param version: Optional version to get :return: ``None`` if resource not found, otherwise an instance of ``ResourceModel`` for this resource """ - item = await self.find_by_id_raw(item_id) + item = await self.find_by_id_raw(item_id, version) return None if item is None else self.get_model_instance_from_dict(item) async def find_by_id_raw( @@ -300,11 +361,14 @@ async def create(self, _docs: Sequence[ResourceModelType | dict[str, Any]]) -> L pass if self.config.versioning: - await self.mongo_versioned_async.insert_one(self._get_versioned_document(doc_dict)) + await self.insert_versioned_document(doc_dict) await self.on_created(docs) return ids + async 
def insert_versioned_document(self, doc_dict: dict[str, Any]): + await self.mongo_versioned_async.insert_one(self._get_versioned_document(doc_dict)) + def _get_versioned_document(self, doc_dict: dict[str, Any]) -> dict[str, Any]: versioned_doc = doc_dict.copy() versioned_doc["_id_document"] = versioned_doc.pop("_id", None) @@ -423,6 +487,9 @@ async def delete_many(self, lookup: Dict[str, Any]) -> List[str]: ids.append(str(doc.id)) await self.mongo_async.delete_one({"_id": doc.id}) + if self.config.versioning: + await self.mongo_versioned_async.delete_many({VERSION_ID_FIELD: doc.id}) + try: await self.elastic.remove(doc.id) except KeyError: @@ -482,7 +549,9 @@ async def get_all_batch(self, size=500, max_iterations=10000, lookup=None) -> As logger.warning(f"Not enough iterations for resource {self.resource_name}") @overload - async def find(self, req: SearchRequest) -> ResourceCursorAsync[ResourceModelType]: + async def find( + self, req: SearchRequest + ) -> ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]: ... @overload @@ -494,7 +563,7 @@ async def find( sort: SortParam | None = None, projection: ProjectedFieldArg | None = None, use_mongo: bool = False, - ) -> ResourceCursorAsync[ResourceModelType]: + ) -> ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]: ... 
async def find( @@ -505,7 +574,8 @@ async def find( sort: SortParam | None = None, projection: ProjectedFieldArg | None = None, use_mongo: bool = False, - ) -> ResourceCursorAsync[ResourceModelType]: + # ) -> ResourceCursorAsync[ResourceModelType]: + ) -> ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]: """Find items from the resource using Elasticsearch :param req: SearchRequest instance, or a lookup dictionary, for the search params to be used @@ -522,7 +592,7 @@ async def find( req if isinstance(req, SearchRequest) else SearchRequest( - where=req if req else None, + where=req, page=page, max_results=max_results, sort=sort, @@ -784,13 +854,23 @@ async def get_cached(self) -> List[Dict[str, Any]]: key=lambda fn: f"_cache_mixin:{self.resource_name}", ) async def _get_cached_from_db(): + return await _get_from_db() + + async def _get_from_db(): cursor = await self.search(lookup=self.cache_lookup, use_mongo=True) return await cursor.to_list_raw() cached_data = self.get_cache() if cached_data is None: - cached_data = await _get_cached_from_db() + try: + cached_data = await _get_cached_from_db() + except RuntimeError: + # This is sometimes happening, due to lock trying to be released from another thread + # I think this is mostly happening in tests, but we require a fallback to make + # sure this will always work + logger.warning("RuntimeError raised when attempting to get items from cache", exc_info=True) + cached_data = await _get_from_db() self.set_cache(cached_data) return cached_data diff --git a/superdesk/core/resources/validators.py b/superdesk/core/resources/validators.py index 6cf737d986..e46c0f83fd 100644 --- a/superdesk/core/resources/validators.py +++ b/superdesk/core/resources/validators.py @@ -199,6 +199,7 @@ async def validate_unique_value_in_resource(item: ResourceModel, name: UniqueVal return AsyncValidator(validate_unique_value_in_resource) +# TODO-ASYNC: Allow ``resource_name`` to be optional, and we 
can obtain datasource/resource name from context def validate_iunique_value_async( resource_name: str, field_name: str, error_string: str | None = None ) -> AsyncValidator: diff --git a/superdesk/core/types/__init__.py b/superdesk/core/types/__init__.py new file mode 100644 index 0000000000..a0fa3ccc83 --- /dev/null +++ b/superdesk/core/types/__init__.py @@ -0,0 +1,62 @@ +from .common import DefaultNoValue +from .model import BaseModel +from .search import ( + ProjectedFieldArg, + SortListParam, + SortParam, + VersionParam, + SearchArgs, + SearchRequest, + ESQuery, + ESBoolQuery, +) +from .system import NotificationClientProtocol, WSGIApp +from .web import ( + HTTP_METHOD, + Response, + EndpointFunction, + RequestStorageProvider, + RequestSessionStorageProvider, + RequestStorage, + Request, + AuthRule, + AuthConfig, + Endpoint, + EndpointGroup, + RestResponseMeta, + RestGetResponse, +) + + +__all__ = [ + # common + "DefaultNoValue", + # web + "HTTP_METHOD", + "Response", + "EndpointFunction", + "RequestStorageProvider", + "RequestSessionStorageProvider", + "RequestStorage", + "Request", + "AuthRule", + "AuthConfig", + "Endpoint", + "EndpointGroup", + "RestResponseMeta", + "RestGetResponse", + # model + "BaseModel", + # search + "ProjectedFieldArg", + "SortListParam", + "SortParam", + "VersionParam", + "SearchArgs", + "SearchRequest", + "ESQuery", + "ESBoolQuery", + # system + "NotificationClientProtocol", + "WSGIApp", +] diff --git a/superdesk/core/types/common.py b/superdesk/core/types/common.py new file mode 100644 index 0000000000..b74e8f6989 --- /dev/null +++ b/superdesk/core/types/common.py @@ -0,0 +1 @@ +DefaultNoValue = object() diff --git a/superdesk/core/types/model.py b/superdesk/core/types/model.py new file mode 100644 index 0000000000..31321e58c1 --- /dev/null +++ b/superdesk/core/types/model.py @@ -0,0 +1,120 @@ +from typing import Any +from typing_extensions import Self, override + +from pydantic import BaseModel as PydanticModel, AliasChoices + +from 
.web import Request + + +class BaseModel(PydanticModel): + @classmethod + def get_field_names(cls) -> list[str]: + names: list[str] = [] + for field, info in cls.model_fields.items(): + if isinstance(info.validation_alias, AliasChoices): + # Exclude `AliasPath` instances from choices, as we won't be able to + # translate that into a field name + names.extend([choice for choice in info.validation_alias.choices if isinstance(choice, str)]) + else: + names.append(info.alias or field) + return names + + @classmethod + def from_url_args(cls, request: "Request", default_values: dict[str, Any] | None = None): + if default_values is None: + default_values = {} + + value_dict: dict[str, Any] = {} + for field in cls.get_field_names(): + value = request.get_url_arg(field) or default_values.get(field) + if value: + value_dict[field] = value + return cls.from_dict(value_dict) + + @override + @classmethod + def model_validate( + cls, + obj: dict[str, Any], + *, + strict: bool | None = None, + from_attributes: bool | None = None, + context: dict[str, Any] | None = None, + include_unknown: bool = False, + ) -> Self: + """Construct a model instance from the provided dictionary, and validate its values + + :param obj: Dictionary of values used to construct the model instance + :param strict: Whether to enforce types strictly + :param from_attributes: Whether to extract data from object attributes + :param context: Additional context to pass to the validator + :param include_unknown: Whether to include fields not defined in the ResourceModel + :raises Pydantic.ValidationError: If validation fails + :rtype: ResourceModel + :returns: The validated model instance + """ + + if not include_unknown: + data = {field: value for field, value in obj.items() if field in cls.get_field_names()} + else: + data = obj.copy() + data.pop("_type", None) + + instance = super().model_validate( + data, + strict=strict, + from_attributes=from_attributes, + context=context, + ) + + return instance + + 
@classmethod + def from_dict( + cls, + values: dict[str, Any], + context: dict[str, Any] | None = None, + include_unknown: bool = False, + ) -> Self: + """Construct a model instance from the provided dictionary, and validate its values + + :param values: Dictionary of values used to construct the model instance + :param context: Additional context to pass to the validator + :param include_unknown: Whether to include fields not defined in the ResourceModel + :raises Pydantic.ValidationError: If validation fails + :rtype: ResourceModel + :returns: The validated model instance + """ + + return cls.model_validate(values, context=context, include_unknown=include_unknown) + + @classmethod + def from_json(cls, data: str | bytes | bytearray, **kwargs): + return cls.model_validate_json(data, **kwargs) + + def to_dict(self, **kwargs) -> dict[str, Any]: + """ + Convert the model instance to a dictionary representation with non-JSON-serializable Python objects. + + :param kwargs: Optional keyword arguments to override the default parameters of model_dump. + :rtype: dict[str, Any] + :returns: A dictionary representation of the model instance with field aliases. + Only fields that are set (non-default) will be included. + """ + default_params: dict[str, Any] = {"by_alias": True, "exclude_unset": True} + default_params.update(kwargs) + model_dict = self.model_dump(**default_params) + return model_dict + + def to_json(self, **kwargs) -> str: + """ + Convert the model instance to a JSON serializable dictionary. + + :param kwargs: Optional keyword arguments to override the default parameters of model_dump_json. + :rtype: str + :return: A JSON-compatible dictionary representation of the model instance with field aliases. + Only fields that are set (non-default) will be included. 
+ """ + default_params: dict[str, Any] = {"by_alias": True, "exclude_unset": True} + default_params.update(kwargs) + return self.model_dump_json(**default_params) diff --git a/superdesk/core/types/search.py b/superdesk/core/types/search.py new file mode 100644 index 0000000000..151bc916f5 --- /dev/null +++ b/superdesk/core/types/search.py @@ -0,0 +1,167 @@ +from typing import Any, Literal +from typing_extensions import TypedDict +from enum import Enum, unique + +from pydantic import BaseModel, ConfigDict, NonNegativeInt, field_validator, Field +from pydantic.dataclasses import dataclass + + +#: The data type for projections, either a list of field names, or a dictionary containing +#: the field and enable/disable state +ProjectedFieldArg = ( + list[str] + | set[str] + | dict[str, Literal[0]] + | dict[str, Literal[1]] + | dict[str, Literal[True]] + | dict[str, Literal[False]] +) + +#: Type used to provide list of sort params to be used +SortListParam = list[tuple[str, Literal[1, -1]]] + +#: Type used for sort param in service requests +#: can be a string, which will convert to an :attr:`SortListParam` type +SortParam = str | SortListParam + +#: Type used for version param in service requests +#: Can be either ``"all"`` or an int ``>= 0`` +VersionParam = Literal["all"] | NonNegativeInt + + +class SearchArgs(TypedDict, total=False): + """Dictionary containing Elasticsearch search arguments + + This is for use with the `.find` methods in elastic clients + """ + + #: A JSON string containing an elasticsearch query + source: str | dict + + #: A query string + q: str + + #: Default field, for use with the query string + df: str + + #: Default operator, for use with the query string (defaults to "AND") + default_operator: str + + #: A JSON string containing bool query filters, to be applied to the elastic query + filter: str | dict + + #: A list of dictionaries containing bool query filters, to be applied to the elastic query + filters: list[dict[str, Any]] + + #: A JSON 
string containing the field projections to filter out the returned fields + projections: str + + version: VersionParam | None + + +@dataclass +class ESBoolQuery: + must: list[dict[str, Any]] = Field(default_factory=list) + must_not: list[dict[str, Any]] = Field(default_factory=list) + should: list[dict[str, Any]] = Field(default_factory=list) + filter: list[dict[str, Any]] = Field(default_factory=list) + minimum_should_match: int | None = None + highlight: dict[str, Any] = Field(default_factory=dict) + + def has_filters(self): + return ( + len(self.must) > 0 + or len(self.must_not) > 0 + or len(self.should) > 0 + or len(self.filter) > 0 + or len(self.highlight) > 0 + ) + + +@dataclass +class ESQuery: + query: ESBoolQuery = Field(default_factory=ESBoolQuery) + post_filter: ESBoolQuery = Field(default_factory=ESBoolQuery) + aggs: dict[str, Any] = Field(default_factory=dict) + + def generate_query_dict(self, query: dict[str, Any] | None = None) -> dict[str, Any]: + if query is None: + query = {} + + if self.query.has_filters(): + query.setdefault("query", {}).setdefault("bool", {}) + if self.query.must: + query["query"]["bool"].setdefault("must", []).extend(self.query.must) + if self.query.must_not: + query["query"]["bool"].setdefault("must_not", []).extend(self.query.must_not) + if self.query.should: + query["query"]["bool"].setdefault("should", []).extend(self.query.should) + minimum_should_match = self.query.minimum_should_match + query["query"]["bool"]["minimum_should_match"] = ( + minimum_should_match if minimum_should_match is not None else 1 + ) + if self.query.filter: + query["query"]["bool"].setdefault("filter", []).extend(self.query.filter) + + if self.query.highlight: + query["highlight"] = self.query.highlight + + if self.post_filter.has_filters(): + query.setdefault("post_filter", {}).setdefault("bool", {}) + if self.post_filter.must: + query["post_filter"]["bool"]["must"] = self.post_filter.must + if self.post_filter.must_not: + 
query["post_filter"]["bool"]["must_not"] = self.post_filter.must_not + if self.post_filter.should: + query["post_filter"]["bool"]["should"] = self.post_filter.should + minimum_should_match = self.post_filter.minimum_should_match + query["post_filter"]["bool"]["minimum_should_match"] = ( + minimum_should_match if minimum_should_match is not None else 1 + ) + if self.post_filter.filter: + query["post_filter"]["bool"]["filter"] = self.post_filter.filter + + return query + + +class SearchRequest(BaseModel): + """Dataclass containing Elasticsearch request arguments""" + + model_config = ConfigDict(extra="allow") + + #: Argument for the search filters + args: SearchArgs | None = None + + #: Sorting to be used + sort: SortParam | None = None + + #: Maximum number of documents to be returned + # TODO-ASYNC: Support None for `max_results`, and let the underlying resource service handle that instead + max_results: int = 25 + + #: The page number to be returned + page: int = 1 + + #: A JSON string containing an Elasticsearch where query + where: str | dict | None = None + + #: If `True`, will include aggregations with the result + aggregations: bool = False + + #: If `True`, will include highlights with the result + highlight: bool = False + + #: The field projections to be applied + projection: ProjectedFieldArg | None = None + + version: int | None = None + + use_mongo: bool | None = None + + elastic: ESQuery = Field(default_factory=ESQuery) + + @field_validator("projection", mode="before") + def parse_projection(cls, value: ProjectedFieldArg | str | None) -> ProjectedFieldArg | None: + from superdesk.core import json + + return json.loads(value) if isinstance(value, str) else value diff --git a/superdesk/core/types/system.py b/superdesk/core/types/system.py new file mode 100644 index 0000000000..c90b0abf14 --- /dev/null +++ b/superdesk/core/types/system.py @@ -0,0 +1,98 @@ +from typing import Any, Sequence, Protocol + +from .web import Request, Endpoint, EndpointGroup + +
+class NotificationClientProtocol(Protocol): + open: bool + messages: Sequence[str] + + def close(self) -> None: + ... + + def send(self, message: str) -> None: + ... + + def reset(self) -> None: + ... + + +class WSGIApp(Protocol): + """Protocol for defining functionality from a WSGI application (such as Eve/Flask) + + A class instance that adheres to this protocol is passed into the SuperdeskAsyncApp constructor. + This way the SuperdeskAsyncApp does not need to know the underlying WSGI application, just that + it provides certain functionality. + """ + + #: Config for the application + config: dict[str, Any] + + #: Config for the front-end application + client_config: dict[str, Any] + + testing: bool = False + + #: Interface to upload/download/query media + media: Any + + mail: Any + + data: Any + + storage: Any + + auth: Any + + subjects: Any + + notification_client: NotificationClientProtocol + + locators: Any + + celery: Any + + redis: Any + + jinja_loader: Any + + jinja_env: Any + + extensions: dict[str, Any] + + def register_endpoint(self, endpoint: Endpoint | EndpointGroup): + ... + + def register_resource(self, name: str, settings: dict[str, Any]): + ... + + def upload_url(self, media_id: str) -> str: + ... + + def download_url(self, media_id: str) -> str: + ... + + # TODO: Provide proper type here, context manager + def app_context(self): + ... + + def get_current_user_dict(self) -> dict[str, Any] | None: + ... + + def response_class(self, *args, **kwargs) -> Any: + ... + + def validator(self, *args, **kwargs) -> Any: + ... + + def init_indexes(self, ignore_duplicate_keys: bool = False) -> None: + ... + + def as_any(self) -> Any: + ... + + def get_current_request(self) -> Request | None: + ... + + def get_endpoint_for_current_request(self) -> Endpoint | None: + ... 
diff --git a/superdesk/core/types.py b/superdesk/core/types/web.py similarity index 58% rename from superdesk/core/types.py rename to superdesk/core/types/web.py index 2a25881ba4..21b8e692b2 100644 --- a/superdesk/core/types.py +++ b/superdesk/core/types/web.py @@ -9,10 +9,7 @@ # at https://www.sourcefabric.org/superdesk/license from typing import ( - Dict, Any, - Optional, - List, Literal, Sequence, Union, @@ -24,104 +21,15 @@ NoReturn, ) from typing_extensions import TypedDict -from dataclasses import dataclass -from pydantic import BaseModel, ConfigDict, NonNegativeInt, field_validator - -DefaultNoValue = object() +from pydantic import BaseModel +from pydantic.dataclasses import dataclass +from .common import DefaultNoValue HTTP_METHOD = Literal["GET", "POST", "PATCH", "PUT", "DELETE", "HEAD", "OPTIONS"] -#: The data type for projections, either a list of field names, or a dictionary containing -#: the field and enable/disable state -ProjectedFieldArg = ( - list[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] -) - -#: Type used to provide list of sort params to be used -SortListParam = list[tuple[str, Literal[1, -1]]] - -#: Type used for sort param in service requests -#: can be a string, which will convert to an :attr:`SortListParam` type -SortParam = str | SortListParam - -#: Type used for version param in service requests -#: Can be either ``"all"`` or an int ``>= 0`` -VersionParam = Literal["all"] | NonNegativeInt - - -class SearchArgs(TypedDict, total=False): - """Dictionary containing Elasticsearch search arguments - - This is for use with the `.find` methods in elastic clients - """ - - #: A JSON string containing an elasticsearch query - source: str - - #: A query string - q: str - - #: Default field, for use with the query string - df: str - - #: Default operator, for use with the query string (defaults to "AND") - default_operator: str - - #: A JSON string containing bool query filters, to be 
applied to the elastic query - filter: str - - #: A list of dictionaries containing bool query filters, to be applied to the elastic query - filters: List[Dict[str, Any]] - - #: A JSON string containing the field projections to filter out the returned fields - projections: str - - version: VersionParam | None - - -class SearchRequest(BaseModel): - """Dataclass containing Elasticsearch request arguments""" - - model_config = ConfigDict(extra="allow") - - #: Argument for the search filters - args: Optional[SearchArgs] = None - - #: Sorting to be used - sort: SortParam | None = None - - #: Maximum number of documents to be returned - max_results: int = 25 - - #: The page number to be returned - page: int = 1 - - #: A JSON string contianing an Elasticsearch where query - where: str | dict | None = None - - #: If `True`, will include aggregations with the result - aggregations: bool = False - - #: If `True`, will include highlights with the result - highlight: bool = False - - #: The field projections to be applied - projection: Optional[ProjectedFieldArg] = None - - @field_validator("projection", mode="before") - def parse_projection(cls, value: ProjectedFieldArg | str | None) -> ProjectedFieldArg | None: - from superdesk.core import json - - if not value: - return None - elif isinstance(value, str): - return json.loads(value) - return value - - @dataclass class Response: """Dataclass for endpoints to return response from a request""" @@ -274,11 +182,11 @@ def path(self) -> str: """Returns the URL of the current request""" ... - def get_header(self, key: str) -> Optional[str]: + def get_header(self, key: str) -> str | None: """Get an HTTP header from the current request""" ... - async def get_json(self) -> Union[Any, None]: + async def get_json(self) -> Any | None: """Get the body of the current request in JSON format""" ... @@ -286,7 +194,7 @@ async def get_form(self) -> Mapping: """Get the body of the current request in form format""" ... 
- async def get_data(self) -> Union[bytes, str]: + async def get_data(self) -> bytes | str: """Get the body of the current request in raw bytes format""" ... @@ -302,6 +210,9 @@ def get_url_arg(self, key: str) -> str | None: def redirect(self, location: str, code: int = 302) -> Any: ... + def is_json_request(self) -> bool: + ... + AuthRule = Callable[[Request], Awaitable[Any | None]] AuthConfig = Literal[False] | list[AuthRule] | dict[str, AuthRule] | None @@ -317,7 +228,7 @@ class Endpoint: name: str #: HTTP Methods allowed for this endpoint - methods: List[HTTP_METHOD] + methods: list[HTTP_METHOD] #: The callback function used to process the request func: EndpointFunction @@ -360,10 +271,10 @@ class EndpointGroup(Endpoint): import_name: str #: Optional url prefix to be added to all routes of this group - url_prefix: Optional[str] + url_prefix: str | None #: List of endpoints registered with this group - endpoints: List[Endpoint] + endpoints: list[Endpoint] def endpoint( self, @@ -375,101 +286,6 @@ def endpoint( ... -class NotificationClientProtocol(Protocol): - open: bool - messages: Sequence[str] - - def close(self) -> None: - ... - - def send(self, message: str) -> None: - ... - - def reset(self) -> None: - ... - - -class WSGIApp(Protocol): - """Protocol for defining functionality from a WSGI application (such as Eve/Flask) - - A class instance that adheres to this protocol is passed into the SuperdeskAsyncApp constructor. - This way the SuperdeskAsyncApp does not need to know the underlying WSGI application, just that - it provides certain functionality. 
- """ - - #: Config for the application - config: dict[str, Any] - - #: Config for the front-end application - client_config: dict[str, Any] - - testing: bool = False - - #: Interface to upload/download/query media - media: Any - - mail: Any - - data: Any - - storage: Any - - auth: Any - - subjects: Any - - notification_client: NotificationClientProtocol - - locators: Any - - celery: Any - - redis: Any - - jinja_loader: Any - - jinja_env: Any - - extensions: Dict[str, Any] - - def register_endpoint(self, endpoint: Endpoint | EndpointGroup): - ... - - def register_resource(self, name: str, settings: dict[str, Any]): - ... - - def upload_url(self, media_id: str) -> str: - ... - - def download_url(self, media_id: str) -> str: - ... - - # TODO: Provide proper type here, context manager - def app_context(self): - ... - - def get_current_user_dict(self) -> dict[str, Any] | None: - ... - - def response_class(self, *args, **kwargs) -> Any: - ... - - def validator(self, *args, **kwargs) -> Any: - ... - - def init_indexes(self, ignore_duplicate_keys: bool = False) -> None: - ... - - def as_any(self) -> Any: - ... - - def get_current_request(self) -> Request | None: - ... - - def get_endpoint_for_current_request(self) -> Endpoint | None: - ... - - class RestResponseMeta(TypedDict): """Dictionary to hold the response metadata for a REST request""" diff --git a/superdesk/factory/app.py b/superdesk/factory/app.py index ddef7a00c9..9f3de92b84 100644 --- a/superdesk/factory/app.py +++ b/superdesk/factory/app.py @@ -170,6 +170,9 @@ def get_url_arg(self, key: str) -> str | None: def redirect(self, location: str, code: int = 302) -> Any: return redirect(location, code) + def is_json_request(self) -> bool: + return self.request.accept_mimetypes.best_match(["application/json", "text/html"]) == "application/json" + def set_error_handlers(app): """Set error handlers for the given application object. 
diff --git a/superdesk/resource_fields.py b/superdesk/resource_fields.py index b89b1d9553..68178d69c7 100644 --- a/superdesk/resource_fields.py +++ b/superdesk/resource_fields.py @@ -9,6 +9,7 @@ # at https://www.sourcefabric.org/superdesk/license ID_FIELD = "_id" +GUID_FIELD = "guid" VERSION_ID_FIELD = "_id_document" ITEM_TYPE = "type" ITEM_STATE = "state" diff --git a/tests/core/resource_model_test.py b/tests/core/resource_model_test.py index d9fa980ac5..bf8f4c0c5e 100644 --- a/tests/core/resource_model_test.py +++ b/tests/core/resource_model_test.py @@ -72,6 +72,7 @@ class ModelWithObjectId(ResourceModelWithObjectId): self.assertIsInstance(ModelWithObjectId(id=ObjectId(), name="foo").id, ObjectId) def test_elastic_mapping(self): + self.maxDiff = None # Test the generated mapping self.assertEqual( get_elastic_mapping_from_model("users_async", User), @@ -80,10 +81,26 @@ def test_elastic_mapping(self): "_created": {"type": "date"}, "_updated": {"type": "date"}, "_etag": {"type": "text"}, - "first_name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "last_name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "email": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "first_name": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + "analyzer": "html_field_analyzer", + }, + "last_name": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + "analyzer": "html_field_analyzer", + }, + "email": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + "analyzer": "html_field_analyzer", + }, + "name": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + "analyzer": "html_field_analyzer", + }, "username": {"type": "text"}, "code": {"type": "keyword"}, "bio": {"type": "text", "analyzer": "html_field_analyzer"}, diff --git a/tests/core/resource_service_test.py b/tests/core/resource_service_test.py index 
609896e326..0bd1396e87 100644 --- a/tests/core/resource_service_test.py +++ b/tests/core/resource_service_test.py @@ -399,11 +399,13 @@ async def assert_es_find_called_with(*args, **kwargs): ) expected = SearchRequest() await assert_es_find_called_with(SearchRequest(), expected=expected) + expected.where = {} await assert_es_find_called_with({}, expected=expected) sort_query = [("last_name.keyword", 1), ("first_name.keyword", 1)] expected = SearchRequest(sort=sort_query) await assert_es_find_called_with(SearchRequest(sort=sort_query), expected=expected) + expected.where = {} await assert_es_find_called_with({}, sort=sort_query, expected=expected) kwargs = dict( @@ -413,6 +415,7 @@ async def assert_es_find_called_with(*args, **kwargs): ) expected = SearchRequest(**kwargs) await assert_es_find_called_with(SearchRequest(**kwargs), expected=expected) + expected.where = {} await assert_es_find_called_with({}, **kwargs, expected=expected) # Test with default sort in the resource config @@ -420,10 +423,12 @@ async def assert_es_find_called_with(*args, **kwargs): self.service.config.default_sort = sort_query expected = SearchRequest(sort=sort_query) await assert_es_find_called_with(SearchRequest(), expected=expected) + expected.where = {} await assert_es_find_called_with({}, expected=expected) # Test passing in sort param with default sort configured custom_sort_query = [("scores", 1)] expected = SearchRequest(sort=custom_sort_query) await assert_es_find_called_with(SearchRequest(sort=custom_sort_query), expected=expected) + expected.where = {} await assert_es_find_called_with({}, sort=custom_sort_query, expected=expected)