Fix typos and grammar in Kafka and RabbitMQ articles in the docs (#736)
* Fix typos and grammar in Kafka section articles

* Fix typos and grammar in RabbitMQ section articles

---------

Co-authored-by: Davor Runje <[email protected]>
kumaranvpl and davorrunje authored Sep 25, 2023
1 parent 91c7787 commit 97e37a4
Showing 40 changed files with 252 additions and 258 deletions.
2 changes: 1 addition & 1 deletion .faststream_gen/getting-started/config/index.md
@@ -1,6 +1,6 @@
# Settings and Environment Variables

-n many cases, your application may require external settings or configurations, such as a broker connection or database credentials.
+In many cases, your application may require external settings or configurations, such as a broker connection or database credentials.

To manage these settings effectively, it's common to provide them through environment variables that can be read by the application.

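For context, the settings pattern this page describes can be sketched with pydantic-settings; this minimal example is illustrative (the `URL` environment variable and defaults are assumptions, not part of the original page):

```python
from pydantic_settings import BaseSettings

from faststream import FastStream
from faststream.kafka import KafkaBroker


class Settings(BaseSettings):
    # Each field is read from the environment, e.g. URL=kafka-prod:9092
    url: str = "localhost:9092"
    queue: str = "test-queue"


settings = Settings()  # override via environment variables at deploy time
broker = KafkaBroker(settings.url)
app = FastStream(broker)
```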
12 changes: 6 additions & 6 deletions .faststream_gen/kafka/Publisher/batch_publisher.md
@@ -1,17 +1,17 @@
-# Publishing in batches
+# Publishing in Batches

-If you want to send your data in batches `#!python @broker.publisher(...)` decorator makes that possible for you.
-To produce in batches you need to do two things:
+If you want to send your data in batches, `#!python @broker.publisher(...)` decorator makes that possible for you.
+To produce in batches, you need to do two things:

-1. When creating your publisher, set the `batch` argument to `True`
+1. When creating your publisher, set the `batch` argument to `True`.
2. Return a tuple of the messages you wish to send in a batch. This action will prompt the producer to collect the messages and send them in a batch to a Kafka broker.

Here is an example of an app producing in batches to **output_data** topic when consuming from **input_data_1**.

In the highligted lines, we can see the steps of creating and using a batch publisher:

-1. Creation of publisher
-2. Publishing an actual batch of messages
+1. Creation of the publisher.
+2. Publishing an actual batch of messages.

```python linenums="1" hl_lines="19 26"
from typing import Tuple
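The example referenced above is truncated in this view; a minimal sketch of the two steps, assuming a plain float payload (the handler body and topic names follow the surrounding prose):

```python
from typing import Tuple

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# Step 1: create the publisher with batch=True
decrease_and_increase = broker.publisher("output_data", batch=True)


@decrease_and_increase
@broker.subscriber("input_data_1")
async def on_input_data_1(msg: float) -> Tuple[float, float]:
    # Step 2: return a tuple; its items are collected and sent as one batch
    return msg - 1.0, msg + 1.0
```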
2 changes: 1 addition & 1 deletion .faststream_gen/kafka/Publisher/index.md
@@ -1,5 +1,5 @@
# Publishing

-**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, you can use them without any changes.
+**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, which you can use without any changes.

In the following chapters, we will demonstrate how to use a KafkaBroker publisher in specific use cases, such as publishing batches or publishing with a key.
12 changes: 6 additions & 6 deletions .faststream_gen/kafka/Publisher/using_a_key.md
@@ -1,20 +1,20 @@
-# Defining a partition key
+# Defining a Partition Key

Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance.

-You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)`, this guide will demonstrate to you this feature.
+You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)` decorator. This guide will demonstrate this feature to you.

-## Calling `publish` with a key
+## Calling `publish` with a Key

-To publish a message to a Kafka topic using a key, simpliy pass the `key` parameter to the `publish` function call, like this:
+To publish a message to a Kafka topic using a key, simply pass the `key` parameter to the `publish` function call, like this:

```python
await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key")
```

-## App example
+## App Example

-Lest take a look at the whole app example that will consume from the **input_data** topic and publish with key to the **output_data** topic.
+Let's take a look at the whole app example that will consume from the **input_data** topic and publish with a key to the **output_data** topic.

You can see that the only difference from normal publishing is that now we pass the key to the publisher call.

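The full app example is cut off above; a minimal sketch consistent with the snippet (the `Data` model is an assumption inferred from the `publish` call shown):

```python
from pydantic import BaseModel, Field

from faststream import FastStream
from faststream.kafka import KafkaBroker


class Data(BaseModel):
    data: float = Field(..., description="Float data example")


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

to_output_data = broker.publisher("output_data")


@broker.subscriber("input_data")
async def on_input_data(msg: Data) -> None:
    # The only difference from normal publishing is the explicit key
    await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key")
```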
8 changes: 4 additions & 4 deletions .faststream_gen/kafka/Subscriber/batch_subscriber.md
@@ -1,12 +1,12 @@
# Batch Subscriber

-If you want to consume data in batches `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed msg object as a list of messages and setting the `batch` parameter to `True` the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now.
+If you want to consume data in batches, the `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now.

-## Subscriber function with batching
+## Subscriber Function with Batching

-To consume messages in batches, you need to wrap you message type into a list and and set the `batch` parameter to `True`, the `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now.
+To consume messages in batches, you need to wrap your message type into a list and and set the `batch` parameter to `True`. The `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now.

-Here is an example of consuming in batches from **test_batch** topic:
+Here is an example of consuming in batches from the **test_batch** topic:

```python linenums="1"
@broker.subscriber("test_batch", batch=True)
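A minimal sketch of the batch subscriber the section describes (the handler body is illustrative):

```python
from typing import List

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# A list-typed msg plus batch=True delivers whole batches from one partition
@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[str], logger: Logger):
    logger.info(msg)
```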
14 changes: 7 additions & 7 deletions .faststream_gen/kafka/Subscriber/index.md
@@ -1,8 +1,8 @@
# Basic Subscriber

-To start consuming from a Kafka topic just decorate your consuming function with a `#!python @broker.subscriber(...)` decorator passing a string as a topic key.
+To start consuming from a Kafka topic, just decorate your consuming function with a `#!python @broker.subscriber(...)` decorator, passing a string as a topic key.

-In the folowing example we will create a simple FastStream app that will consume `HelloWorld` messages from a **hello_world** topic.
+In the folowing example, we will create a simple FastStream app that will consume `HelloWorld` messages from a **hello_world** topic.

The full app code looks like this:

@@ -39,9 +39,9 @@ from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
```

-## Define the HelloWorld message structure
+## Define the HelloWorld Message Structure

-Next, you need to define the structure of the messages you want to consume from the topic using pydantic. For the guide we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.
+Next, you need to define the structure of the messages you want to consume from the topic using Pydantic. For the guide, we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.

```python linenums="1"
class HelloWorld(BaseModel):
@@ -61,7 +61,7 @@ broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
```

-## Create a function that will consume messages from a Kafka hello-world topic
+## Create a Function that will Consume Messages from a Kafka hello-world Topic

Let’s create a consumer function that will consume `HelloWorld` messages from **hello_world** topic and log them.

@@ -73,6 +73,6 @@ async def on_hello_world(msg: HelloWorld, logger: Logger):

The function decorated with the `#!python @broker.subscriber(...)` decorator will be called when a message is produced to Kafka.

-The message will then be injected into the typed msg argument of the function and its type will be used to parse the message.
+The message will then be injected into the typed `msg` argument of the function, and its type will be used to parse the message.

-In this example case, when the message is sent into a **hello_world** topic, it will be parsed into a `HelloWorld` class and `on_hello_world` function will be called with the parsed class as msg argument value.
+In this example case, when the message is sent to a **hello_world** topic, it will be parsed into a `HelloWorld` class, and the `on_hello_world` function will be called with the parsed class as the `msg` argument value.
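Assembled from the fragments above, the whole app might look like this sketch (field metadata is illustrative):

```python
from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class HelloWorld(BaseModel):
    msg: str = Field(..., description="Demo message")


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
    logger.info(msg)
```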
4 changes: 2 additions & 2 deletions .faststream_gen/kafka/index.md
@@ -1,4 +1,4 @@
-# Kafka routing
+# Kafka Routing

## Kafka Overview

@@ -39,7 +39,7 @@ To connect to Kafka using the FastStream KafkaBroker module, follow these steps:

2. **Create your processing logic:** Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic

-3. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator.
+3. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

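The simplified connection example mentioned above is cut off in this view; a sketch of the three steps, with illustrative names for the message model and topics:

```python
from pydantic import BaseModel

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class Greeting(BaseModel):  # step 1: define the message format
    name: str


@broker.subscriber("in-topic")   # step 3: consume from this topic...
@broker.publisher("out-topic")   # ...and publish the return value here
async def handle(msg: Greeting) -> str:  # step 2: the processing logic
    return f"Hello, {msg.name}!"
```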
6 changes: 3 additions & 3 deletions .faststream_gen/kafka/message.md
@@ -34,12 +34,12 @@ async def base_handler(
print(msg.headers)
```

-## Message Fields access
+## Message Fields Access

-But in the most cases you don't need all message fields, you need to know just a part of them.
+In most cases, you don't need all message fields; you need to know just a part of them.
You can use [Context Fields access](../getting-started/context/fields.md) feature for this.

-For an example, you can get access to the `headers` like this:
+For example, you can get access to the `headers` like this:

```python hl_lines="6"
from faststream import Context
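The snippet above is truncated; a minimal sketch of pulling just the headers via Context fields access (assuming the `message.headers` context field referenced by this page):

```python
from faststream import Context, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    headers=Context("message.headers"),  # inject only the headers field
):
    print(headers)
```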
14 changes: 7 additions & 7 deletions .faststream_gen/rabbit/ack.md
@@ -26,7 +26,7 @@ async def base_handler(body: str):
...
```

-If the `retry` flag is set to `int`, the message will be placed back in the queue and the number of retries will be limited to this number:
+If the `retry` flag is set to an `int`, the message will be placed back in the queue, and the number of retries will be limited to this number:

```python
@broker.subscriber("test", retry=3) # make up to 3 attempts
@@ -39,11 +39,11 @@ async def base_handler(body: str):
Subsequently, this logic will be reworked.

!!! tip
-    For more complex error handling cases you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}
+    For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}

-## Manual Ack
+## Manual acknowledgement

-If you want to *ack* message manually, you can get access directy to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method.
+If you want to acknowledge a message manually, you can get access directy to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method.

```python
from faststream.rabbit import RabbitMessage
@@ -57,11 +57,11 @@ async def base_handler(body: str, msg: RabbitMessage):
await msg.reject()
```

-**FastStream** will see that the message was already *ack*ed and will do nothing at process end.
+**FastStream** will see that the message was already acknowledged and will do nothing at process end.

## Interrupt Process

-If you want to interrupt message processing at any callstack, you can raise `faststream.exceptions.AckMessage`
+If you want to interrupt message processing at any call stack, you can raise `faststream.exceptions.AckMessage`

``` python linenums="1" hl_lines="16"
from faststream import FastStream
@@ -87,4 +87,4 @@ async def test_publishing():
await broker.publish("Hello!", "test-queue")
```

-This way **FastStream** interrupts the current message proccessing and *ack* it immediately. Also, you can raise `NackMessage` and `RejectMessage` too.
+This way, **FastStream** interrupts the current message proccessing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too.
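For reference, a minimal runnable sketch of the manual acknowledgement pattern shown in fragments above (the queue name and branch condition are illustrative):

```python
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitMessage

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(body: str, msg: RabbitMessage):
    # Ack or reject explicitly; FastStream will not re-ack at process end
    if body == "some body":
        await msg.ack()
    else:
        await msg.reject()
```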
4 changes: 2 additions & 2 deletions .faststream_gen/rabbit/declare.md
@@ -34,7 +34,7 @@ async def declare_smth():
)
```

-These methods require just a one argument (`RabbitQueue`/`RabbitExchange`) containing information about your *RabbitMQ* required objects. They declare/validate *RabbitMQ* objects and return low-level **aio-pika** robust objects to interact with.
+These methods require just one argument (`RabbitQueue`/`RabbitExchange`) containing information about your *RabbitMQ* required objects. They declare/validate *RabbitMQ* objects and return low-level **aio-pika** robust objects to interact with.

!!! tip
-    Also, these methods are indempotent, so you can call them with the same arguments multiple times, but the objects will be created once, next time the method will return already stored object. This way you can get access to any queue/exchange created automatically.
+    Also, these methods are idempotent, so you can call them with the same arguments multiple times, but the objects will be created once; next time the method will return an already stored object. This way you can get access to any queue/exchange created automatically.
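A minimal sketch of the declaration methods this page discusses, run once the broker is connected (object names are illustrative):

```python
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue

broker = RabbitBroker()
app = FastStream(broker)


@app.after_startup
async def declare_smth():
    # Idempotent: repeated calls return the already-declared objects
    queue = await broker.declare_queue(RabbitQueue("some-queue"))
    exchange = await broker.declare_exchange(RabbitExchange("some-exchange"))
    print(queue, exchange)  # low-level aio-pika robust objects
```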
29 changes: 14 additions & 15 deletions .faststream_gen/rabbit/examples/direct.md
@@ -1,29 +1,28 @@
# Direct Exchange

-**Direct** Exchange is the basic way to route messages in *RabbitMQ*. Its core is very simple: `exchange` sends messages to those queues, `routing_key` which matches the `routing_key` of the message being sent.
+The **Direct** Exchange is the basic way to route messages in *RabbitMQ*. Its core is very simple: the `exchange` sends messages to those queues whose `routing_key` matches the `routing_key` of the message being sent.

!!! note
-    **Default** Exchange, to which all queues in *RabbitMQ* are subscribed, has the **Direct** type by default
+    The **Default** Exchange, to which all queues in *RabbitMQ* are subscribed, has the **Direct** type by default.

## Scaling

-If several consumers are listening to the same queue, messages will go to the one of them (round-robin). This behavior is common for all types of `exchange`, because it refers to the queue itself. The type of `exchange` affects which queues the message gets into.
+If several consumers are listening to the same queue, messages will be distributed to one of them (round-robin). This behavior is common for all types of `exchange` because it refers to the queue itself. The type of `exchange` affects which queues the message gets into.

-Thus, *RabbitMQ* can independently balance the load on queue consumers. You can increase the processing speed
-of the message flow from the queue by launching additional instances of a consumer service. You don't need to make changes to the current infrastructure configuration: *RabbitMQ* will take care of how to distribute messages between your services.
+Thus, *RabbitMQ* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by launching additional instances of a consumer service. You don't need to make changes to the current infrastructure configuration: RabbitMQ will take care of how to distribute messages between your services.

## Example

!!! tip
-    **Direct** Exchange is the type used in **FastStream** by default: you can simply declare it as follows
+    The **Direct** Exchange is the type used in **FastStream** by default. You can simply declare it as follows:

```python
@broker.subscriber("test_queue", "test_exchange")
async def handler():
...
```

-The argument `auto_delete=True` in this and subsequent examples is used only to clear the state of *RabbitMQ* after example runs
+The argument `auto_delete=True` in this and subsequent examples is used only to clear the state of *RabbitMQ* after example runs.

```python linenums="1"
from faststream import FastStream, Logger
@@ -72,7 +71,7 @@ queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)
```

-Then we sign up several consumers using the advertised queues to the `exchange` we created
+Then we sign up several consumers using the advertised queues to the `exchange` we created:

```python linenums="13" hl_lines="1 6 11"
@broker.subscriber(queue_1, exch)
@@ -92,39 +91,39 @@ async def base_handler3(logger: Logger):

!!! note
`handler1` and `handler2` are subscribed to the same `exchange` using the same queue:
-    within a single service, this does not make a sense, since messages will come to these handlers in turn.
+    within a single service, this does not make sense, since messages will come to these handlers in turn.
Here we emulate the work of several consumers and load balancing between them.

-### Message distribution
+### Message Distribution

-Now the distribution of messages between these consumers will look like this:
+Now, the distribution of messages between these consumers will look like this:

```python linenums="30"
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1
```

-Message `1` will be sent to `handler1` because it listens to `exchange` using a queue with the routing key `test-q-1`
+Message `1` will be sent to `handler1` because it listens to the `exchange` using a queue with the routing key `test-q-1`.

---

```python linenums="31"
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 2
```

-Message `2` will be sent to `handler2` because it listens to `exchange` using the same queue, but `handler1` is busy
+Message `2` will be sent to `handler2` because it listens to the `exchange` using the same queue, but `handler1` is busy.

---

```python linenums="32"
await broker.publish(queue="test-q-1", exchange=exch) # handlers: 1
```

-Message `3` will be sent to `handler1` again, because it is currently free
+Message `3` will be sent to `handler1` again because it is currently free.

---

```python linenums="33"
await broker.publish(queue="test-q-2", exchange=exch) # handlers: 3
```

-Message `4` will be sent to `handler3`, because it is the only one listening to `exchange` using a queue with the routing key `test-q-2`
+Message `4` will be sent to `handler3` because it is the only one listening to the `exchange` using a queue with the routing key `test-q-2`.
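Pieced together from the fragments above, the whole direct-exchange example might look like this sketch (the `send_messages` startup hook is an assumption; the other names follow the prose):

```python
from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue

broker = RabbitBroker()
app = FastStream(broker)

exch = RabbitExchange("exchange", auto_delete=True)
queue_1 = RabbitQueue("test-q-1", auto_delete=True)
queue_2 = RabbitQueue("test-q-2", auto_delete=True)


@broker.subscriber(queue_1, exch)
async def base_handler1(logger: Logger):
    logger.info("base_handler1")


@broker.subscriber(queue_1, exch)  # same queue: round-robin with handler1
async def base_handler2(logger: Logger):
    logger.info("base_handler2")


@broker.subscriber(queue_2, exch)
async def base_handler3(logger: Logger):
    logger.info("base_handler3")


@app.after_startup
async def send_messages():
    await broker.publish(queue="test-q-1", exchange=exch)  # -> handler 1
    await broker.publish(queue="test-q-1", exchange=exch)  # -> handler 2
    await broker.publish(queue="test-q-1", exchange=exch)  # -> handler 1
    await broker.publish(queue="test-q-2", exchange=exch)  # -> handler 3
```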