Skip to content

Commit

Permalink
implementing queue_info
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Jan 9, 2025
1 parent f8fd4c2 commit a031ce4
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 4 deletions.
14 changes: 14 additions & 0 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ class QueueSpecification:
is_durable: bool = True


@dataclass
class QueueInfo:
name: str
arguments: dict[str, str]
queue_type: QueueType = QueueType.quorum
is_exclusive: Optional[bool] = None
is_auto_delete: bool = False
is_durable: bool = True
leader: str = ""
members: str = ""
message_count: int = 0
consumer_count: int = 0


@dataclass
class BindingSpecification:
source_exchange: str
Expand Down
45 changes: 41 additions & 4 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
purge_queue_address,
queue_address,
)
from .common import CommonValues
from .common import CommonValues, QueueType
from .entities import (
BindingSpecification,
ExchangeSpecification,
QueueInfo,
QueueSpecification,
)
from .exceptions import ValidationCodeException
Expand Down Expand Up @@ -65,8 +66,10 @@ def request(
path: str,
method: str,
expected_response_codes: list[int],
) -> None:
self._request(str(uuid.uuid4()), body, path, method, expected_response_codes)
) -> Message:
return self._request(
str(uuid.uuid4()), body, path, method, expected_response_codes
)

def _request(
self,
Expand All @@ -75,7 +78,7 @@ def _request(
path: str,
method: str,
expected_response_codes: list[int],
) -> None:
) -> Message:
amq_message = Message(
id=id,
body=body,
Expand All @@ -93,6 +96,7 @@ def _request(
logger.debug("Received message: " + str(msg))

self._validate_reponse_code(int(msg.subject), expected_response_codes)
return msg

def declare_exchange(
self, exchange_specification: ExchangeSpecification
Expand Down Expand Up @@ -236,3 +240,36 @@ def purge_queue(self, queue_name: str) -> None:
CommonValues.response_code_200.value,
],
)

def queue_info(self, queue_name: str) -> QueueInfo:
logger.debug("queue_info operation called")
path = queue_address(queue_name)

message = self.request(
None,
path,
CommonValues.command_get.value,
[
CommonValues.response_code_200.value,
],
)

queue_info: dict[str, Any] = message.body

if queue_info["type"] == "quorum":
queue_type = QueueType.quorum
elif queue_info["type"] == "stream":
queue_type = QueueType.stream
else:
queue_type = QueueType.classic

return QueueInfo(
name=queue_info["name"],
is_durable=queue_info["durable"],
is_auto_delete=queue_info["auto_delete"],
is_exclusive=queue_info["exclusive"],
queue_type=queue_type,
leader=queue_info["leader"],
members=queue_info["replicas"],
arguments=queue_info["arguments"],
)
19 changes: 19 additions & 0 deletions tests/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,22 @@ def test_bind_exchange_to_queue() -> None:
management.delete_queue(queue_name)

management.unbind(binding_exchange_queue_path)


def test_queue_info() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-bind-exchange-to-queue-queue"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, arguments={}
)
management.declare_queue(queue_specification)

queue_info = management.queue_info(queue_name=queue_name)

assert queue_info.name == queue_name
assert queue_info.queue_type == queue_specification.queue_type
assert queue_info.is_durable == queue_specification.is_durable

0 comments on commit a031ce4

Please sign in to comment.