Skip to content

Commit

Permalink
improved arguments management during declare_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Jan 10, 2025
1 parent ca3c9e6 commit 5ceedca
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 34 deletions.
1 change: 0 additions & 1 deletion examples/getting_started/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
ExchangeSpecification,
Message,
QueueType,
QuorumQueueSpecification,
StreamSpecification,
exchange_address,
)
Expand Down
19 changes: 10 additions & 9 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import uuid
from typing import Any, Optional
from typing import Any, Optional, Union

from .address_helper import (
binding_path_with_exchange_queue,
Expand Down Expand Up @@ -130,19 +130,20 @@ def declare_exchange(

def declare_queue(
self,
queue_specification: (
ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification
),
) -> ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification:
queue_specification: Union[
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
],
) -> Union[
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
]:
logger.debug("declare_queue operation called")

if (
type(queue_specification) is ClassicQueueSpecification
or type(queue_specification) is QuorumQueueSpecification
if isinstance(queue_specification, ClassicQueueSpecification) or isinstance(
queue_specification, QuorumQueueSpecification
):
body = self._declare_queue(queue_specification)

elif type(queue_specification) is StreamSpecification:
elif isinstance(queue_specification, StreamSpecification):
body = self._declare_stream(queue_specification)

path = queue_address(queue_specification.name)
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class QuorumQueueSpecification(QueueSpecification):
class StreamSpecification:
name: str
queue_type: QueueType = QueueType.stream
max_len_bytes: Optional[str] = None
max_time_retention: Optional[str] = None
max_segment_size_in_bytes: Optional[str] = None
max_len_bytes: Optional[int] = None
max_time_retention: Optional[int] = None
max_segment_size_in_bytes: Optional[int] = None
filter_size: Optional[int] = None
initial_group_size: Optional[int] = None
leader_locator: Optional[str] = None
61 changes: 45 additions & 16 deletions tests/test_management.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from rabbitmq_amqp_python_client import (
BindingSpecification,
ClassicQueueSpecification,
Connection,
ExchangeSpecification,
QueueSpecification,
QueueType,
QuorumQueueSpecification,
StreamSpecification,
)
from rabbitmq_amqp_python_client.exceptions import (
ValidationCodeException,
Expand Down Expand Up @@ -35,9 +37,7 @@ def test_declare_purge_delete_queue() -> None:
queue_name = "my_queue"
management = connection.management()

queue_info = management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)
queue_info = management.declare_queue(QuorumQueueSpecification(name=queue_name))

assert queue_info.name == queue_name

Expand All @@ -59,9 +59,7 @@ def test_bind_exchange_to_queue() -> None:

management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)
management.declare_queue(QuorumQueueSpecification(name=queue_name))

binding_exchange_queue_path = management.bind(
BindingSpecification(
Expand Down Expand Up @@ -98,9 +96,8 @@ def test_queue_info_with_validations() -> None:
queue_name = "test_queue_info_with_validation"
management = connection.management()

queue_specification = QueueSpecification(
queue_specification = QuorumQueueSpecification(
name=queue_name,
queue_type=QueueType.quorum,
)
management.declare_queue(queue_specification)

Expand All @@ -122,16 +119,15 @@ def test_queue_precondition_fail() -> None:
queue_name = "test-queue_precondition_fail"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, is_auto_delete=False
queue_specification = QuorumQueueSpecification(
name=queue_name, is_auto_delete=False
)
management.declare_queue(queue_specification)

management.declare_queue(queue_specification)

queue_specification = QueueSpecification(
queue_specification = QuorumQueueSpecification(
name=queue_name,
queue_type=QueueType.quorum,
is_auto_delete=True,
)

Expand All @@ -152,7 +148,7 @@ def test_declare_classic_queue() -> None:
queue_name = "test-declare_classic_queue"
management = connection.management()

queue_specification = QueueSpecification(
queue_specification = QuorumQueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
Expand All @@ -165,14 +161,14 @@ def test_declare_classic_queue() -> None:
management.delete_queue(queue_name)


def test_declare_queue_with_args() -> None:
def test_declare_classic_queue_with_args() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-queue_with_args"
management = connection.management()

queue_specification = QueueSpecification(
queue_specification = ClassicQueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
Expand Down Expand Up @@ -201,3 +197,36 @@ def test_declare_queue_with_args() -> None:
)

management.delete_queue(queue_name)


def test_declare_stream_with_args() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

stream_name = "test-stream_with_args"
management = connection.management()

stream_specification = StreamSpecification(
name=stream_name,
max_len_bytes=1000000000,
max_time_retention=10000000,
max_segment_size_in_bytes=100000000,
filter_size=1000,
initial_group_size=3,
leader_locator="node1",
)

stream_info = management.declare_queue(stream_specification)

assert stream_specification.name == stream_info.name
assert stream_specification.max_len_bytes == stream_info.max_len_bytes
assert stream_specification.max_time_retention == stream_info.max_time_retention
assert (
stream_specification.max_segment_size_in_bytes
== stream_info.max_segment_size_in_bytes
)
assert stream_specification.filter_size == stream_info.filter_size
assert stream_specification.initial_group_size == stream_info.initial_group_size
assert stream_specification.leader_locator == stream_info.leader_locator

management.delete_queue(stream_name)
7 changes: 2 additions & 5 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from rabbitmq_amqp_python_client import (
Connection,
Message,
QueueSpecification,
QueueType,
QuorumQueueSpecification,
)


Expand All @@ -13,9 +12,7 @@ def test_bind_exchange_to_queue() -> None:
queue_name = "test-queue"
management = connection.management()

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)
management.declare_queue(QuorumQueueSpecification(name=queue_name))

raised = False

Expand Down

0 comments on commit 5ceedca

Please sign in to comment.