diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 100f933..a9ef64f 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -3,8 +3,9 @@ Connection, ExchangeSpecification, Message, - QueueSpecification, QueueType, + QuorumQueueSpecification, + StreamSpecification, exchange_address, ) @@ -24,7 +25,7 @@ def main() -> None: management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) management.declare_queue( - QueueSpecification(name=queue_name, queue_type=QueueType.quorum) + StreamSpecification(name=queue_name, queue_type=QueueType.stream) ) print("binding queue to exchange") diff --git a/poetry.lock b/poetry.lock index 9f9a18d..72ed139 100644 --- a/poetry.lock +++ b/poetry.lock @@ -458,5 +458,5 @@ files = [ [metadata] lock-version = "2.0" -python-versions = "^3.9" -content-hash = "bfe651ba3823b09c6d96c9e066187567a750a502a186c9a95955a6535a7134a1" +python-versions = "^3.10" +content-hash = "1d6eaec017e031690d3de190f4e59fbac5ad528724b05086f347758ec991b020" diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index f8177c2..bf180cc 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -6,10 +6,14 @@ from .entities import ( BindingSpecification, ExchangeSpecification, - QueueSpecification, ) from .publisher import Publisher from .qpid.proton._message import Message +from .queues import ( + ClassicQueueSpecification, + QuorumQueueSpecification, + StreamSpecification, +) try: __version__ = metadata.version(__package__) @@ -23,7 +27,9 @@ __all__ = [ "Connection", "ExchangeSpecification", - "QueueSpecification", + "QuorumQueueSpecification", + "ClassicQueueSpecification", + "StreamSpecification", "BindingSpecification", "QueueType", "Publisher", diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index af70e62..bed44b1 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -14,23 +14,6 @@ class ExchangeSpecification: is_durable: bool = True -@dataclass -class QueueSpecification: - name: str - queue_type: QueueType = QueueType.quorum - dead_letter_routing_key: Optional[str] = None - is_exclusive: Optional[bool] = None - max_len: Optional[int] = None - max_len_bytes: Optional[int] = None - message_ttl: Optional[int] = None - expires: Optional[int] = None - dead_letter_exchange: Optional[str] = "" - is_auto_delete: bool = False - is_durable: bool = True - overflow: Optional[str] = None - single_active_consumer: Optional[bool] = None - - @dataclass class QueueInfo: name: str diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 1f40089..e7bd15b 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -14,7 +14,6 @@ BindingSpecification, ExchangeSpecification, QueueInfo, - QueueSpecification, ) from .exceptions import ValidationCodeException from .options import ReceiverOption, SenderOption @@ -24,6 +23,11 @@ BlockingReceiver, BlockingSender, ) +from .queues import ( + ClassicQueueSpecification, + QuorumQueueSpecification, + StreamSpecification, +) logger = logging.getLogger(__name__) @@ -125,9 +129,41 @@ def declare_exchange( return exchange_specification def declare_queue( - self, queue_specification: QueueSpecification - ) -> QueueSpecification: + self, + queue_specification: ( + ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification + ), + ) -> ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification: logger.debug("declare_queue operation called") + + if ( + type(queue_specification) is ClassicQueueSpecification + or type(queue_specification) is QuorumQueueSpecification + ): + body = self._declare_queue(queue_specification) + + elif type(queue_specification) is StreamSpecification: + body = self._declare_stream(queue_specification) + + path = queue_address(queue_specification.name) + + self.request( + body, + path, + CommonValues.command_put.value, + [ + CommonValues.response_code_200.value, + CommonValues.response_code_201.value, + CommonValues.response_code_409.value, + ], + ) + + return queue_specification + + def _declare_queue( + self, queue_specification: ClassicQueueSpecification | QuorumQueueSpecification + ) -> dict[str, Any]: + body = {} args: dict[str, Any] = {} @@ -155,22 +191,63 @@ def declare_queue( queue_specification.single_active_consumer ) + if type(queue_specification) is ClassicQueueSpecification: + if queue_specification.maximum_priority is not None: + args["x-maximum-priority"] = queue_specification.maximum_priority + + if type(queue_specification) is QuorumQueueSpecification: + if queue_specification.deliver_limit is not None: + args["x-deliver-limit"] = queue_specification.deliver_limit + + if queue_specification.dead_letter_strategy is not None: + args["x-dead-letter-strategy"] = ( + queue_specification.dead_letter_strategy + ) + + if queue_specification.quorum_initial_group_size is not None: + args["x-initial-quorum-group-size"] = ( + queue_specification.quorum_initial_group_size + ) + + if queue_specification.cluster_target_size is not None: + args["cluster_target_size"] = queue_specification.cluster_target_size + body["arguments"] = args # type: ignore - path = queue_address(queue_specification.name) + return body - self.request( - body, - path, - CommonValues.command_put.value, - [ - CommonValues.response_code_200.value, - CommonValues.response_code_201.value, - CommonValues.response_code_409.value, - ], - ) + def _declare_stream( + self, stream_specification: StreamSpecification + ) -> dict[str, Any]: - return queue_specification + body = {} + args: dict[str, Any] = {} + + args["x-queue-type"] = stream_specification.queue_type.value + + if stream_specification.max_len_bytes is not None: + args["x-max-length-bytes"] = stream_specification.max_len_bytes + + if stream_specification.max_time_retention is not None: + args["x-max-time-retention"] = stream_specification.max_time_retention + + if stream_specification.max_segment_size_in_bytes is not None: + args["x-max-segment-size-in-bytes"] = ( + stream_specification.max_segment_size_in_bytes + ) + + if stream_specification.filter_size is not None: + args["x-filter-size"] = stream_specification.filter_size + + if stream_specification.initial_group_size is not None: + args["x-initial-group-size"] = stream_specification.initial_group_size + + if stream_specification.leader_locator is not None: + args["x-leader-locator"] = stream_specification.leader_locator + + body["arguments"] = args + + return body def delete_exchange(self, exchange_name: str) -> None: logger.debug("delete_exchange operation called") diff --git a/rabbitmq_amqp_python_client/queues.py b/rabbitmq_amqp_python_client/queues.py new file mode 100644 index 0000000..2e030a9 --- /dev/null +++ b/rabbitmq_amqp_python_client/queues.py @@ -0,0 +1,47 @@ +from dataclasses import dataclass +from typing import Optional + +from .common import QueueType + + +@dataclass +class QueueSpecification: + name: str + expires: Optional[int] = None + message_ttl: Optional[int] = None + overflow: Optional[str] = None + single_active_consumer: Optional[bool] = None + dead_letter_exchange: Optional[str] = None + dead_letter_routing_key: Optional[str] = None + max_len: Optional[int] = None + max_len_bytes: Optional[int] = None + leader_locator: Optional[str] = None + is_auto_delete: bool = False + is_durable: bool = True + + +@dataclass +class ClassicQueueSpecification(QueueSpecification): + queue_type: QueueType = QueueType.classic + maximum_priority: Optional[int] = None + + +@dataclass +class QuorumQueueSpecification(QueueSpecification): + queue_type: QueueType = QueueType.quorum + deliver_limit: Optional[str] = None + dead_letter_strategy: Optional[str] = None + quorum_initial_group_size: Optional[int] = None + cluster_target_size: Optional[int] = None + + +@dataclass +class StreamSpecification: + name: str + queue_type: QueueType = QueueType.stream + max_len_bytes: Optional[str] = None + max_time_retention: Optional[str] = None + max_segment_size_in_bytes: Optional[str] = None + filter_size: Optional[int] = None + initial_group_size: Optional[int] = None + leader_locator: Optional[str] = None diff --git a/setup.cfg b/setup.cfg index eddc97d..d6e2f9d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,7 @@ exclude = .git, venv max-line-length = 120 [mypy] -python_version = 3.9 +python_version = 3.10 strict = True ignore_missing_imports = True