diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c7ebcf1..4f7d58f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,24 +30,14 @@ jobs: - name: Run lint check run: poetry run pre-commit run -a ${{ matrix.cmd }} pytest: - services: - redis: - image: bitnami/redis:6.2.5 - env: - ALLOW_EMPTY_PASSWORD: "yes" - options: >- - --health-cmd="redis-cli ping" - --health-interval=5s - --health-timeout=5s - --health-retries=30 - ports: - - 6379:6379 strategy: matrix: py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"] runs-on: "ubuntu-latest" steps: - uses: actions/checkout@v4 + - name: Set up Redis instance and Redis cluster + run: docker-compose up -d - name: Set up Python uses: actions/setup-python@v2 with: diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f7810f2 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,60 @@ +version: '3.2' + +services: + redis: + image: bitnami/redis:6.2.5 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 3 + start_period: 10s + ports: + - 7000:6379 + redis-node-0: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-1: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-2: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-3: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-4: + image: docker.io/bitnami/redis-cluster:7.2 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + redis-node-5: + image: docker.io/bitnami/redis-cluster:7.2 + depends_on: + - redis-node-0 + - redis-node-1 + - redis-node-2 + - redis-node-3 + - redis-node-4 + environment: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + REDIS_CLUSTER_REPLICAS: 1 + REDIS_CLUSTER_CREATOR: "yes" + ports: + - 7001:6379 diff --git a/pyproject.toml b/pyproject.toml index 5ce60f5..f34bed5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq-redis" -version = "0.5.1" +version = "0.5.2" description = "Redis integration for taskiq" authors = ["taskiq-team "] readme = "README.md" diff --git a/taskiq_redis/__init__.py b/taskiq_redis/__init__.py index cf6a432..36cec84 100644 --- a/taskiq_redis/__init__.py +++ b/taskiq_redis/__init__.py @@ -1,9 +1,13 @@ """Package for redis integration.""" -from taskiq_redis.redis_backend import RedisAsyncResultBackend +from taskiq_redis.redis_backend import ( + RedisAsyncClusterResultBackend, + RedisAsyncResultBackend, +) from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker from taskiq_redis.schedule_source import RedisScheduleSource __all__ = [ + "RedisAsyncClusterResultBackend", "RedisAsyncResultBackend", "ListQueueBroker", "PubSubBroker", diff --git a/taskiq_redis/redis_backend.py b/taskiq_redis/redis_backend.py index 35103a4..3a0810d 100644 --- a/taskiq_redis/redis_backend.py +++ b/taskiq_redis/redis_backend.py @@ -2,6 +2,7 @@ from typing import Dict, Optional, TypeVar, Union from redis.asyncio import ConnectionPool, Redis +from redis.asyncio.cluster import RedisCluster from taskiq import AsyncResultBackend from taskiq.abc.result_backend import TaskiqResult @@ -134,3 +135,122 @@ async def get_result( taskiq_result.log = None return taskiq_result + + +class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]): + """Async result backend based on redis cluster.""" + + def __init__( + self, + redis_url: str, + keep_results: bool = True, + result_ex_time: Optional[int] = None, + result_px_time: Optional[int] = None, + ) -> None: + """ + Constructs a new result backend. + + :param redis_url: url to redis cluster. + :param keep_results: flag to not remove results from Redis after reading. + :param result_ex_time: expire time in seconds for result. + :param result_px_time: expire time in milliseconds for result. + + :raises DuplicateExpireTimeSelectedError: if result_ex_time + and result_px_time are selected. + :raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time + and result_px_time are equal zero. + """ + self.redis: RedisCluster[bytes] = RedisCluster.from_url(redis_url) + self.keep_results = keep_results + self.result_ex_time = result_ex_time + self.result_px_time = result_px_time + + unavailable_conditions = any( + ( + self.result_ex_time is not None and self.result_ex_time <= 0, + self.result_px_time is not None and self.result_px_time <= 0, + ), + ) + if unavailable_conditions: + raise ExpireTimeMustBeMoreThanZeroError( + "You must select one expire time param and it must be more than zero.", + ) + + if self.result_ex_time and self.result_px_time: + raise DuplicateExpireTimeSelectedError( + "Choose either result_ex_time or result_px_time.", + ) + + async def shutdown(self) -> None: + """Closes redis connection.""" + await self.redis.aclose() # type: ignore[attr-defined] + await super().shutdown() + + async def set_result( + self, + task_id: str, + result: TaskiqResult[_ReturnType], + ) -> None: + """ + Sets task result in redis. + + Dumps TaskiqResult instance into the bytes and writes + it to redis. + + :param task_id: ID of the task. + :param result: TaskiqResult instance. + """ + redis_set_params: Dict[str, Union[str, bytes, int]] = { + "name": task_id, + "value": pickle.dumps(result), + } + if self.result_ex_time: + redis_set_params["ex"] = self.result_ex_time + elif self.result_px_time: + redis_set_params["px"] = self.result_px_time + + await self.redis.set(**redis_set_params) # type: ignore + + async def is_result_ready(self, task_id: str) -> bool: + """ + Returns whether the result is ready. + + :param task_id: ID of the task. + + :returns: True if the result is ready else False. + """ + return bool(await self.redis.exists(task_id)) # type: ignore[attr-defined] + + async def get_result( + self, + task_id: str, + with_logs: bool = False, + ) -> TaskiqResult[_ReturnType]: + """ + Gets result from the task. + + :param task_id: task's id. + :param with_logs: if True it will download task's logs. + :raises ResultIsMissingError: if there is no result when trying to get it. + :return: task's return value. + """ + if self.keep_results: + result_value = await self.redis.get( # type: ignore[attr-defined] + name=task_id, + ) + else: + result_value = await self.redis.getdel( # type: ignore[attr-defined] + name=task_id, + ) + + if result_value is None: + raise ResultIsMissingError + + taskiq_result: TaskiqResult[_ReturnType] = pickle.loads( # noqa: S301 + result_value, + ) + + if not with_logs: + taskiq_result.log = None + + return taskiq_result diff --git a/tests/conftest.py b/tests/conftest.py index 1abfa07..dcccb79 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,4 +25,18 @@ def redis_url() -> str: :return: URL string. """ - return os.environ.get("TEST_REDIS_URL", "redis://localhost") + return os.environ.get("TEST_REDIS_URL", "redis://localhost:7000") + + +@pytest.fixture +def redis_cluster_url() -> str: + """ + URL to connect to redis cluster. + + It tries to get it from environ, + and return default one if the variable is + not set. + + :return: URL string. + """ + return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7001") diff --git a/tests/test_result_backend.py b/tests/test_result_backend.py index 183696f..15ecdd0 100644 --- a/tests/test_result_backend.py +++ b/tests/test_result_backend.py @@ -3,7 +3,7 @@ import pytest from taskiq import TaskiqResult -from taskiq_redis import RedisAsyncResultBackend +from taskiq_redis import RedisAsyncClusterResultBackend, RedisAsyncResultBackend from taskiq_redis.exceptions import ResultIsMissingError @@ -130,3 +130,128 @@ async def test_keep_results_after_reading(redis_url: str) -> None: res2 = await result_backend.get_result(task_id=task_id) assert res1 == res2 await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_set_result_success_cluster(redis_cluster_url: str) -> None: + """ + Tests that results can be set without errors in cluster mode. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + fetched_result = await result_backend.get_result( + task_id=task_id, + with_logs=True, + ) + assert fetched_result.log == "My Log" + assert fetched_result.return_value == 11 + assert fetched_result.execution_time == 112.2 + assert fetched_result.is_err + await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_fetch_without_logs_cluster(redis_cluster_url: str) -> None: + """ + Check if fetching value without logs works fine. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + fetched_result = await result_backend.get_result( + task_id=task_id, + with_logs=False, + ) + assert fetched_result.log is None + assert fetched_result.return_value == 11 + assert fetched_result.execution_time == 112.2 + assert fetched_result.is_err + await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_remove_results_after_reading_cluster(redis_cluster_url: str) -> None: + """ + Check if removing results after reading works fine. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + keep_results=False, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + await result_backend.get_result(task_id=task_id) + with pytest.raises(ResultIsMissingError): + await result_backend.get_result(task_id=task_id) + + await result_backend.shutdown() + + +@pytest.mark.anyio +async def test_keep_results_after_reading_cluster(redis_cluster_url: str) -> None: + """ + Check if keeping results after reading works fine. + + :param redis_url: redis URL. + """ + result_backend = RedisAsyncClusterResultBackend( # type: ignore + redis_url=redis_cluster_url, + keep_results=True, + ) + task_id = uuid.uuid4().hex + result: "TaskiqResult[int]" = TaskiqResult( + is_err=True, + log="My Log", + return_value=11, + execution_time=112.2, + ) + await result_backend.set_result( + task_id=task_id, + result=result, + ) + + res1 = await result_backend.get_result(task_id=task_id) + res2 = await result_backend.get_result(task_id=task_id) + assert res1 == res2 + await result_backend.shutdown()