From 97e37a4e5604009bb75eab4b65b6e48fe0626bcd Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Mon, 25 Sep 2023 22:49:18 +0530 Subject: [PATCH] Fix typos and grammar in Kafka and RabbitMQ articles in the docs (#736) * Fix typos and grammar in Kafka section articles * Fix typos and grammar in RabbitMQ section articles --------- Co-authored-by: Davor Runje --- .../getting-started/config/index.md | 2 +- .../kafka/Publisher/batch_publisher.md | 12 +++---- .faststream_gen/kafka/Publisher/index.md | 2 +- .../kafka/Publisher/using_a_key.md | 12 +++---- .../kafka/Subscriber/batch_subscriber.md | 8 ++--- .faststream_gen/kafka/Subscriber/index.md | 14 ++++---- .faststream_gen/kafka/index.md | 4 +-- .faststream_gen/kafka/message.md | 6 ++-- .faststream_gen/rabbit/ack.md | 14 ++++---- .faststream_gen/rabbit/declare.md | 4 +-- .faststream_gen/rabbit/examples/direct.md | 29 ++++++++-------- .faststream_gen/rabbit/examples/fanout.md | 18 +++++----- .faststream_gen/rabbit/examples/headers.md | 22 ++++++------ .faststream_gen/rabbit/examples/index.md | 8 ++--- .faststream_gen/rabbit/examples/stream.md | 2 +- .faststream_gen/rabbit/examples/topic.md | 16 ++++----- .faststream_gen/rabbit/index.md | 34 +++++++++---------- .faststream_gen/rabbit/message.md | 14 ++++---- .faststream_gen/rabbit/publishing.md | 17 +++++----- .faststream_gen/rabbit/rpc.md | 17 +++++----- docs/docs/en/getting-started/config/index.md | 2 +- .../en/kafka/Publisher/batch_publisher.md | 12 +++---- docs/docs/en/kafka/Publisher/index.md | 2 +- docs/docs/en/kafka/Publisher/using_a_key.md | 12 +++---- .../en/kafka/Subscriber/batch_subscriber.md | 8 ++--- docs/docs/en/kafka/Subscriber/index.md | 14 ++++---- docs/docs/en/kafka/index.md | 4 +-- docs/docs/en/kafka/message.md | 6 ++-- docs/docs/en/rabbit/ack.md | 14 ++++---- docs/docs/en/rabbit/declare.md | 4 +-- docs/docs/en/rabbit/examples/direct.md | 29 ++++++++-------- docs/docs/en/rabbit/examples/fanout.md | 18 +++++----- docs/docs/en/rabbit/examples/headers.md | 22 ++++++------ docs/docs/en/rabbit/examples/index.md | 8 ++--- docs/docs/en/rabbit/examples/stream.md | 2 +- docs/docs/en/rabbit/examples/topic.md | 16 ++++----- docs/docs/en/rabbit/index.md | 34 +++++++++---------- docs/docs/en/rabbit/message.md | 14 ++++---- docs/docs/en/rabbit/publishing.md | 17 +++++----- docs/docs/en/rabbit/rpc.md | 17 +++++----- 40 files changed, 252 insertions(+), 258 deletions(-) diff --git a/.faststream_gen/getting-started/config/index.md b/.faststream_gen/getting-started/config/index.md index 856528a71e..796dcfa3ec 100644 --- a/.faststream_gen/getting-started/config/index.md +++ b/.faststream_gen/getting-started/config/index.md @@ -1,6 +1,6 @@ # Settings and Environment Variables -n many cases, your application may require external settings or configurations, such as a broker connection or database credentials. +In many cases, your application may require external settings or configurations, such as a broker connection or database credentials. To manage these settings effectively, it's common to provide them through environment variables that can be read by the application. 
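As a concrete illustration of the settings approach described in `getting-started/config/index.md`, a minimal sketch might look like the following. It assumes the `pydantic-settings` package and an illustrative `URL` environment variable; the class, field, and broker choice are examples, not taken from the documentation itself.

```python
from pydantic_settings import BaseSettings  # assumed settings dependency

from faststream import FastStream
from faststream.rabbit import RabbitBroker


class Settings(BaseSettings):
    # Illustrative field: pydantic-settings fills it from a URL environment
    # variable when one is set; the default below is only a local fallback.
    url: str = "amqp://guest:guest@localhost:5672/"


settings = Settings()
broker = RabbitBroker(settings.url)
app = FastStream(broker)
```

Running the service as, say, `URL=amqp://user:pass@prod-host/ faststream run serve:app` would then override the default; the module name `serve` here is only a placeholder.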
diff --git a/.faststream_gen/kafka/Publisher/batch_publisher.md b/.faststream_gen/kafka/Publisher/batch_publisher.md index 6ff7bd2061..5ef661abd5 100644 --- a/.faststream_gen/kafka/Publisher/batch_publisher.md +++ b/.faststream_gen/kafka/Publisher/batch_publisher.md @@ -1,17 +1,17 @@ -# Publishing in batches +# Publishing in Batches -If you want to send your data in batches `#!python @broker.publisher(...)` decorator makes that possible for you. -To produce in batches you need to do two things: +If you want to send your data in batches, `#!python @broker.publisher(...)` decorator makes that possible for you. +To produce in batches, you need to do two things: -1. When creating your publisher, set the `batch` argument to `True` +1. When creating your publisher, set the `batch` argument to `True`. 2. Return a tuple of the messages you wish to send in a batch. This action will prompt the producer to collect the messages and send them in a batch to a Kafka broker. Here is an example of an app producing in batches to **output_data** topic when consuming from **input_data_1**. In the highligted lines, we can see the steps of creating and using a batch publisher: -1. Creation of publisher -2. Publishing an actual batch of messages +1. Creation of the publisher. +2. Publishing an actual batch of messages. ```python linenums="1" hl_lines="19 26" from typing import Tuple diff --git a/.faststream_gen/kafka/Publisher/index.md b/.faststream_gen/kafka/Publisher/index.md index 72b80432f5..bad5a67139 100644 --- a/.faststream_gen/kafka/Publisher/index.md +++ b/.faststream_gen/kafka/Publisher/index.md @@ -1,5 +1,5 @@ # Publishing -**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, you can use them without any changes. +**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, which you can use without any changes. In the following chapters, we will demonstrate how to use a KafkaBroker publisher in specific use cases, such as publishing batches or publishing with a key. diff --git a/.faststream_gen/kafka/Publisher/using_a_key.md b/.faststream_gen/kafka/Publisher/using_a_key.md index aea9d63597..65ed285c19 100644 --- a/.faststream_gen/kafka/Publisher/using_a_key.md +++ b/.faststream_gen/kafka/Publisher/using_a_key.md @@ -1,20 +1,20 @@ -# Defining a partition key +# Defining a Partition Key Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance. -You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)`, this guide will demonstrate to you this feature. +You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)` decorator. This guide will demonstrate this feature to you. 
-## Calling `publish` with a key +## Calling `publish` with a Key -To publish a message to a Kafka topic using a key, simpliy pass the `key` parameter to the `publish` function call, like this: +To publish a message to a Kafka topic using a key, simply pass the `key` parameter to the `publish` function call, like this: ```python await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key") ``` -## App example +## App Example -Lest take a look at the whole app example that will consume from the **input_data** topic and publish with key to the **output_data** topic. +Let's take a look at the whole app example that will consume from the **input_data** topic and publish with a key to the **output_data** topic. You can see that the only difference from normal publishing is that now we pass the key to the publisher call. diff --git a/.faststream_gen/kafka/Subscriber/batch_subscriber.md b/.faststream_gen/kafka/Subscriber/batch_subscriber.md index ef6bb1b15e..e03a1a7947 100644 --- a/.faststream_gen/kafka/Subscriber/batch_subscriber.md +++ b/.faststream_gen/kafka/Subscriber/batch_subscriber.md @@ -1,12 +1,12 @@ # Batch Subscriber -If you want to consume data in batches `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed msg object as a list of messages and setting the `batch` parameter to `True` the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now. +If you want to consume data in batches, the `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now. -## Subscriber function with batching +## Subscriber Function with Batching -To consume messages in batches, you need to wrap you message type into a list and and set the `batch` parameter to `True`, the `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now. +To consume messages in batches, you need to wrap your message type into a list and and set the `batch` parameter to `True`. The `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now. -Here is an example of consuming in batches from **test_batch** topic: +Here is an example of consuming in batches from the **test_batch** topic: ```python linenums="1" @broker.subscriber("test_batch", batch=True) diff --git a/.faststream_gen/kafka/Subscriber/index.md b/.faststream_gen/kafka/Subscriber/index.md index dee62e13c3..bbeaf90122 100644 --- a/.faststream_gen/kafka/Subscriber/index.md +++ b/.faststream_gen/kafka/Subscriber/index.md @@ -1,8 +1,8 @@ # Basic Subscriber -To start consuming from a Kafka topic just decorate your consuming function with a `#!python @broker.subscriber(...)` decorator passing a string as a topic key. +To start consuming from a Kafka topic, just decorate your consuming function with a `#!python @broker.subscriber(...)` decorator, passing a string as a topic key. -In the folowing example we will create a simple FastStream app that will consume `HelloWorld` messages from a **hello_world** topic. 
+In the folowing example, we will create a simple FastStream app that will consume `HelloWorld` messages from a **hello_world** topic. The full app code looks like this: @@ -39,9 +39,9 @@ from faststream import FastStream, Logger from faststream.kafka import KafkaBroker ``` -## Define the HelloWorld message structure +## Define the HelloWorld Message Structure -Next, you need to define the structure of the messages you want to consume from the topic using pydantic. For the guide we’ll stick to something basic, but you are free to define any complex message structure you wish in your project. +Next, you need to define the structure of the messages you want to consume from the topic using Pydantic. For the guide, we’ll stick to something basic, but you are free to define any complex message structure you wish in your project. ```python linenums="1" class HelloWorld(BaseModel): @@ -61,7 +61,7 @@ broker = KafkaBroker("localhost:9092") app = FastStream(broker) ``` -## Create a function that will consume messages from a Kafka hello-world topic +## Create a Function that will Consume Messages from a Kafka hello-world Topic Let’s create a consumer function that will consume `HelloWorld` messages from **hello_world** topic and log them. @@ -73,6 +73,6 @@ async def on_hello_world(msg: HelloWorld, logger: Logger): The function decorated with the `#!python @broker.subscriber(...)` decorator will be called when a message is produced to Kafka. -The message will then be injected into the typed msg argument of the function and its type will be used to parse the message. +The message will then be injected into the typed `msg` argument of the function, and its type will be used to parse the message. -In this example case, when the message is sent into a **hello_world** topic, it will be parsed into a `HelloWorld` class and `on_hello_world` function will be called with the parsed class as msg argument value. +In this example case, when the message is sent to a **hello_world** topic, it will be parsed into a `HelloWorld` class, and the `on_hello_world` function will be called with the parsed class as the `msg` argument value. diff --git a/.faststream_gen/kafka/index.md b/.faststream_gen/kafka/index.md index 6142f48bcf..a879655d59 100644 --- a/.faststream_gen/kafka/index.md +++ b/.faststream_gen/kafka/index.md @@ -1,4 +1,4 @@ -# Kafka routing +# Kafka Routing ## Kafka Overview @@ -39,7 +39,7 @@ To connect to Kafka using the FastStream KafkaBroker module, follow these steps: 2. **Create your processing logic:** Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic -3. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator. +3. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator. 
Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module: diff --git a/.faststream_gen/kafka/message.md b/.faststream_gen/kafka/message.md index 02d900be25..182faa7cb6 100644 --- a/.faststream_gen/kafka/message.md +++ b/.faststream_gen/kafka/message.md @@ -34,12 +34,12 @@ async def base_handler( print(msg.headers) ``` -## Message Fields access +## Message Fields Access -But in the most cases you don't need all message fields, you need to know just a part of them. +In most cases, you don't need all message fields; you need to know just a part of them. You can use [Context Fields access](../getting-started/context/fields.md) feature for this. -For an example, you can get access to the `headers` like this: +For example, you can get access to the `headers` like this: ```python hl_lines="6" from faststream import Context diff --git a/.faststream_gen/rabbit/ack.md b/.faststream_gen/rabbit/ack.md index 58aac5385d..c0734da782 100644 --- a/.faststream_gen/rabbit/ack.md +++ b/.faststream_gen/rabbit/ack.md @@ -26,7 +26,7 @@ async def base_handler(body: str): ... ``` -If the `retry` flag is set to `int`, the message will be placed back in the queue and the number of retries will be limited to this number: +If the `retry` flag is set to an `int`, the message will be placed back in the queue, and the number of retries will be limited to this number: ```python @broker.subscriber("test", retry=3) # make up to 3 attempts @@ -39,11 +39,11 @@ async def base_handler(body: str): Subsequently, this logic will be reworked. !!! tip - For more complex error handling cases you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"} + For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"} -## Manual Ack +## Manual acknowledgement -If you want to *ack* message manually, you can get access directy to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method. +If you want to acknowledge a message manually, you can get access directy to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method. ```python from faststream.rabbit import RabbitMessage @@ -57,11 +57,11 @@ async def base_handler(body: str, msg: RabbitMessage): await msg.reject() ``` -**FastStream** will see that the message was already *ack*ed and will do nothing at process end. +**FastStream** will see that the message was already acknowledged and will do nothing at process end. ## Interrupt Process -If you want to interrupt message processing at any callstack, you can raise `faststream.exceptions.AckMessage` +If you want to interrupt message processing at any call stack, you can raise `faststream.exceptions.AckMessage` ``` python linenums="1" hl_lines="16" from faststream import FastStream @@ -87,4 +87,4 @@ async def test_publishing(): await broker.publish("Hello!", "test-queue") ``` -This way **FastStream** interrupts the current message proccessing and *ack* it immediately. Also, you can raise `NackMessage` and `RejectMessage` too. +This way, **FastStream** interrupts the current message proccessing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too. 
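Since the acknowledgement section above notes that `NackMessage` and `RejectMessage` can be raised in the same way as `AckMessage`, a small sketch of that pattern might look like this. The queue name and the branching conditions are invented for illustration, and the import location is assumed to mirror `faststream.exceptions.AckMessage`.

```python
from faststream import FastStream
from faststream.exceptions import NackMessage, RejectMessage
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def base_handler(body: str):
    # Illustrative conditions: requeue transient failures, drop broken payloads.
    if body == "retry me":
        raise NackMessage()  # negative ack: the message goes back to the queue
    if body == "broken":
        raise RejectMessage()  # reject: the message is removed without requeueing
```

As with `AckMessage`, raising either exception ends processing of the current message immediately.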
diff --git a/.faststream_gen/rabbit/declare.md b/.faststream_gen/rabbit/declare.md index a25b3b188c..469e08aec4 100644 --- a/.faststream_gen/rabbit/declare.md +++ b/.faststream_gen/rabbit/declare.md @@ -34,7 +34,7 @@ async def declare_smth(): ) ``` -These methods require just a one argument (`RabbitQueue`/`RabbitExchange`) containing information about your *RabbitMQ* required objects. They declare/validate *RabbitMQ* objects and return low-level **aio-pika** robust objects to interact with. +These methods require just one argument (`RabbitQueue`/`RabbitExchange`) containing information about your *RabbitMQ* required objects. They declare/validate *RabbitMQ* objects and return low-level **aio-pika** robust objects to interact with. !!! tip - Also, these methods are indempotent, so you can call them with the same arguments multiple times, but the objects will be created once, next time the method will return already stored object. This way you can get access to any queue/exchange created automatically. + Also, these methods are idempotent, so you can call them with the same arguments multiple times, but the objects will be created once; next time the method will return an already stored object. This way you can get access to any queue/exchange created automatically. diff --git a/.faststream_gen/rabbit/examples/direct.md b/.faststream_gen/rabbit/examples/direct.md index f54fe07742..67e6cee398 100644 --- a/.faststream_gen/rabbit/examples/direct.md +++ b/.faststream_gen/rabbit/examples/direct.md @@ -1,21 +1,20 @@ # Direct Exchange -**Direct** Exchange is the basic way to route messages in *RabbitMQ*. Its core is very simple: `exchange` sends messages to those queues, `routing_key` which matches the `routing_key` of the message being sent. +The **Direct** Exchange is the basic way to route messages in *RabbitMQ*. Its core is very simple: the `exchange` sends messages to those queues whose `routing_key` matches the `routing_key` of the message being sent. !!! note - **Default** Exchange, to which all queues in *RabbitMQ* are subscribed, has the **Direct** type by default + The **Default** Exchange, to which all queues in *RabbitMQ* are subscribed, has the **Direct** type by default. ## Scaling -If several consumers are listening to the same queue, messages will go to the one of them (round-robin). This behavior is common for all types of `exchange`, because it refers to the queue itself. The type of `exchange` affects which queues the message gets into. +If several consumers are listening to the same queue, messages will be distributed to one of them (round-robin). This behavior is common for all types of `exchange` because it refers to the queue itself. The type of `exchange` affects which queues the message gets into. -Thus, *RabbitMQ* can independently balance the load on queue consumers. You can increase the processing speed -of the message flow from the queue by launching additional instances of a consumer service. You don't need to make changes to the current infrastructure configuration: *RabbitMQ* will take care of how to distribute messages between your services. +Thus, *RabbitMQ* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by launching additional instances of a consumer service. You don't need to make changes to the current infrastructure configuration: RabbitMQ will take care of how to distribute messages between your services. ## Example !!! 
tip - **Direct** Exchange is the type used in **FastStream** by default: you can simply declare it as follows + The **Direct** Exchange is the type used in **FastStream** by default. You can simply declare it as follows: ```python @broker.subscriber("test_queue", "test_exchange") @@ -23,7 +22,7 @@ of the message flow from the queue by launching additional instances of a consum ... ``` -The argument `auto_delete=True` in this and subsequent examples is used only to clear the state of *RabbitMQ* after example runs +The argument `auto_delete=True` in this and subsequent examples is used only to clear the state of *RabbitMQ* after example runs. ```python linenums="1" from faststream import FastStream, Logger @@ -72,7 +71,7 @@ queue_1 = RabbitQueue("test-q-1", auto_delete=True) queue_2 = RabbitQueue("test-q-2", auto_delete=True) ``` -Then we sign up several consumers using the advertised queues to the `exchange` we created +Then we sign up several consumers using the advertised queues to the `exchange` we created: ```python linenums="13" hl_lines="1 6 11" @broker.subscriber(queue_1, exch) @@ -92,18 +91,18 @@ async def base_handler3(logger: Logger): !!! note `handler1` and `handler2` are subscribed to the same `exchange` using the same queue: - within a single service, this does not make a sense, since messages will come to these handlers in turn. + within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution -Now the distribution of messages between these consumers will look like this: +Now, the distribution of messages between these consumers will look like this: ```python linenums="30" await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1 ``` -Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `test-q-1` +Message `1` will be sent to `handler1` because it listens to the `exchange` using a queue with the routing key `test-q-1`. --- @@ -111,7 +110,7 @@ Message `1` will be sent to `handler1` because it listens to `exchange` using a await broker.publish(queue="test-q-1", exchange=exch) # handlers: 2 ``` -Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy +Message `2` will be sent to `handler2` because it listens to the `exchange` using the same queue, but `handler1` is busy. --- @@ -119,7 +118,7 @@ Message `2` will be sent to `handler2` because it listens to `exchange` using th await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1 ``` -Message `3` will be sent to `handler1` again, because it is currently free +Message `3` will be sent to `handler1` again because it is currently free. --- @@ -127,4 +126,4 @@ Message `3` will be sent to `handler1` again, because it is currently free await broker.publish(queue="test-q-2", exchange=exch) # handlers: 3 ``` -Message `4` will be sent to `handler3`, because it is the only one listening to `exchange` using a queue with the routing key `test-q-2` +Message `4` will be sent to `handler3` because it is the only one listening to the `exchange` using a queue with the routing key `test-q-2`. 
diff --git a/.faststream_gen/rabbit/examples/fanout.md b/.faststream_gen/rabbit/examples/fanout.md index 7dc0a81e34..a003ec8beb 100644 --- a/.faststream_gen/rabbit/examples/fanout.md +++ b/.faststream_gen/rabbit/examples/fanout.md @@ -1,6 +1,6 @@ # Fanout Exchange -**Fanout** Exchange is an even simpler, but slightly less popular way of routing in *RabbitMQ*. This type of `exchange` sends messages to all queues subscribed to it, ignoring any arguments of the message. +The **Fanout** Exchange is an even simpler, but slightly less popular way of routing in *RabbitMQ*. This type of `exchange` sends messages to all queues subscribed to it, ignoring any arguments of the message. At the same time, if the queue listens to several consumers, messages will also be distributed among them. @@ -53,7 +53,7 @@ queue_1 = RabbitQueue("test-q-1", auto_delete=True) queue_2 = RabbitQueue("test-q-2", auto_delete=True) ``` -Then we signed up several consumers using the advertised queues to the `exchange` we created +Then we signed up several consumers using the advertised queues to the `exchange` we created: ```python linenums="13" hl_lines="1 6 11" @broker.subscriber(queue_1, exch) @@ -73,10 +73,10 @@ async def base_handler3(logger: Logger): !!! note `handler1` and `handler2` are subscribed to the same `exchange` using the same queue: - within a single service, this does not make a sense, since messages will come to these handlers in turn. + within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution Now the distribution of messages between these consumers will look like this: @@ -84,7 +84,7 @@ Now the distribution of messages between these consumers will look like this: await broker.publish(exchange=exch) # handlers: 1, 3 ``` -Message `1` will be sent to `handler1` and `handler3`, because they listen to `exchange` using different queues +Message `1` will be sent to `handler1` and `handler3` because they listen to `exchange` using different queues. --- @@ -92,7 +92,7 @@ Message `1` will be sent to `handler1` and `handler3`, because they listen to `e await broker.publish(exchange=exch) # handlers: 2, 3 ``` -Message `2` will be sent to `handler2` and `handler3`, because `handler2` listens to `exchange` using the same queue as `handler1` +Message `2` will be sent to `handler2` and `handler3` because `handler2` listens to `exchange` using the same queue as `handler1`. --- @@ -100,7 +100,7 @@ Message `2` will be sent to `handler2` and `handler3`, because `handler2` listen await broker.publish(exchange=exch) # handlers: 1, 3 ``` -Message `3` will be sent to `handler1` and `handler3` +Message `3` will be sent to `handler1` and `handler3`. --- @@ -108,9 +108,9 @@ Message `3` will be sent to `handler1` and `handler3` await broker.publish(exchange=exch) # handlers: 2, 3 ``` -Message `4` will be sent to `handler3` and `handler3` +Message `4` will be sent to `handler2` and `handler3`. --- !!! note - When sending messages to **Fanout** exchange, it makes no sense to specify the arguments `queue` or `routing_key`, because they will be ignored + When sending messages to **Fanout** exchange, it makes no sense to specify the arguments `queue` or `routing_key`, because they will be ignored. 
diff --git a/.faststream_gen/rabbit/examples/headers.md b/.faststream_gen/rabbit/examples/headers.md index fc7c9afbda..e44f88033d 100644 --- a/.faststream_gen/rabbit/examples/headers.md +++ b/.faststream_gen/rabbit/examples/headers.md @@ -1,6 +1,6 @@ # Header Exchange -**Header** Exchange is the most complex and flexible way to route messages in *RabbitMQ*. This `exchange` type sends messages to queues according by matching the queue binding arguments with message headers. +The **Header** Exchange is the most complex and flexible way to route messages in *RabbitMQ*. This `exchange` type sends messages to queues according by matching the queue binding arguments with message headers. At the same time, if several consumers are subscribed to the queue, messages will also be distributed among them. @@ -66,7 +66,7 @@ async def send_messages(): ### Consumer Announcement -First, we announce our **Fanout** exchange and several queues that will listen to it: +First, we announce our **Header** exchange and several queues that will listen to it: ```python linenums="7" hl_lines="1 6 11 16" exch = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.HEADERS) @@ -90,7 +90,7 @@ queue_3 = RabbitQueue( The `x-match` argument indicates whether the arguments should match the message headers in whole or in part. -Then we signed up several consumers using the advertised queues to the `exchange` we created +Then we signed up several consumers using the advertised queues to the `exchange` we created: ```python linenums="26" hl_lines="1 6 11 16" @broker.subscriber(queue_1, exch) @@ -115,10 +115,10 @@ async def base_handler4(logger: Logger): !!! note `handler1` and `handler2` are subscribed to the same `exchange` using the same queue: - within a single service, this does not make a sense, since messages will come to these handlers in turn. + within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution Now the distribution of messages between these consumers will look like this: @@ -126,7 +126,7 @@ Now the distribution of messages between these consumers will look like this: await broker.publish(exchange=exch, headers={"key": 1}) # handlers: 1 ``` -Message `1` will be sent to `handler1`, because it listens to a queue whose `key` header matches the `key` header of the message +Message `1` will be sent to `handler1` because it listens to a queue whose `key` header matches the `key` header of the message. --- @@ -134,7 +134,7 @@ Message `1` will be sent to `handler1`, because it listens to a queue whose `key await broker.publish(exchange=exch, headers={"key": 1}) # handlers: 2 ``` -Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy +Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy. --- @@ -142,7 +142,7 @@ Message `2` will be sent to `handler2` because it listens to `exchange` using th await broker.publish(exchange=exch, headers={"key": 1}) # handlers: 1 ``` -Message `3` will be sent to `handler1` again, because it is currently free +Message `3` will be sent to `handler1` again because it is currently free. 
--- @@ -150,7 +150,7 @@ Message `3` will be sent to `handler1` again, because it is currently free await broker.publish(exchange=exch, headers={"key": 2}) # handlers: 3 ``` -Message `4` will be sent to `handler3`, because it listens to a queue whose `key` header coincided with the `key` header of the message +Message `4` will be sent to `handler3` because it listens to a queue whose `key` header coincided with the `key` header of the message. --- @@ -158,7 +158,7 @@ Message `4` will be sent to `handler3`, because it listens to a queue whose `key await broker.publish(exchange=exch, headers={"key2": 2}) # handlers: 3 ``` -Message `5` will be sent to `handler3`, because it listens to a queue whose header `key2` coincided with the header `key2` of the message +Message `5` will be sent to `handler3` because it listens to a queue whose header `key2` coincided with the header `key2` of the message. --- @@ -168,7 +168,7 @@ Message `5` will be sent to `handler3`, because it listens to a queue whose head ) # handlers: 3, 4 ``` -Message `6` will be sent to `handler3` and `handler4`, because the message headers completely match the queue keys +Message `6` will be sent to `handler3` and `handler4` because the message headers completely match the queue keys. --- diff --git a/.faststream_gen/rabbit/examples/index.md b/.faststream_gen/rabbit/examples/index.md index 3ffd3f8782..38608b5943 100644 --- a/.faststream_gen/rabbit/examples/index.md +++ b/.faststream_gen/rabbit/examples/index.md @@ -1,8 +1,8 @@ # Basic Subscriber -If you know nothing about **RabbitMQ** and how it works you will still able to use **FastStream RabbitBroker**. +If you know nothing about **RabbitMQ** and how it works, you will still able to use **FastStream RabbitBroker**. -Just use `#!python @broker.subscriber(...)` method with a string as a routing key. +Just use the `#!python @broker.subscriber(...)` method with a string as a routing key. ```python linenums="1" from faststream import FastStream @@ -25,9 +25,9 @@ async def test_publish(): ) ``` -This is the principle all **FastStream** brokers work by: you don't need to learn them in-depth if you want to *just send a message* +This is the principle all **FastStream** brokers work by: you don't need to learn them in-depth if you want to *just send a message*. -## RabbitMQ details +## RabbitMQ Details If you are already familiar with **RabbitMQ** logic, you should also be acquainted with the inner workings of the example mentioned above. diff --git a/.faststream_gen/rabbit/examples/stream.md b/.faststream_gen/rabbit/examples/stream.md index cb4f8b0549..a7b55ba24c 100644 --- a/.faststream_gen/rabbit/examples/stream.md +++ b/.faststream_gen/rabbit/examples/stream.md @@ -1,6 +1,6 @@ # RabbitMQ Streams -*RabbitMQ* has a [Streams](https://www.rabbitmq.com/streams.html){.exteranl-link target="_blank"} feature, which logic is closely related to *Kafka* topics. +*RabbitMQ* has a [Streams](https://www.rabbitmq.com/streams.html){.exteranl-link target="_blank"} feature, which is closely related to *Kafka* topics. The main difference from regular *RabbitMQ* queues is that the messages are not deleted after consuming. diff --git a/.faststream_gen/rabbit/examples/topic.md b/.faststream_gen/rabbit/examples/topic.md index 93866ec6e5..db3846a712 100644 --- a/.faststream_gen/rabbit/examples/topic.md +++ b/.faststream_gen/rabbit/examples/topic.md @@ -1,6 +1,6 @@ # Topic Exchange -**Topic** Exchange is a powerful *RabbitMQ* routing tool. 
This type of `exchange` sends messages to the queue in accordance with the *pattern* specified when they are connected to `exchange` and the `routing_key` of the message itself. +The **Topic** Exchange is a powerful *RabbitMQ* routing tool. This type of `exchange` sends messages to the queue in accordance with the *pattern* specified when they are connected to `exchange` and the `routing_key` of the message itself. At the same time, if several consumers are subscribed to the queue, messages will be distributed among them. @@ -55,7 +55,7 @@ queue_2 = RabbitQueue("test-queue-2", auto_delete=True, routing_key="*.debug") At the same time, in the `routing_key` of our queues, we specify the *pattern* of routing keys that will be processed by this queue. -Then we sign up several consumers using the advertised queues to the `exchange` we created +Then we sign up several consumers using the advertised queues to the `exchange` we created: ```python linenums="13" hl_lines="1 6 11" @broker.subscriber(queue_1, exch) @@ -75,10 +75,10 @@ async def base_handler3(logger: Logger): !!! note `handler1` and `handler2` are subscribed to the same `exchange` using the same queue: - within a single service, this does not make a sense, since messages will come to these handlers in turn. + within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution Now the distribution of messages between these consumers will look like this: @@ -86,7 +86,7 @@ Now the distribution of messages between these consumers will look like this: await broker.publish(routing_key="logs.info", exchange=exch) # handlers: 1 ``` -Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `*.info` +Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `*.info`. --- @@ -94,7 +94,7 @@ Message `1` will be sent to `handler1` because it listens to `exchange` using a await broker.publish(routing_key="logs.info", exchange=exch) # handlers: 2 ``` -Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy +Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy. --- @@ -102,7 +102,7 @@ Message `2` will be sent to `handler2` because it listens to `exchange` using th await broker.publish(routing_key="logs.info", exchange=exch) # handlers: 1 ``` -Message `3` will be sent to `handler1` again, because it is currently free +Message `3` will be sent to `handler1` again because it is currently free. --- @@ -110,4 +110,4 @@ Message `3` will be sent to `handler1` again, because it is currently free await broker.publish(routing_key="logs.debug", exchange=exch) # handlers: 3 ``` -Message `4` will be sent to `handler3`, because it is the only one listening to `exchange` using a queue with the routing key `*.debug` +Message `4` will be sent to `handler3` because it is the only one listening to `exchange` using a queue with the routing key `*.debug`. 
diff --git a/.faststream_gen/rabbit/index.md b/.faststream_gen/rabbit/index.md index d7c6c5905a..cceec68b4a 100644 --- a/.faststream_gen/rabbit/index.md +++ b/.faststream_gen/rabbit/index.md @@ -13,9 +13,9 @@ It supports the ability to successfully process messages, mark them as processed Having to keep track of the current status of all messages is a cause of the **RabbitMQ** performance issues. With really large message volumes, **RabbitMQ** starts to degrade. However, if this was a "one-time influx", then consumers will free the queue of messages and the "health" of **RabbitMQ** will be stable. -If your scenario is not based on processing millions of messages, and also requires building complex routing logic - **RabbitMQ** you will be right choice. +If your scenario is not based on processing millions of messages and also requires building complex routing logic, **RabbitMQ** will be the right choice. -## Basic concepts +## Basic Concepts If you want to totally understand how *RabbitMQ* works, you should visit their official website. There you will find top-level comments about the basic concepts and usage examples. @@ -27,40 +27,40 @@ If you want to totally understand how *RabbitMQ* works, you should visit their o * `Queue` - the point of pushing messages to *consumer* * `Binding` - the relationship between *queue-exchange* or *exchange-exchange* -### Routing rules +### Routing Rules The rules for delivering messages to consumers depend on the **type of exchange** and **binding** parameters. All the main options will be discussed at [examples](examples/direct.md){.internal-link}. In general, the message path looks so: -1. *Publisher* sends a message to `exchange`, specify its `routing_key` and headers according to which routing will take place -2. `exchange`, depending on the message parameters, determines which of the subscribed `bindings` to send the message to -3. `binding` delivers the message to `queue` or another `exchange` (in this case it will send it further by its own rules) -4. `queue`, after receiving a message, sends it to one of subscribed consumers (**PUSH API**) +1. *Publisher* sends a message to `exchange`, specify its `routing_key` and headers according to which routing will take place. +2. `Exchange`, depending on the message parameters, determines which of the subscribed `bindings` to send the message to. +3. `Binding` delivers the message to `queue` or another `exchange` (in this case it will send it further by its own rules). +4. `Queue`, after receiving a message, sends it to one of subscribed consumers (**PUSH API**). !!! tip - By default, all queues have `binding` to `default exchange` (**Direct** type) with **routing key** corresponding to their name. - In **FastStream**, queues are connected to this `exchange` and messages are sent by default unless another `exchange` is explicitly specified. + By default, all queues have a `binding` to the `default exchange` (**Direct** type) with a **routing key** corresponding to their name. + In **FastStream**, queues are connected to this `exchange`, and messages are sent by default unless another `exchange` is explicitly specified. !!! warning "" Connecting the queue to any other `exchange` will still leave it subscribed to the `default exchange'. Be careful with this. At this stage, the message gets into your application - and you start processing it. -### Message statuses +### Message Statuses -*RabbitMQ* requires confirmation of message processing: only after that it will be removed from the queue. 
+*RabbitMQ* requires confirmation of message processing: only after that, it will be removed from the queue. -Confirmation can be either positive (`Acknowledgment - ack`) if the message was successfully processed, or negative (`Negative Acknowledgment - nack`) if the message was processed with an error. +Confirmation can be either positive (`Acknowledgment - ack`) if the message was successfully processed or negative (`Negative Acknowledgment - nack`) if the message was processed with an error. -At the same time, in case of an error, the message can also be extracted from the queue (`reject`), otherwise, after a negative confirmation, it will be requeued for processing again. +At the same time, in case of an error, the message can also be extracted from the queue (`reject`); otherwise, after a negative confirmation, it will be requeued for processing again. -In most cases, **FastStream** performs all the necessary actions by itself: however, if you want to manage the message lifecycle directly, you can access the message object itself and call the appropriate methods directly. This can be useful if you want to implement an "at most once" policy and you need to confirm the consuming of the message before it is actually processed. +In most cases, **FastStream** performs all the necessary actions by itself. However, if you want to manage the message lifecycle directly, you can access the message object itself and call the appropriate methods directly. This can be useful if you want to implement an "at most once" policy and you need to confirm the consuming of the message before it is actually processed. -## **FastStream** specific +## **FastStream** Specific -**FastStream** omits the ability to create `bindings` directly, since in most cases you do not need to subscribe one queue to several `exchanges` or subscribe `exchanges` to each other. On the contrary, this practice leads to over-complication of the message routing scheme, which makes it difficult to maintain and further develop the entire infrastructure of services. +**FastStream** omits the ability to create `bindings` directly, since in most cases, you do not need to subscribe one queue to several `exchanges` or subscribe `exchanges` to each other. On the contrary, this practice leads to over-complication of the message routing scheme, which makes it difficult to maintain and further develop the entire infrastructure of services. **FastStream** suggests you adhere to the scheme `exchange:queue` as `1:N`, which will greatly simplify the scheme of interaction between your services. It is better to create an additional queue for a new `exchange` than to subscribe to an existing one. -However, if you want to reduce the number of entities in your RabbitMQ, and thereby optimize its performance (or you know exactly what you are doing), **FastStream** leaves you the option to create `bindings` directly. In other cases, the connection parameters are an integral part of the entities *RabbitQueue* and *RabbitExchange* in **FastStream**. +However, if you want to reduce the number of entities in your **RabbitMQ**, and thereby optimize its performance (or you know exactly what you are doing), **FastStream** leaves you the option to create `bindings` directly. In other cases, the connection parameters are an integral part of the entities **RabbitQueue** and **RabbitExchange** in **FastStream**. 
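To illustrate the `exchange:queue` as `1:N` layout recommended above, a minimal sketch could declare one exchange with two dedicated queues, each consumed by its own handler. The exchange, queue, and handler names are invented for the example, and the exchange uses the default **Direct** type.

```python
from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

# One (Direct) exchange feeding two separate queues: the suggested 1:N scheme.
exch = RabbitExchange("orders")
new_orders = RabbitQueue("orders-new", routing_key="orders-new")
paid_orders = RabbitQueue("orders-paid", routing_key="orders-paid")


@broker.subscriber(new_orders, exch)
async def handle_new(body: str, logger: Logger):
    logger.info("new order: %s", body)


@broker.subscriber(paid_orders, exch)
async def handle_paid(body: str, logger: Logger):
    logger.info("paid order: %s", body)
```

Publishing to a particular queue then goes through the exchange with the matching routing key, e.g. `await broker.publish("body", exchange=exch, routing_key="orders-new")`.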
diff --git a/.faststream_gen/rabbit/message.md b/.faststream_gen/rabbit/message.md index 55c1f62a4b..a3c1afc9c4 100644 --- a/.faststream_gen/rabbit/message.md +++ b/.faststream_gen/rabbit/message.md @@ -1,8 +1,8 @@ -# Access to Message information +# Access to Message Information -As you know, **FastStream** serializes a message body and provides you access to it by function arguments. But sometimes you want access to a message_id, headers or other meta information. +As you know, **FastStream** serializes a message body and provides you access to it through function arguments. But sometimes you want access to a message_id, headers, or other meta-information. -## Message access +## Message Access You can get it in a simple way: just acces to the message object in the [Context](../getting-started/context/existed.md){.internal-link}! @@ -27,7 +27,7 @@ async def base_handler( print(msg.correlation_id) ``` -Also, if you can't find the information you reqiure, you can get access right to the wrapped `aio_pika.IncomingMessage` which contains complete message information. +Also, if you can't find the information you reqiure, you can get access directly to the wrapped `aio_pika.IncomingMessage`, which contains complete message information. ```python hl_lines="6" from aio_pika import IncomingMessage @@ -39,9 +39,9 @@ async def base_handler(body: str, msg: RabbitMessage): print(raw) ``` -## Message Fields access +## Message Fields Access -But in the most cases you don't need all message fields, you need to access some of them. You can use [Context Fields access](../getting-started/context/fields.md){.internal-link} feature for this reason. +But in the most cases, you don't need all message fields; you need to access some of them. You can use [Context Fields access](../getting-started/context/fields.md){.internal-link} feature for this reason. For example, you can get access to the `correlation_id` like this: @@ -69,7 +69,7 @@ async def base_handler( print(cor_id) ``` -But this code is a too long to be reused everywhere.In this case, you can use a python [`Annotated`](https://docs.python.org/3/library/typing.html#typing.Annotated){.external-link target="_blank"} feature: +But this code is too long to be reused everywhere. In this case, you can use a Python [`Annotated`](https://docs.python.org/3/library/typing.html#typing.Annotated){.external-link target="_blank"} feature: === "python 3.9+" diff --git a/.faststream_gen/rabbit/publishing.md b/.faststream_gen/rabbit/publishing.md index 78471c1b1a..6c56ef8d46 100644 --- a/.faststream_gen/rabbit/publishing.md +++ b/.faststream_gen/rabbit/publishing.md @@ -1,8 +1,8 @@ # Publishing -**FastStream** RabbitBroker supports all regular [publishing usecases](../getting-started/publishing/index.md){.internal-link}, you can use them without any changes. +**FastStream** RabbitBroker supports all regular [publishing usecases](../getting-started/publishing/index.md){.internal-link}. you can use them without any changes. -However, if you whish to further customize the publishing logic further, you should take a more deep-dive look at specific RabbitBroker parameters. +However, if you wish to further customize the publishing logic further, you should take a more deep-dive look at specific RabbitBroker parameters. ## Rabbit Publishing @@ -29,8 +29,7 @@ asyncio.run(pub()) If you don't specify any exchange, the message will be send to the default one. 
-Also, you able to use special **RabbitQueue** and **RabbitExchange** objects as a `queue` and `exchange` arguments: - +Also, you are able to use special **RabbitQueue** and **RabbitExchange** objects as `queue` and `exchange` arguments: ``` python from faststream.rabbit import RabbitExchange, RabbitQueue @@ -45,11 +44,11 @@ await broker.publish( If you specify exchange that doesn't exist, RabbitBroker will create a required one and then publish a message to it. !!! tip - Be accurate with it: if you have already created an **Exchange** with specific parameters and try to send a message by exchange name to it, the broker will try to create it. So, **Exchange** parameters conflict will occure. + Be accurate with it: if you have already created an **Exchange** with specific parameters and try to send a message by exchange name to it, the broker will try to create it. So, **Exchange** parameters conflict will occur. - If you are trying to send a message to specific **Exchange** - sending it with a defined **RabbitExchange** object is the preffered way. + If you are trying to send a message to a specific **Exchange**, sending it with a defined **RabbitExchange** object is the preffered way. -## Basic arguments +## Basic Arguments The `publish` method takes the following arguments: @@ -58,7 +57,7 @@ The `publish` method takes the following arguments: * `#!python queue: str | RabbitQueue = ""` - the queue where the message will be sent (since most queues use their name as the routing key, this is a human-readable version of `routing_key`) * `#!python routing_key: str = ""` - also a message routing key, if not specified, the `queue` argument will be used -## Message parameters +## Message Parameters You can read more about all the available flags in the [RabbitMQ documentation](https://www.rabbitmq.com/consumers.html){.external-link target="_blank"} @@ -75,7 +74,7 @@ You can read more about all the available flags in the [RabbitMQ documentation]( * `#!python user_id: str | None = None` - ID of the *RabbitMQ* user who sent the message * `#!python app_id: str | None = None` - ID of the application that sent the message (used by consumers) -## Send flags +## Send Flags Arguments for sending a message: diff --git a/.faststream_gen/rabbit/rpc.md b/.faststream_gen/rabbit/rpc.md index b86c9d7ff2..3a71a5ec72 100644 --- a/.faststream_gen/rabbit/rpc.md +++ b/.faststream_gen/rabbit/rpc.md @@ -1,14 +1,14 @@ # RPC over RMQ -## Blocking request +## Blocking Request -**FastStream** provides you with the ability to send blocking RPC request over *RabbitMQ* in a very simple way. +**FastStream** provides you with the ability to send a blocking RPC request over *RabbitMQ* in a very simple way. It uses the [**Direct Reply-To**](https://www.rabbitmq.com/direct-reply-to.html){.external-link target="_blank"} *RabbitMQ* feature, so you don't need to create any queues to consume a response. Just send a message like a regular one and get a response synchronously. 
-It is a very close to common **requests** syntax: +It is very close to common **requests** syntax: ``` python hl_lines="1 4" msg = await broker.publish( @@ -18,17 +18,16 @@ msg = await broker.publish( ) ``` -Also, you have a two extra options to control this behavior: +Also, you have two extra options to control this behavior: -* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for response -* `#!python raise_timeout: bool = False` - by default timeout request returns `None`, but if you need to raise TimeoutException directly, you can specify this option +* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for a response +* `#!python raise_timeout: bool = False` - by default, a timeout request returns `None`, but if you need to raise a TimeoutException directly, you can specify this option ## Reply-To -Also, if you want to create permanent request-reply data flow, probably, you should create a permanent queue to consume responses. - -So, if you have a such one, you can specify it with `reply_to` argument. This way **FastStream** will send a response in this queue automatically. +Also, if you want to create a permanent request-reply data flow, probably, you should create a permanent queue to consume responses. +So, if you have such one, you can specify it with the `reply_to` argument. This way, **FastStream** will send a response to this queue automatically. ``` python hl_lines="1 8" @broker.subscriber("response-queue") diff --git a/docs/docs/en/getting-started/config/index.md b/docs/docs/en/getting-started/config/index.md index 4f04091a8f..04107acb36 100644 --- a/docs/docs/en/getting-started/config/index.md +++ b/docs/docs/en/getting-started/config/index.md @@ -1,6 +1,6 @@ # Settings and Environment Variables -n many cases, your application may require external settings or configurations, such as a broker connection or database credentials. +In many cases, your application may require external settings or configurations, such as a broker connection or database credentials. To manage these settings effectively, it's common to provide them through environment variables that can be read by the application. diff --git a/docs/docs/en/kafka/Publisher/batch_publisher.md b/docs/docs/en/kafka/Publisher/batch_publisher.md index da0e0bde87..329c4ab4b4 100644 --- a/docs/docs/en/kafka/Publisher/batch_publisher.md +++ b/docs/docs/en/kafka/Publisher/batch_publisher.md @@ -1,17 +1,17 @@ -# Publishing in batches +# Publishing in Batches -If you want to send your data in batches `#!python @broker.publisher(...)` decorator makes that possible for you. -To produce in batches you need to do two things: +If you want to send your data in batches, `#!python @broker.publisher(...)` decorator makes that possible for you. +To produce in batches, you need to do two things: -1. When creating your publisher, set the `batch` argument to `True` +1. When creating your publisher, set the `batch` argument to `True`. 2. Return a tuple of the messages you wish to send in a batch. This action will prompt the producer to collect the messages and send them in a batch to a Kafka broker. Here is an example of an app producing in batches to **output_data** topic when consuming from **input_data_1**. In the highligted lines, we can see the steps of creating and using a batch publisher: -1. Creation of publisher -2. Publishing an actual batch of messages +1. Creation of the publisher. +2. Publishing an actual batch of messages. 
```python linenums="1" hl_lines="19 26" {!> docs_src/kafka/publish_batch/app.py [ln:1-26] !} diff --git a/docs/docs/en/kafka/Publisher/index.md b/docs/docs/en/kafka/Publisher/index.md index 72b80432f5..bad5a67139 100644 --- a/docs/docs/en/kafka/Publisher/index.md +++ b/docs/docs/en/kafka/Publisher/index.md @@ -1,5 +1,5 @@ # Publishing -**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, you can use them without any changes. +**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, which you can use without any changes. In the following chapters, we will demonstrate how to use a KafkaBroker publisher in specific use cases, such as publishing batches or publishing with a key. diff --git a/docs/docs/en/kafka/Publisher/using_a_key.md b/docs/docs/en/kafka/Publisher/using_a_key.md index 5d0a438cbd..7665be371c 100644 --- a/docs/docs/en/kafka/Publisher/using_a_key.md +++ b/docs/docs/en/kafka/Publisher/using_a_key.md @@ -1,20 +1,20 @@ -# Defining a partition key +# Defining a Partition Key Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance. -You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)`, this guide will demonstrate to you this feature. +You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)` decorator. This guide will demonstrate this feature to you. -## Calling `publish` with a key +## Calling `publish` with a Key -To publish a message to a Kafka topic using a key, simpliy pass the `key` parameter to the `publish` function call, like this: +To publish a message to a Kafka topic using a key, simply pass the `key` parameter to the `publish` function call, like this: ```python {!> docs_src/kafka/publish_with_partition_key/app.py [ln:25] !} ``` -## App example +## App Example -Lest take a look at the whole app example that will consume from the **input_data** topic and publish with key to the **output_data** topic. +Let's take a look at the whole app example that will consume from the **input_data** topic and publish with a key to the **output_data** topic. You can see that the only difference from normal publishing is that now we pass the key to the publisher call. diff --git a/docs/docs/en/kafka/Subscriber/batch_subscriber.md b/docs/docs/en/kafka/Subscriber/batch_subscriber.md index 48df819192..67cb11779f 100644 --- a/docs/docs/en/kafka/Subscriber/batch_subscriber.md +++ b/docs/docs/en/kafka/Subscriber/batch_subscriber.md @@ -1,12 +1,12 @@ # Batch Subscriber -If you want to consume data in batches `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed msg object as a list of messages and setting the `batch` parameter to `True` the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now. +If you want to consume data in batches, the `#!python @broker.subscriber(...)` decorator makes that possible for you. 
By typing a consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now.
 
-## Subscriber function with batching
+## Subscriber Function with Batching
 
-To consume messages in batches, you need to wrap you message type into a list and and set the `batch` parameter to `True`, the `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now.
+To consume messages in batches, you need to wrap your message type into a list and set the `batch` parameter to `True`. The `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now.
 
-Here is an example of consuming in batches from **test_batch** topic:
+Here is an example of consuming in batches from the **test_batch** topic:
 
 ```python linenums="1"
 {!> docs_src/kafka/batch_consuming_pydantic/app.py [ln:20-22] !}
diff --git a/docs/docs/en/kafka/Subscriber/index.md b/docs/docs/en/kafka/Subscriber/index.md
index d4bbd35519..972814cbc4 100644
--- a/docs/docs/en/kafka/Subscriber/index.md
+++ b/docs/docs/en/kafka/Subscriber/index.md
@@ -1,8 +1,8 @@
 # Basic Subscriber
 
-To start consuming from a Kafka topic just decorate your consuming function with a `#!python @broker.subscriber(...)` decorator passing a string as a topic key.
+To start consuming from a Kafka topic, just decorate your consuming function with a `#!python @broker.subscriber(...)` decorator, passing a string as a topic key.
 
-In the folowing example we will create a simple FastStream app that will consume `HelloWorld` messages from a **hello_world** topic.
+In the following example, we will create a simple FastStream app that will consume `HelloWorld` messages from a **hello_world** topic.
 
 The full app code looks like this:
 
@@ -18,9 +18,9 @@ To use the `#!python @broker.subscriber(...)` decorator, first we need to import
 {!> docs_src/kafka/consumes_basics/app.py [ln:3-4] !}
 ```
 
-## Define the HelloWorld message structure
+## Define the HelloWorld Message Structure
 
-Next, you need to define the structure of the messages you want to consume from the topic using pydantic. For the guide we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.
+Next, you need to define the structure of the messages you want to consume from the topic using Pydantic. For the guide, we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.
 
 ```python linenums="1"
 {!> docs_src/kafka/consumes_basics/app.py [ln:7-12] !}
@@ -34,7 +34,7 @@ Next, we will create a `KafkaBroker` object and wrap it into the `FastStream` ob
 {!> docs_src/kafka/consumes_basics/app.py [ln:15-16] !}
 ```
 
-## Create a function that will consume messages from a Kafka hello-world topic
+## Create a Function that will Consume Messages from a Kafka hello-world Topic
 
 Let’s create a consumer function that will consume `HelloWorld` messages from **hello_world** topic and log them.
 
@@ -44,6 +44,6 @@ Let’s create a consumer function that will consume `HelloWorld` messages from
 
 The function decorated with the `#!python @broker.subscriber(...)` decorator will be called when a message is produced to Kafka.
-The message will then be injected into the typed msg argument of the function and its type will be used to parse the message. +The message will then be injected into the typed `msg` argument of the function, and its type will be used to parse the message. -In this example case, when the message is sent into a **hello_world** topic, it will be parsed into a `HelloWorld` class and `on_hello_world` function will be called with the parsed class as msg argument value. +In this example case, when the message is sent to a **hello_world** topic, it will be parsed into a `HelloWorld` class, and the `on_hello_world` function will be called with the parsed class as the `msg` argument value. diff --git a/docs/docs/en/kafka/index.md b/docs/docs/en/kafka/index.md index 9f452eaab6..ba1089b192 100644 --- a/docs/docs/en/kafka/index.md +++ b/docs/docs/en/kafka/index.md @@ -1,4 +1,4 @@ -# Kafka routing +# Kafka Routing ## Kafka Overview @@ -39,7 +39,7 @@ To connect to Kafka using the FastStream KafkaBroker module, follow these steps: 2. **Create your processing logic:** Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic -3. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator. +3. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator. Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module: diff --git a/docs/docs/en/kafka/message.md b/docs/docs/en/kafka/message.md index 02d900be25..182faa7cb6 100644 --- a/docs/docs/en/kafka/message.md +++ b/docs/docs/en/kafka/message.md @@ -34,12 +34,12 @@ async def base_handler( print(msg.headers) ``` -## Message Fields access +## Message Fields Access -But in the most cases you don't need all message fields, you need to know just a part of them. +In most cases, you don't need all message fields; you need to know just a part of them. You can use [Context Fields access](../getting-started/context/fields.md) feature for this. -For an example, you can get access to the `headers` like this: +For example, you can get access to the `headers` like this: ```python hl_lines="6" from faststream import Context diff --git a/docs/docs/en/rabbit/ack.md b/docs/docs/en/rabbit/ack.md index b37ded6b7b..7c19d392c7 100644 --- a/docs/docs/en/rabbit/ack.md +++ b/docs/docs/en/rabbit/ack.md @@ -26,7 +26,7 @@ async def base_handler(body: str): ... 
```
 
-If the `retry` flag is set to `int`, the message will be placed back in the queue and the number of retries will be limited to this number:
+If the `retry` flag is set to an `int`, the message will be placed back in the queue, and the number of retries will be limited to this number:
 
 ```python
 @broker.subscriber("test", retry=3)  # make up to 3 attempts
 async def base_handler(body: str):
     ...
 ```
 
@@ -39,11 +39,11 @@ async def base_handler(body: str):
 Subsequently, this logic will be reworked.
 
 !!! tip
-    For more complex error handling cases you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}
+    For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}
 
-## Manual Ack
+## Manual Acknowledgement
 
-If you want to *ack* message manually, you can get access directy to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method.
+If you want to acknowledge a message manually, you can get access directly to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method.
 
 ```python
 from faststream.rabbit import RabbitMessage
@@ -57,14 +57,14 @@ async def base_handler(body: str, msg: RabbitMessage):
         await msg.reject()
 ```
 
-**FastStream** will see that the message was already *ack*ed and will do nothing at process end.
+**FastStream** will see that the message was already acknowledged and will do nothing at process end.
 
 ## Interrupt Process
 
-If you want to interrupt message processing at any callstack, you can raise `faststream.exceptions.AckMessage`
+If you want to interrupt message processing at any call stack, you can raise `faststream.exceptions.AckMessage`.
 
 ``` python linenums="1" hl_lines="16"
 {!> docs_src/rabbit/ack/errors.py !}
 ```
 
-This way **FastStream** interrupts the current message proccessing and *ack* it immediately. Also, you can raise `NackMessage` and `RejectMessage` too.
+This way, **FastStream** interrupts the current message processing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too.
diff --git a/docs/docs/en/rabbit/declare.md b/docs/docs/en/rabbit/declare.md
index 7322a2b787..cb8d558c13 100644
--- a/docs/docs/en/rabbit/declare.md
+++ b/docs/docs/en/rabbit/declare.md
@@ -8,7 +8,7 @@
 {!> docs_src/rabbit/declare.py !}
 ```
 
-These methods require just a one argument (`RabbitQueue`/`RabbitExchange`) containing information about your *RabbitMQ* required objects. They declare/validate *RabbitMQ* objects and return low-level **aio-pika** robust objects to interact with.
+These methods require just one argument (`RabbitQueue`/`RabbitExchange`) containing information about your *RabbitMQ* required objects. They declare/validate *RabbitMQ* objects and return low-level **aio-pika** robust objects to interact with.
 
 !!! tip
-    Also, these methods are indempotent, so you can call them with the same arguments multiple times, but the objects will be created once, next time the method will return already stored object. This way you can get access to any queue/exchange created automatically.
+    Also, these methods are idempotent, so you can call them with the same arguments multiple times, but the objects will be created once; next time the method will return an already stored object. This way you can get access to any queue/exchange created automatically.
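For readers who want to see these declaration methods in context, here is a minimal, illustrative sketch; the connection URL, the `example-*` names, and the hook function are placeholders, not taken from the docs being patched:

```python
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@app.after_startup
async def declare_objects():
    # Declaration is idempotent: repeated calls with the same arguments
    # return the object that was already created
    queue = await broker.declare_queue(RabbitQueue("example-queue"))
    exchange = await broker.declare_exchange(RabbitExchange("example-exchange"))
    # Both results are low-level aio-pika robust objects you can work with directly
    print(queue, exchange)
```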
diff --git a/docs/docs/en/rabbit/examples/direct.md b/docs/docs/en/rabbit/examples/direct.md index 52909f257e..48b644d1c2 100644 --- a/docs/docs/en/rabbit/examples/direct.md +++ b/docs/docs/en/rabbit/examples/direct.md @@ -1,21 +1,20 @@ # Direct Exchange -**Direct** Exchange is the basic way to route messages in *RabbitMQ*. Its core is very simple: `exchange` sends messages to those queues, `routing_key` which matches the `routing_key` of the message being sent. +The **Direct** Exchange is the basic way to route messages in *RabbitMQ*. Its core is very simple: the `exchange` sends messages to those queues whose `routing_key` matches the `routing_key` of the message being sent. !!! note - **Default** Exchange, to which all queues in *RabbitMQ* are subscribed, has the **Direct** type by default + The **Default** Exchange, to which all queues in *RabbitMQ* are subscribed, has the **Direct** type by default. ## Scaling -If several consumers are listening to the same queue, messages will go to the one of them (round-robin). This behavior is common for all types of `exchange`, because it refers to the queue itself. The type of `exchange` affects which queues the message gets into. +If several consumers are listening to the same queue, messages will be distributed to one of them (round-robin). This behavior is common for all types of `exchange` because it refers to the queue itself. The type of `exchange` affects which queues the message gets into. -Thus, *RabbitMQ* can independently balance the load on queue consumers. You can increase the processing speed -of the message flow from the queue by launching additional instances of a consumer service. You don't need to make changes to the current infrastructure configuration: *RabbitMQ* will take care of how to distribute messages between your services. +Thus, *RabbitMQ* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by launching additional instances of a consumer service. You don't need to make changes to the current infrastructure configuration: RabbitMQ will take care of how to distribute messages between your services. ## Example !!! tip - **Direct** Exchange is the type used in **FastStream** by default: you can simply declare it as follows + The **Direct** Exchange is the type used in **FastStream** by default. You can simply declare it as follows: ```python @broker.subscriber("test_queue", "test_exchange") @@ -23,7 +22,7 @@ of the message flow from the queue by launching additional instances of a consum ... ``` -The argument `auto_delete=True` in this and subsequent examples is used only to clear the state of *RabbitMQ* after example runs +The argument `auto_delete=True` in this and subsequent examples is used only to clear the state of *RabbitMQ* after example runs. ```python linenums="1" {!> docs_src/rabbit/subscription/direct.py !} @@ -37,7 +36,7 @@ First, we announce our **Direct** exchange and several queues that will listen t {!> docs_src/rabbit/subscription/direct.py [ln:7-10]!} ``` -Then we sign up several consumers using the advertised queues to the `exchange` we created +Then we sign up several consumers using the advertised queues to the `exchange` we created: ```python linenums="13" hl_lines="1 6 11" {!> docs_src/rabbit/subscription/direct.py [ln:13-25]!} @@ -45,18 +44,18 @@ Then we sign up several consumers using the advertised queues to the `exchange` !!! 
note `handler1` and `handler2` are subscribed to the same `exchange` using the same queue: - within a single service, this does not make a sense, since messages will come to these handlers in turn. + within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution -Now the distribution of messages between these consumers will look like this: +Now, the distribution of messages between these consumers will look like this: ```python linenums="30" {!> docs_src/rabbit/subscription/direct.py [ln:30]!} ``` -Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `test-q-1` +Message `1` will be sent to `handler1` because it listens to the `exchange` using a queue with the routing key `test-q-1`. --- @@ -64,7 +63,7 @@ Message `1` will be sent to `handler1` because it listens to `exchange` using a {!> docs_src/rabbit/subscription/direct.py [ln:31]!} ``` -Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy +Message `2` will be sent to `handler2` because it listens to the `exchange` using the same queue, but `handler1` is busy. --- @@ -72,7 +71,7 @@ Message `2` will be sent to `handler2` because it listens to `exchange` using th {!> docs_src/rabbit/subscription/direct.py [ln:32]!} ``` -Message `3` will be sent to `handler1` again, because it is currently free +Message `3` will be sent to `handler1` again because it is currently free. --- @@ -80,4 +79,4 @@ Message `3` will be sent to `handler1` again, because it is currently free {!> docs_src/rabbit/subscription/direct.py [ln:33]!} ``` -Message `4` will be sent to `handler3`, because it is the only one listening to `exchange` using a queue with the routing key `test-q-2` +Message `4` will be sent to `handler3` because it is the only one listening to the `exchange` using a queue with the routing key `test-q-2`. diff --git a/docs/docs/en/rabbit/examples/fanout.md b/docs/docs/en/rabbit/examples/fanout.md index 871616f590..14272d751d 100644 --- a/docs/docs/en/rabbit/examples/fanout.md +++ b/docs/docs/en/rabbit/examples/fanout.md @@ -1,6 +1,6 @@ # Fanout Exchange -**Fanout** Exchange is an even simpler, but slightly less popular way of routing in *RabbitMQ*. This type of `exchange` sends messages to all queues subscribed to it, ignoring any arguments of the message. +The **Fanout** Exchange is an even simpler, but slightly less popular way of routing in *RabbitMQ*. This type of `exchange` sends messages to all queues subscribed to it, ignoring any arguments of the message. At the same time, if the queue listens to several consumers, messages will also be distributed among them. @@ -18,7 +18,7 @@ To begin with, we announced our **Fanout** exchange and several queues that will {!> docs_src/rabbit/subscription/fanout.py [ln:7-10]!} ``` -Then we signed up several consumers using the advertised queues to the `exchange` we created +Then we signed up several consumers using the advertised queues to the `exchange` we created: ```python linenums="13" hl_lines="1 6 11" {!> docs_src/rabbit/subscription/fanout.py [ln:13-25]!} @@ -26,10 +26,10 @@ Then we signed up several consumers using the advertised queues to the `exchange !!! 
note
    `handler1` and `handler2` are subscribed to the same `exchange` using the same queue:
-    within a single service, this does not make a sense, since messages will come to these handlers in turn.
+    within a single service, this does not make sense, since messages will come to these handlers in turn.
     Here we emulate the work of several consumers and load balancing between them.
 
-### Message distribution
+### Message Distribution
 
 Now the distribution of messages between these consumers will look like this:
 
@@ -37,7 +37,7 @@ Now the distribution of messages between these consumers will look like this:
 {!> docs_src/rabbit/subscription/fanout.py [ln:30]!}
 ```
 
-Message `1` will be sent to `handler1` and `handler3`, because they listen to `exchange` using different queues
+Message `1` will be sent to `handler1` and `handler3` because they listen to `exchange` using different queues.
 
 ---
 
@@ -45,7 +45,7 @@ Message `1` will be sent to `handler1` and `handler3`, because they listen to `e
 {!> docs_src/rabbit/subscription/fanout.py [ln:31]!}
 ```
 
-Message `2` will be sent to `handler2` and `handler3`, because `handler2` listens to `exchange` using the same queue as `handler1`
+Message `2` will be sent to `handler2` and `handler3` because `handler2` listens to `exchange` using the same queue as `handler1`.
 
 ---
 
@@ -53,7 +53,7 @@ Message `2` will be sent to `handler2` and `handler3`, because `handler2` listen
 {!> docs_src/rabbit/subscription/fanout.py [ln:32]!}
 ```
 
-Message `3` will be sent to `handler1` and `handler3`
+Message `3` will be sent to `handler1` and `handler3`.
 
 ---
 
@@ -61,9 +61,9 @@ Message `3` will be sent to `handler1` and `handler3`
 {!> docs_src/rabbit/subscription/fanout.py [ln:33]!}
 ```
 
-Message `4` will be sent to `handler3` and `handler3`
+Message `4` will be sent to `handler2` and `handler3`.
 
 ---
 
 !!! note
-    When sending messages to **Fanout** exchange, it makes no sense to specify the arguments `queue` or `routing_key`, because they will be ignored
+    When sending messages to a **Fanout** exchange, it makes no sense to specify the arguments `queue` or `routing_key`, because they will be ignored.
diff --git a/docs/docs/en/rabbit/examples/headers.md b/docs/docs/en/rabbit/examples/headers.md
index d39ca2df01..8289864cc6 100644
--- a/docs/docs/en/rabbit/examples/headers.md
+++ b/docs/docs/en/rabbit/examples/headers.md
@@ -1,6 +1,6 @@
 # Header Exchange
 
-**Header** Exchange is the most complex and flexible way to route messages in *RabbitMQ*. This `exchange` type sends messages to queues according by matching the queue binding arguments with message headers.
+The **Header** Exchange is the most complex and flexible way to route messages in *RabbitMQ*. This `exchange` type sends messages to queues by matching the queue binding arguments with message headers.
 
 At the same time, if several consumers are subscribed to the queue, messages will also be distributed among them.
 
@@ -12,7 +12,7 @@ At the same time, if several consumers are subscribed to the queue, messages wil
 
 ### Consumer Announcement
 
-First, we announce our **Fanout** exchange and several queues that will listen to it:
+First, we announce our **Header** exchange and several queues that will listen to it:
 
 ```python linenums="7" hl_lines="1 6 11 16"
 {!> docs_src/rabbit/subscription/header.py [ln:7-23]!}
 ```
 
 The `x-match` argument indicates whether the arguments should match the message headers in whole or in part.
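As a rough illustration of how `x-match` shows up in code, the sketch below binds a queue to a **Header** exchange via `bind_arguments`; the exchange/queue names and header values are invented for this example:

```python
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitQueue

broker = RabbitBroker()

# A Header exchange compares message headers with each queue's binding arguments
exch = RabbitExchange("example-exchange", type=ExchangeType.HEADERS, auto_delete=True)

# "x-match": "any" - a single matching header is enough;
# "x-match": "all" - every listed header must match
queue = RabbitQueue(
    "example-queue",
    auto_delete=True,
    bind_arguments={"key": 1, "x-match": "any"},
)


@broker.subscriber(queue, exch)
async def handler(msg: str):
    ...
```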
-Then we signed up several consumers using the advertised queues to the `exchange` we created +Then we signed up several consumers using the advertised queues to the `exchange` we created: ```python linenums="26" hl_lines="1 6 11 16" {!> docs_src/rabbit/subscription/header.py [ln:26-43]!} @@ -28,10 +28,10 @@ Then we signed up several consumers using the advertised queues to the `exchange !!! note `handler1` and `handler2` are subscribed to the same `exchange` using the same queue: - within a single service, this does not make a sense, since messages will come to these handlers in turn. + within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution Now the distribution of messages between these consumers will look like this: @@ -39,7 +39,7 @@ Now the distribution of messages between these consumers will look like this: {!> docs_src/rabbit/subscription/header.py [ln:48]!} ``` -Message `1` will be sent to `handler1`, because it listens to a queue whose `key` header matches the `key` header of the message +Message `1` will be sent to `handler1` because it listens to a queue whose `key` header matches the `key` header of the message. --- @@ -47,7 +47,7 @@ Message `1` will be sent to `handler1`, because it listens to a queue whose `key {!> docs_src/rabbit/subscription/header.py [ln:49]!} ``` -Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy +Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy. --- @@ -55,7 +55,7 @@ Message `2` will be sent to `handler2` because it listens to `exchange` using th {!> docs_src/rabbit/subscription/header.py [ln:50]!} ``` -Message `3` will be sent to `handler1` again, because it is currently free +Message `3` will be sent to `handler1` again because it is currently free. --- @@ -63,7 +63,7 @@ Message `3` will be sent to `handler1` again, because it is currently free {!> docs_src/rabbit/subscription/header.py [ln:51]!} ``` -Message `4` will be sent to `handler3`, because it listens to a queue whose `key` header coincided with the `key` header of the message +Message `4` will be sent to `handler3` because it listens to a queue whose `key` header coincided with the `key` header of the message. --- @@ -71,7 +71,7 @@ Message `4` will be sent to `handler3`, because it listens to a queue whose `key {!> docs_src/rabbit/subscription/header.py [ln:52]!} ``` -Message `5` will be sent to `handler3`, because it listens to a queue whose header `key2` coincided with the header `key2` of the message +Message `5` will be sent to `handler3` because it listens to a queue whose header `key2` coincided with the header `key2` of the message. --- @@ -79,7 +79,7 @@ Message `5` will be sent to `handler3`, because it listens to a queue whose head {!> docs_src/rabbit/subscription/header.py [ln:53-55]!} ``` -Message `6` will be sent to `handler3` and `handler4`, because the message headers completely match the queue keys +Message `6` will be sent to `handler3` and `handler4` because the message headers completely match the queue keys. 
---
 
diff --git a/docs/docs/en/rabbit/examples/index.md b/docs/docs/en/rabbit/examples/index.md
index 46005b97d7..527f1818d7 100644
--- a/docs/docs/en/rabbit/examples/index.md
+++ b/docs/docs/en/rabbit/examples/index.md
@@ -1,16 +1,16 @@
 # Basic Subscriber
 
-If you know nothing about **RabbitMQ** and how it works you will still able to use **FastStream RabbitBroker**.
+If you know nothing about **RabbitMQ** and how it works, you will still be able to use **FastStream RabbitBroker**.
 
-Just use `#!python @broker.subscriber(...)` method with a string as a routing key.
+Just use the `#!python @broker.subscriber(...)` method with a string as a routing key.
 
 ```python linenums="1"
 {!> docs_src/rabbit/subscription/index.py !}
 ```
 
-This is the principle all **FastStream** brokers work by: you don't need to learn them in-depth if you want to *just send a message*
+This is the principle all **FastStream** brokers work by: you don't need to learn them in-depth if you want to *just send a message*.
 
-## RabbitMQ details
+## RabbitMQ Details
 
 If you are already familiar with **RabbitMQ** logic, you should also be acquainted with the inner workings of the example mentioned above.
 
diff --git a/docs/docs/en/rabbit/examples/stream.md b/docs/docs/en/rabbit/examples/stream.md
index 0ee993be5c..550aed70c0 100644
--- a/docs/docs/en/rabbit/examples/stream.md
+++ b/docs/docs/en/rabbit/examples/stream.md
@@ -1,6 +1,6 @@
 # RabbitMQ Streams
 
-*RabbitMQ* has a [Streams](https://www.rabbitmq.com/streams.html){.exteranl-link target="_blank"} feature, which logic is closely related to *Kafka* topics.
+*RabbitMQ* has a [Streams](https://www.rabbitmq.com/streams.html){.external-link target="_blank"} feature, which is closely related to *Kafka* topics.
 
 The main difference from regular *RabbitMQ* queues is that the messages are not deleted after consuming.
 
diff --git a/docs/docs/en/rabbit/examples/topic.md b/docs/docs/en/rabbit/examples/topic.md
index 64ec2e0c9d..472675e414 100644
--- a/docs/docs/en/rabbit/examples/topic.md
+++ b/docs/docs/en/rabbit/examples/topic.md
@@ -1,6 +1,6 @@
 # Topic Exchange
 
-**Topic** Exchange is a powerful *RabbitMQ* routing tool. This type of `exchange` sends messages to the queue in accordance with the *pattern* specified when they are connected to `exchange` and the `routing_key` of the message itself.
+The **Topic** Exchange is a powerful *RabbitMQ* routing tool. This type of `exchange` sends messages to the queue in accordance with the *pattern* specified when they are connected to `exchange` and the `routing_key` of the message itself.
 
 At the same time, if several consumers are subscribed to the queue, messages will be distributed among them.
 
@@ -20,7 +20,7 @@ First, we announce our **Topic** exchange and several queues that will listen to
 
 At the same time, in the `routing_key` of our queues, we specify the *pattern* of routing keys that will be processed by this queue.
 
-Then we sign up several consumers using the advertised queues to the `exchange` we created
+Then we sign up several consumers using the advertised queues to the `exchange` we created:
 
 ```python linenums="13" hl_lines="1 6 11"
 {!> docs_src/rabbit/subscription/topic.py [ln:13-25]!}
 ```
 
 !!! note
     `handler1` and `handler2` are subscribed to the same `exchange` using the same queue:
-    within a single service, this does not make a sense, since messages will come to these handlers in turn.
+ within a single service, this does not make sense, since messages will come to these handlers in turn. Here we emulate the work of several consumers and load balancing between them. -### Message distribution +### Message Distribution Now the distribution of messages between these consumers will look like this: @@ -39,7 +39,7 @@ Now the distribution of messages between these consumers will look like this: {!> docs_src/rabbit/subscription/topic.py [ln:30]!} ``` -Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `*.info` +Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `*.info`. --- @@ -47,7 +47,7 @@ Message `1` will be sent to `handler1` because it listens to `exchange` using a {!> docs_src/rabbit/subscription/topic.py [ln:31]!} ``` -Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy +Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy. --- @@ -55,7 +55,7 @@ Message `2` will be sent to `handler2` because it listens to `exchange` using th {!> docs_src/rabbit/subscription/topic.py [ln:32]!} ``` -Message `3` will be sent to `handler1` again, because it is currently free +Message `3` will be sent to `handler1` again because it is currently free. --- @@ -63,4 +63,4 @@ Message `3` will be sent to `handler1` again, because it is currently free {!> docs_src/rabbit/subscription/topic.py [ln:33]!} ``` -Message `4` will be sent to `handler3`, because it is the only one listening to `exchange` using a queue with the routing key `*.debug` +Message `4` will be sent to `handler3` because it is the only one listening to `exchange` using a queue with the routing key `*.debug`. diff --git a/docs/docs/en/rabbit/index.md b/docs/docs/en/rabbit/index.md index d7c6c5905a..cceec68b4a 100644 --- a/docs/docs/en/rabbit/index.md +++ b/docs/docs/en/rabbit/index.md @@ -13,9 +13,9 @@ It supports the ability to successfully process messages, mark them as processed Having to keep track of the current status of all messages is a cause of the **RabbitMQ** performance issues. With really large message volumes, **RabbitMQ** starts to degrade. However, if this was a "one-time influx", then consumers will free the queue of messages and the "health" of **RabbitMQ** will be stable. -If your scenario is not based on processing millions of messages, and also requires building complex routing logic - **RabbitMQ** you will be right choice. +If your scenario is not based on processing millions of messages and also requires building complex routing logic, **RabbitMQ** will be the right choice. -## Basic concepts +## Basic Concepts If you want to totally understand how *RabbitMQ* works, you should visit their official website. There you will find top-level comments about the basic concepts and usage examples. @@ -27,40 +27,40 @@ If you want to totally understand how *RabbitMQ* works, you should visit their o * `Queue` - the point of pushing messages to *consumer* * `Binding` - the relationship between *queue-exchange* or *exchange-exchange* -### Routing rules +### Routing Rules The rules for delivering messages to consumers depend on the **type of exchange** and **binding** parameters. All the main options will be discussed at [examples](examples/direct.md){.internal-link}. In general, the message path looks so: -1. 
*Publisher* sends a message to `exchange`, specify its `routing_key` and headers according to which routing will take place
-2. `exchange`, depending on the message parameters, determines which of the subscribed `bindings` to send the message to
-3. `binding` delivers the message to `queue` or another `exchange` (in this case it will send it further by its own rules)
-4. `queue`, after receiving a message, sends it to one of subscribed consumers (**PUSH API**)
+1. *Publisher* sends a message to `exchange`, specifying its `routing_key` and headers according to which routing will take place.
+2. `Exchange`, depending on the message parameters, determines which of the subscribed `bindings` to send the message to.
+3. `Binding` delivers the message to `queue` or another `exchange` (in this case it will send it further by its own rules).
+4. `Queue`, after receiving a message, sends it to one of the subscribed consumers (**PUSH API**).
 
 !!! tip
-    By default, all queues have `binding` to `default exchange` (**Direct** type) with **routing key** corresponding to their name.
-    In **FastStream**, queues are connected to this `exchange` and messages are sent by default unless another `exchange` is explicitly specified.
+    By default, all queues have a `binding` to the `default exchange` (**Direct** type) with a **routing key** corresponding to their name.
+    In **FastStream**, queues are connected to this `exchange`, and messages are sent by default unless another `exchange` is explicitly specified.
 
 !!! warning ""
     Connecting the queue to any other `exchange` will still leave it subscribed to the `default exchange'. Be careful with this.
 
 At this stage, the message gets into your application - and you start processing it.
 
-### Message statuses
+### Message Statuses
 
-*RabbitMQ* requires confirmation of message processing: only after that it will be removed from the queue.
+*RabbitMQ* requires confirmation of message processing: only after that, it will be removed from the queue.
 
-Confirmation can be either positive (`Acknowledgment - ack`) if the message was successfully processed, or negative (`Negative Acknowledgment - nack`) if the message was processed with an error.
+Confirmation can be either positive (`Acknowledgment - ack`) if the message was successfully processed or negative (`Negative Acknowledgment - nack`) if the message was processed with an error.
 
-At the same time, in case of an error, the message can also be extracted from the queue (`reject`), otherwise, after a negative confirmation, it will be requeued for processing again.
+At the same time, in case of an error, the message can also be extracted from the queue (`reject`); otherwise, after a negative confirmation, it will be requeued for processing again.
 
-In most cases, **FastStream** performs all the necessary actions by itself: however, if you want to manage the message lifecycle directly, you can access the message object itself and call the appropriate methods directly. This can be useful if you want to implement an "at most once" policy and you need to confirm the consuming of the message before it is actually processed.
+In most cases, **FastStream** performs all the necessary actions by itself. However, if you want to manage the message lifecycle directly, you can access the message object itself and call the appropriate methods directly. This can be useful if you want to implement an "at most once" policy and you need to confirm the consuming of the message before it is actually processed.
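A minimal sketch of that "at most once" approach (the queue name is illustrative): the handler confirms the message up front, so a failure later in the function does not lead to a redelivery.

```python
from faststream.rabbit import RabbitBroker, RabbitMessage

broker = RabbitBroker()


@broker.subscriber("example-queue")
async def handler(body: str, msg: RabbitMessage):
    # Acknowledge before doing the actual work ("at most once"):
    # if processing below fails, the message will not be requeued
    await msg.ack()
    ...  # actual processing goes here
```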
-## **FastStream** specific
+## **FastStream** Specific
 
-**FastStream** omits the ability to create `bindings` directly, since in most cases you do not need to subscribe one queue to several `exchanges` or subscribe `exchanges` to each other. On the contrary, this practice leads to over-complication of the message routing scheme, which makes it difficult to maintain and further develop the entire infrastructure of services.
+**FastStream** omits the ability to create `bindings` directly, since in most cases, you do not need to subscribe one queue to several `exchanges` or subscribe `exchanges` to each other. On the contrary, this practice leads to over-complication of the message routing scheme, which makes it difficult to maintain and further develop the entire infrastructure of services.
 
 **FastStream** suggests you adhere to the scheme `exchange:queue` as `1:N`, which will greatly simplify the scheme of interaction between your services. It is better to create an additional queue for a new `exchange` than to subscribe to an existing one.
 
-However, if you want to reduce the number of entities in your RabbitMQ, and thereby optimize its performance (or you know exactly what you are doing), **FastStream** leaves you the option to create `bindings` directly. In other cases, the connection parameters are an integral part of the entities *RabbitQueue* and *RabbitExchange* in **FastStream**.
+However, if you want to reduce the number of entities in your **RabbitMQ**, and thereby optimize its performance (or you know exactly what you are doing), **FastStream** leaves you the option to create `bindings` directly. In other cases, the connection parameters are an integral part of the entities **RabbitQueue** and **RabbitExchange** in **FastStream**.
diff --git a/docs/docs/en/rabbit/message.md b/docs/docs/en/rabbit/message.md
index 55c1f62a4b..a3c1afc9c4 100644
--- a/docs/docs/en/rabbit/message.md
+++ b/docs/docs/en/rabbit/message.md
@@ -1,8 +1,8 @@
-# Access to Message information
+# Access to Message Information
 
-As you know, **FastStream** serializes a message body and provides you access to it by function arguments. But sometimes you want access to a message_id, headers or other meta information.
+As you know, **FastStream** serializes a message body and provides you access to it through function arguments. But sometimes you want access to a message_id, headers, or other meta-information.
 
-## Message access
+## Message Access
 
 You can get it in a simple way: just acces to the message object in the [Context](../getting-started/context/existed.md){.internal-link}!
 
@@ -27,7 +27,7 @@ async def base_handler(
     print(msg.correlation_id)
 ```
 
-Also, if you can't find the information you reqiure, you can get access right to the wrapped `aio_pika.IncomingMessage` which contains complete message information.
+Also, if you can't find the information you require, you can get access directly to the wrapped `aio_pika.IncomingMessage`, which contains complete message information.
 
 ```python hl_lines="6"
 from aio_pika import IncomingMessage
@@ -39,9 +39,9 @@ async def base_handler(body: str, msg: RabbitMessage):
     print(raw)
 ```
 
-## Message Fields access
+## Message Fields Access
 
-But in the most cases you don't need all message fields, you need to access some of them. You can use [Context Fields access](../getting-started/context/fields.md){.internal-link} feature for this reason.
+But in most cases, you don't need all message fields; you need to access some of them.
You can use [Context Fields access](../getting-started/context/fields.md){.internal-link} feature for this reason.
 
 For example, you can get access to the `correlation_id` like this:
 
@@ -69,7 +69,7 @@ async def base_handler(
     print(cor_id)
 ```
 
-But this code is a too long to be reused everywhere.In this case, you can use a python [`Annotated`](https://docs.python.org/3/library/typing.html#typing.Annotated){.external-link target="_blank"} feature:
+But this code is too long to be reused everywhere. In this case, you can use a Python [`Annotated`](https://docs.python.org/3/library/typing.html#typing.Annotated){.external-link target="_blank"} feature:
 
 === "python 3.9+"
 
diff --git a/docs/docs/en/rabbit/publishing.md b/docs/docs/en/rabbit/publishing.md
index 78471c1b1a..6c56ef8d46 100644
--- a/docs/docs/en/rabbit/publishing.md
+++ b/docs/docs/en/rabbit/publishing.md
@@ -1,8 +1,8 @@
 # Publishing
 
-**FastStream** RabbitBroker supports all regular [publishing usecases](../getting-started/publishing/index.md){.internal-link}, you can use them without any changes.
+**FastStream** RabbitBroker supports all regular [publishing usecases](../getting-started/publishing/index.md){.internal-link}. You can use them without any changes.
 
-However, if you whish to further customize the publishing logic further, you should take a more deep-dive look at specific RabbitBroker parameters.
+However, if you wish to customize the publishing logic further, you should take a more in-depth look at specific RabbitBroker parameters.
 
 ## Rabbit Publishing
 
@@ -29,8 +29,7 @@ asyncio.run(pub())
 
 If you don't specify any exchange, the message will be send to the default one.
 
-Also, you able to use special **RabbitQueue** and **RabbitExchange** objects as a `queue` and `exchange` arguments:
-
+Also, you are able to use special **RabbitQueue** and **RabbitExchange** objects as `queue` and `exchange` arguments:
 ``` python
 from faststream.rabbit import RabbitExchange, RabbitQueue
 
@@ -45,11 +44,11 @@ await broker.publish(
 
 If you specify exchange that doesn't exist, RabbitBroker will create a required one and then publish a message to it.
 
 !!! tip
-    Be accurate with it: if you have already created an **Exchange** with specific parameters and try to send a message by exchange name to it, the broker will try to create it. So, **Exchange** parameters conflict will occure.
+    Be accurate with it: if you have already created an **Exchange** with specific parameters and try to send a message by exchange name to it, the broker will try to create it. So, **Exchange** parameters conflict will occur.
 
-    If you are trying to send a message to specific **Exchange** - sending it with a defined **RabbitExchange** object is the preffered way.
+    If you are trying to send a message to a specific **Exchange**, sending it with a defined **RabbitExchange** object is the preferred way.
-## Basic arguments
+## Basic Arguments
 
 The `publish` method takes the following arguments:
 
@@ -58,7 +57,7 @@ The `publish` method takes the following arguments:
 * `#!python queue: str | RabbitQueue = ""` - the queue where the message will be sent (since most queues use their name as the routing key, this is a human-readable version of `routing_key`)
 * `#!python routing_key: str = ""` - also a message routing key, if not specified, the `queue` argument will be used
 
-## Message parameters
+## Message Parameters
 
 You can read more about all the available flags in the [RabbitMQ documentation](https://www.rabbitmq.com/consumers.html){.external-link target="_blank"}
 
@@ -75,7 +74,7 @@ You can read more about all the available flags in the [RabbitMQ documentation](
 * `#!python user_id: str | None = None` - ID of the *RabbitMQ* user who sent the message
 * `#!python app_id: str | None = None` - ID of the application that sent the message (used by consumers)
 
-## Send flags
+## Send Flags
 
 Arguments for sending a message:
 
diff --git a/docs/docs/en/rabbit/rpc.md b/docs/docs/en/rabbit/rpc.md
index b86c9d7ff2..3a71a5ec72 100644
--- a/docs/docs/en/rabbit/rpc.md
+++ b/docs/docs/en/rabbit/rpc.md
@@ -1,14 +1,14 @@
 # RPC over RMQ
 
-## Blocking request
+## Blocking Request
 
-**FastStream** provides you with the ability to send blocking RPC request over *RabbitMQ* in a very simple way.
+**FastStream** provides you with the ability to send a blocking RPC request over *RabbitMQ* in a very simple way.
 
 It uses the [**Direct Reply-To**](https://www.rabbitmq.com/direct-reply-to.html){.external-link target="_blank"} *RabbitMQ* feature, so you don't need to create any queues to consume a response.
 
 Just send a message like a regular one and get a response synchronously.
 
-It is a very close to common **requests** syntax:
+It is very close to common **requests** syntax:
 
 ``` python hl_lines="1 4"
 msg = await broker.publish(
@@ -18,17 +18,16 @@ msg = await broker.publish(
 )
 ```
 
-Also, you have a two extra options to control this behavior:
+Also, you have two extra options to control this behavior:
 
-* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for response
-* `#!python raise_timeout: bool = False` - by default timeout request returns `None`, but if you need to raise TimeoutException directly, you can specify this option
+* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for a response
+* `#!python raise_timeout: bool = False` - by default, a timeout request returns `None`, but if you need to raise a TimeoutException directly, you can specify this option
 
 ## Reply-To
 
-Also, if you want to create permanent request-reply data flow, probably, you should create a permanent queue to consume responses.
-
-So, if you have a such one, you can specify it with `reply_to` argument. This way **FastStream** will send a response in this queue automatically.
+Also, if you want to create a permanent request-reply data flow, you should probably create a permanent queue to consume responses.
+So, if you have such a queue, you can specify it with the `reply_to` argument. This way, **FastStream** will send a response to this queue automatically.
 
 ``` python hl_lines="1 8"
 @broker.subscriber("response-queue")