Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Connection, Management and Publisher modules #10

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ jobs:
- name: poetry install
run: poetry install --no-root
- name: isort check-only
run: poetry run isort --check-only .
run: poetry run isort --skip rabbitmq_amqp_python_client/qpid --check-only .
- name: black check
run: poetry run black --check .
- name: flake8
run: poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503
run: poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
- name: mypy
run: |
poetry run mypy .
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
- name: poetry run pytest
run: poetry run pytest
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@

This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.

## How to Run
## How to Build the project and run the tests

- Start a RabbitMQ 4.x broker
- poetry build: build the source project
- poetry install: resolves and install dependencies
- poetry run pytest: run the tests

## Getting Started

An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:

poetry run python ./examples/getting_started/main.py


66 changes: 66 additions & 0 deletions examples/getting_started/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from rabbitmq_amqp_python_client import (
BindingSpecification,
Connection,
ExchangeSpecification,
Message,
QueueType,
StreamSpecification,
exchange_address,
)


def main() -> None:
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
connection = Connection("amqp://guest:guest@localhost:5672/")

print("connection to amqp server")
connection.dial()

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
StreamSpecification(name=queue_name, queue_type=QueueType.stream)
)

print("binding queue to exchange")
bind_name = management.bind(
BindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
)
)

addr = exchange_address(exchange_name, routing_key)

print("create a publisher and publish a test message")
publisher = connection.publisher(addr)

publisher.publish(Message(body="test"))

publisher.close()

print("unbind")
management.unbind(bind_name)

print("purging the queue")
management.purge_queue(queue_name)

print("delete queue")
management.delete_queue(queue_name)

print("delete exchange")
management.delete_exchange(exchange_name)

print("closing connections")
management.close()
connection.close()


if __name__ == "__main__":
main()
60 changes: 30 additions & 30 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from importlib import metadata

from .address_helper import exchange_address, queue_address
from .common import QueueType
from .connection import Connection
from .entities import (
BindingSpecification,
ExchangeSpecification,
)
from .management import Management
from .publisher import Publisher
from .qpid.proton._message import Message
from .queues import (
ClassicQueueSpecification,
QuorumQueueSpecification,
StreamSpecification,
)

try:
__version__ = metadata.version(__package__)
__license__ = metadata.metadata(__package__)["license"]
except metadata.PackageNotFoundError:
__version__ = "dev"
__license__ = None

del metadata

__all__ = [
"Connection",
"Management",
"ExchangeSpecification",
"QuorumQueueSpecification",
"ClassicQueueSpecification",
"StreamSpecification",
"BindingSpecification",
"QueueType",
"Publisher",
"exchange_address",
"queue_address",
"Message",
]
69 changes: 69 additions & 0 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from .entities import BindingSpecification


def is_unreserved(char: str) -> bool:
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
return char.isalnum() or char in "-._~"


def encode_path_segment(input_string: str) -> str:
encoded = []

# Iterate over each character in the input string
for char in input_string:
# Check if the character is an unreserved character
if is_unreserved(char):
encoded.append(char) # Append as is
else:
# Encode character to %HH format
encoded.append(f"%{ord(char):02X}")

return "".join(encoded)


def exchange_address(exchange_name: str, routing_key: str = "") -> str:
if routing_key == "":
path = "/exchanges/" + encode_path_segment(exchange_name)
else:
path = (
"/exchanges/"
+ encode_path_segment(exchange_name)
+ "/"
+ encode_path_segment(routing_key)
)

return path


def queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name)

return path


def purge_queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name) + "/messages"

return path


def path_address() -> str:
path = "/bindings"

return path


def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
binding_path_wth_exchange_queue_key = (
"/bindings"
+ "/"
+ "src="
+ encode_path_segment(bind_specification.source_exchange)
+ ";"
+ "dstq="
+ encode_path_segment(bind_specification.destination_queue)
+ ";key="
+ encode_path_segment(bind_specification.binding_key)
+ ";args="
)
return binding_path_wth_exchange_queue_key
32 changes: 32 additions & 0 deletions rabbitmq_amqp_python_client/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import enum


class CommonValues(enum.Enum):
response_code_200 = 200
response_code_201 = 201
response_code_204 = 204
response_code_404 = 404
response_code_409 = 409
command_put = "PUT"
command_get = "GET"
command_post = "POST"
command_delete = "DELETE"
command_reply_to = "$me"
management_node_address = "/management"
link_pair_name = "management-link-pair"
exchanges = "exchanges"
key = "key"
queue = "queues"
bindings = "bindings"


class ExchangeType(enum.Enum):
direct = "direct"
topic = "topic"
fanout = "fanout"


class QueueType(enum.Enum):
quorum = "quorum"
classic = "classic"
stream = "stream"
36 changes: 36 additions & 0 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging

from .management import Management
from .publisher import Publisher
from .qpid.proton.utils import BlockingConnection

logger = logging.getLogger(__name__)


class Connection:
def __init__(self, addr: str):
self._addr: str = addr
self._conn: BlockingConnection
self._management: Management

def dial(self) -> None:
logger.debug("Establishing a connection to the amqp server")
self._conn = BlockingConnection(self._addr)
self._open()
logger.debug("Connection to the server established")

def _open(self) -> None:
self._management = Management(self._conn)
self._management.open()

def management(self) -> Management:
return self._management

# closes the connection to the AMQP 1.0 server.
def close(self) -> None:
logger.debug("Closing connection")
self._conn.close()

def publisher(self, destination: str) -> Publisher:
publisher = Publisher(self._conn, destination)
return publisher
Loading