From 5ceedcaf1eb259309916b7e1f823a360c52b8026 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Fri, 10 Jan 2025 09:31:06 +0100 Subject: [PATCH] improved arguments management during declare_queue --- examples/getting_started/main.py | 1 - rabbitmq_amqp_python_client/management.py | 19 +++---- rabbitmq_amqp_python_client/queues.py | 6 +-- tests/test_management.py | 61 +++++++++++++++++------ tests/test_publisher.py | 7 +-- 5 files changed, 60 insertions(+), 34 deletions(-) diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index a9ef64f..2bab238 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -4,7 +4,6 @@ ExchangeSpecification, Message, QueueType, - QuorumQueueSpecification, StreamSpecification, exchange_address, ) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index e7bd15b..be1d65b 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import Any, Optional +from typing import Any, Optional, Union from .address_helper import ( binding_path_with_exchange_queue, @@ -130,19 +130,20 @@ def declare_exchange( def declare_queue( self, - queue_specification: ( - ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification - ), - ) -> ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification: + queue_specification: Union[ + ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification + ], + ) -> Union[ + ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification + ]: logger.debug("declare_queue operation called") - if ( - type(queue_specification) is ClassicQueueSpecification - or type(queue_specification) is QuorumQueueSpecification + if isinstance(queue_specification, ClassicQueueSpecification) or isinstance( + queue_specification, QuorumQueueSpecification ): body = self._declare_queue(queue_specification) - elif type(queue_specification) is StreamSpecification: + elif isinstance(queue_specification, StreamSpecification): body = self._declare_stream(queue_specification) path = queue_address(queue_specification.name) diff --git a/rabbitmq_amqp_python_client/queues.py b/rabbitmq_amqp_python_client/queues.py index 2e030a9..38feb91 100644 --- a/rabbitmq_amqp_python_client/queues.py +++ b/rabbitmq_amqp_python_client/queues.py @@ -39,9 +39,9 @@ class QuorumQueueSpecification(QueueSpecification): class StreamSpecification: name: str queue_type: QueueType = QueueType.stream - max_len_bytes: Optional[str] = None - max_time_retention: Optional[str] = None - max_segment_size_in_bytes: Optional[str] = None + max_len_bytes: Optional[int] = None + max_time_retention: Optional[int] = None + max_segment_size_in_bytes: Optional[int] = None filter_size: Optional[int] = None initial_group_size: Optional[int] = None leader_locator: Optional[str] = None diff --git a/tests/test_management.py b/tests/test_management.py index 1534807..3d838f7 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -1,9 +1,11 @@ from rabbitmq_amqp_python_client import ( BindingSpecification, + ClassicQueueSpecification, Connection, ExchangeSpecification, - QueueSpecification, QueueType, + QuorumQueueSpecification, + StreamSpecification, ) from rabbitmq_amqp_python_client.exceptions import ( ValidationCodeException, @@ -35,9 +37,7 @@ def test_declare_purge_delete_queue() -> None: queue_name = "my_queue" management = connection.management() - queue_info = management.declare_queue( - QueueSpecification(name=queue_name, queue_type=QueueType.quorum) - ) + queue_info = management.declare_queue(QuorumQueueSpecification(name=queue_name)) assert queue_info.name == queue_name @@ -59,9 +59,7 @@ def test_bind_exchange_to_queue() -> None: management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) - management.declare_queue( - QueueSpecification(name=queue_name, queue_type=QueueType.quorum) - ) + management.declare_queue(QuorumQueueSpecification(name=queue_name)) binding_exchange_queue_path = management.bind( BindingSpecification( @@ -98,9 +96,8 @@ def test_queue_info_with_validations() -> None: queue_name = "test_queue_info_with_validation" management = connection.management() - queue_specification = QueueSpecification( + queue_specification = QuorumQueueSpecification( name=queue_name, - queue_type=QueueType.quorum, ) management.declare_queue(queue_specification) @@ -122,16 +119,15 @@ def test_queue_precondition_fail() -> None: queue_name = "test-queue_precondition_fail" management = connection.management() - queue_specification = QueueSpecification( - name=queue_name, queue_type=QueueType.quorum, is_auto_delete=False + queue_specification = QuorumQueueSpecification( + name=queue_name, is_auto_delete=False ) management.declare_queue(queue_specification) management.declare_queue(queue_specification) - queue_specification = QueueSpecification( + queue_specification = QuorumQueueSpecification( name=queue_name, - queue_type=QueueType.quorum, is_auto_delete=True, ) @@ -152,7 +148,7 @@ def test_declare_classic_queue() -> None: queue_name = "test-declare_classic_queue" management = connection.management() - queue_specification = QueueSpecification( + queue_specification = QuorumQueueSpecification( name=queue_name, queue_type=QueueType.classic, is_auto_delete=False, @@ -165,14 +161,14 @@ def test_declare_classic_queue() -> None: management.delete_queue(queue_name) -def test_declare_queue_with_args() -> None: +def test_declare_classic_queue_with_args() -> None: connection = Connection("amqp://guest:guest@localhost:5672/") connection.dial() queue_name = "test-queue_with_args" management = connection.management() - queue_specification = QueueSpecification( + queue_specification = ClassicQueueSpecification( name=queue_name, queue_type=QueueType.classic, is_auto_delete=False, @@ -201,3 +197,36 @@ def test_declare_queue_with_args() -> None: ) management.delete_queue(queue_name) + + +def test_declare_stream_with_args() -> None: + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() + + stream_name = "test-stream_with_args" + management = connection.management() + + stream_specification = StreamSpecification( + name=stream_name, + max_len_bytes=1000000000, + max_time_retention=10000000, + max_segment_size_in_bytes=100000000, + filter_size=1000, + initial_group_size=3, + leader_locator="node1", + ) + + stream_info = management.declare_queue(stream_specification) + + assert stream_specification.name == stream_info.name + assert stream_specification.max_len_bytes == stream_info.max_len_bytes + assert stream_specification.max_time_retention == stream_info.max_time_retention + assert ( + stream_specification.max_segment_size_in_bytes + == stream_info.max_segment_size_in_bytes + ) + assert stream_specification.filter_size == stream_info.filter_size + assert stream_specification.initial_group_size == stream_info.initial_group_size + assert stream_specification.leader_locator == stream_info.leader_locator + + management.delete_queue(stream_name) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index fb9120c..383f6db 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -1,8 +1,7 @@ from rabbitmq_amqp_python_client import ( Connection, Message, - QueueSpecification, - QueueType, + QuorumQueueSpecification, ) @@ -13,9 +12,7 @@ def test_bind_exchange_to_queue() -> None: queue_name = "test-queue" management = connection.management() - management.declare_queue( - QueueSpecification(name=queue_name, queue_type=QueueType.quorum) - ) + management.declare_queue(QuorumQueueSpecification(name=queue_name)) raised = False