Skip to content

Commit

Permalink
🏗️(backends) move write/list methods out of base data backend
Browse files Browse the repository at this point in the history
We simplify the base data backend interface by moving out
the write and list methods.
Now, data backends that implement the list and write methods should
inherit from Writable and Listable classes.
  • Loading branch information
SergioSim committed Oct 30, 2023
1 parent 7aa8f45 commit b24d2c5
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 148 deletions.
5 changes: 4 additions & 1 deletion src/ralph/backends/data/async_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from elasticsearch.helpers import BulkIndexError, async_streaming_bulk

from ralph.backends.data.base import (
AsyncListable,
AsyncWritable,
BaseAsyncDataBackend,
BaseOperationType,
DataBackendStatus,
Expand All @@ -23,12 +25,13 @@
logger = logging.getLogger(__name__)


class AsyncESDataBackend(BaseAsyncDataBackend):
class AsyncESDataBackend(BaseAsyncDataBackend, AsyncWritable, AsyncListable):
"""Asynchronous Elasticsearch data backend."""

name = "async_es"
query_model = ESQuery
settings_class = ESDataBackendSettings
default_operation_type = BaseOperationType.INDEX

def __init__(self, settings: Optional[ESDataBackendSettings] = None):
"""Instantiate the asynchronous Elasticsearch client.
Expand Down
1 change: 1 addition & 0 deletions src/ralph/backends/data/async_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class AsyncMongoDataBackend(BaseAsyncDataBackend):
name = "async_mongo"
query_model = MongoQuery
settings_class = MongoDataBackendSettings
default_operation_type = BaseOperationType.INDEX

def __init__(self, settings: Optional[MongoDataBackendSettings] = None):
"""Instantiate the asynchronous MongoDB client.
Expand Down
208 changes: 113 additions & 95 deletions src/ralph/backends/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,77 @@ def wrapper(*args, **kwargs):
return wrapper


class Writable(ABC):
"""Data backend interface for backends supporting the write operation."""

default_operation_type = BaseOperationType.INDEX

@abstractmethod
def write( # pylint: disable=too-many-arguments
self,
data: Union[IOBase, Iterable[bytes], Iterable[dict]],
target: Optional[str] = None,
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
) -> int:
"""Write `data` records to the `target` container and return their count.
Args:
data: (Iterable or IOBase): The data to write.
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
chunk_size (int or None): The number of records or bytes to write in one
batch, depending on whether `data` contains dictionaries or bytes.
If `chunk_size` is `None`, a default value is used instead.
ignore_errors (bool): If `True`, errors during the write operation
are ignored and logged. If `False` (default), a `BackendException`
is raised if an error occurs.
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
Return:
int: The number of written records.
Raise:
BackendException: If a failure during the write operation occurs and
`ignore_errors` is set to `False`.
BackendParameterException: If a backend argument value is not valid.
"""


class Listable(ABC):
"""Data backend interface for backends supporting the list operation."""

@abstractmethod
def list(
self, target: Optional[str] = None, details: bool = False, new: bool = False
) -> Iterator[Union[str, dict]]:
"""List containers in the data backend. E.g., collections, files, indexes.
Args:
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
details (bool): Get detailed container information instead of just names.
new (bool): Given the history, list only not already read containers.
Yield:
str: If `details` is False.
dict: If `details` is True.
Raise:
BackendException: If a failure occurs.
BackendParameterException: If a backend argument value is not valid.
"""


class BaseDataBackend(ABC):
"""Base data backend interface."""

type = "data"
name = "base"
query_model = BaseQuery
default_operation_type = BaseOperationType.INDEX
settings_class = BaseDataBackendSettings

@abstractmethod
Expand Down Expand Up @@ -135,27 +199,6 @@ def status(self) -> DataBackendStatus:
DataBackendStatus: The status of the data backend.
"""

@abstractmethod
def list(
self, target: Optional[str] = None, details: bool = False, new: bool = False
) -> Iterator[Union[str, dict]]:
"""List containers in the data backend. E.g., collections, files, indexes.
Args:
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
details (bool): Get detailed container information instead of just names.
new (bool): Given the history, list only not already read containers.
Yield:
str: If `details` is False.
dict: If `details` is True.
Raise:
BackendException: If a failure occurs.
BackendParameterException: If a backend argument value is not valid.
"""

@abstractmethod
@enforce_query_checks
def read(
Expand Down Expand Up @@ -196,7 +239,35 @@ def read(
"""

@abstractmethod
def write( # pylint: disable=too-many-arguments
def close(self) -> None:
"""Close the data backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""


def async_enforce_query_checks(method):
"""Enforce query argument type checking for methods using it."""

@functools.wraps(method)
async def wrapper(*args, **kwargs):
"""Wrap method execution."""
query = kwargs.pop("query", None)
self_ = args[0]
async for result in method(*args, query=self_.validate_query(query), **kwargs):
yield result

return wrapper


class AsyncWritable(ABC):
"""Async data backend interface for backends supporting the write operation."""

default_operation_type = BaseOperationType.INDEX

@abstractmethod
async def write( # pylint: disable=too-many-arguments
self,
data: Union[IOBase, Iterable[bytes], Iterable[dict]],
target: Optional[str] = None,
Expand Down Expand Up @@ -229,27 +300,30 @@ def write( # pylint: disable=too-many-arguments
BackendParameterException: If a backend argument value is not valid.
"""

@abstractmethod
def close(self) -> None:
"""Close the data backend client.

Raise:
BackendException: If a failure occurs during the close operation.
"""
class AsyncListable(ABC):
"""Async data backend interface for backends supporting the list operation."""

@abstractmethod
async def list(
self, target: Optional[str] = None, details: bool = False, new: bool = False
) -> Iterator[Union[str, dict]]:
"""List containers in the data backend. E.g., collections, files, indexes.
def async_enforce_query_checks(method):
"""Enforce query argument type checking for methods using it."""
Args:
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
details (bool): Get detailed container information instead of just names.
new (bool): Given the history, list only not already read containers.
@functools.wraps(method)
async def wrapper(*args, **kwargs):
"""Wrap method execution."""
query = kwargs.pop("query", None)
self_ = args[0]
async for result in method(*args, query=self_.validate_query(query), **kwargs):
yield result
Yield:
str: If `details` is False.
dict: If `details` is True.
return wrapper
Raise:
BackendException: If a failure occurs.
BackendParameterException: If a backend argument value is not valid.
"""


class BaseAsyncDataBackend(ABC):
Expand All @@ -258,7 +332,6 @@ class BaseAsyncDataBackend(ABC):
type = "data"
name = "base"
query_model = BaseQuery
default_operation_type = BaseOperationType.INDEX
settings_class = BaseDataBackendSettings

@abstractmethod
Expand Down Expand Up @@ -308,27 +381,6 @@ async def status(self) -> DataBackendStatus:
DataBackendStatus: The status of the data backend.
"""

@abstractmethod
async def list(
self, target: Optional[str] = None, details: bool = False, new: bool = False
) -> Iterator[Union[str, dict]]:
"""List containers in the data backend. E.g., collections, files, indexes.
Args:
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
details (bool): Get detailed container information instead of just names.
new (bool): Given the history, list only not already read containers.
Yield:
str: If `details` is False.
dict: If `details` is True.
Raise:
BackendException: If a failure occurs.
BackendParameterException: If a backend argument value is not valid.
"""

@abstractmethod
@async_enforce_query_checks
async def read(
Expand Down Expand Up @@ -368,40 +420,6 @@ async def read(
BackendParameterException: If a backend argument value is not valid.
"""

@abstractmethod
async def write( # pylint: disable=too-many-arguments
self,
data: Union[IOBase, Iterable[bytes], Iterable[dict]],
target: Optional[str] = None,
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
) -> int:
"""Write `data` records to the `target` container and return their count.
Args:
data: (Iterable or IOBase): The data to write.
target (str or None): The target container name.
If `target` is `None`, a default value is used instead.
chunk_size (int or None): The number of records or bytes to write in one
batch, depending on whether `data` contains dictionaries or bytes.
If `chunk_size` is `None`, a default value is used instead.
ignore_errors (bool): If `True`, errors during the write operation
are ignored and logged. If `False` (default), a `BackendException`
is raised if an error occurs.
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
Return:
int: The number of written records.
Raise:
BackendException: If a failure during the write operation occurs and
`ignore_errors` is set to `False`.
BackendParameterException: If a backend argument value is not valid.
"""

@abstractmethod
async def close(self) -> None:
"""Close the data backend client.
Expand Down
4 changes: 3 additions & 1 deletion src/ralph/backends/data/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
BaseOperationType,
BaseQuery,
DataBackendStatus,
Listable,
Writable,
enforce_query_checks,
)
from ralph.conf import BaseSettingsConfig, ClientOptions
Expand Down Expand Up @@ -109,7 +111,7 @@ class ClickHouseQuery(BaseClickHouseQuery):
query_string: Union[Json[BaseClickHouseQuery], None]


class ClickHouseDataBackend(BaseDataBackend):
class ClickHouseDataBackend(BaseDataBackend, Writable, Listable):
"""ClickHouse database backend."""

name = "clickhouse"
Expand Down
4 changes: 3 additions & 1 deletion src/ralph/backends/data/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
BaseOperationType,
BaseQuery,
DataBackendStatus,
Listable,
Writable,
enforce_query_checks,
)
from ralph.conf import BaseSettingsConfig, ClientOptions, CommaSeparatedTuple
Expand Down Expand Up @@ -109,7 +111,7 @@ class ESQuery(BaseQuery):
track_total_hits: Literal[False] = False


class ESDataBackend(BaseDataBackend):
class ESDataBackend(BaseDataBackend, Writable, Listable):
"""Elasticsearch data backend."""

name = "es"
Expand Down
4 changes: 3 additions & 1 deletion src/ralph/backends/data/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
BaseOperationType,
BaseQuery,
DataBackendStatus,
Listable,
Writable,
enforce_query_checks,
)
from ralph.backends.mixins import HistoryMixin
Expand Down Expand Up @@ -49,7 +51,7 @@ class Config(BaseSettingsConfig):
LOCALE_ENCODING: str = "utf8"


class FSDataBackend(HistoryMixin, BaseDataBackend):
class FSDataBackend(HistoryMixin, BaseDataBackend, Writable, Listable):
"""FileSystem data backend."""

name = "fs"
Expand Down
Loading

0 comments on commit b24d2c5

Please sign in to comment.