-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: port core concepts as-is (#17)
- Loading branch information
1 parent
5f42014
commit 33920a1
Showing
12 changed files
with
673 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"label": "Concepts", | ||
"position": 4, | ||
"link": { | ||
"type": "generated-index" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
--- | ||
sidebar_position: 6 | ||
slug: /concepts/batching | ||
title: "Batching" | ||
--- | ||
|
||
Fluvio producers try to send records in batches to reduce the number of messages sent and improve throughput. Each producer has some configurations that can be set to improve performance for a specific use case. For instance, they can be used to reduce disk usage, reduce latency, or improve throughput. | ||
As of today, batching behavior in Fluvio Producers can be modified with the following configurations: | ||
|
||
- `batch_size`: Indicates the maximum amount of bytes that can be accumulated in a batch. | ||
- `linger`: Time to wait before sending messages to the server. Defaults to 100 ms. | ||
- `compression`: Compression algorithm used by the producer to compress each batch before sending it to the SPU. Supported compression algorithms are none, gzip, snappy and lz4. | ||
|
||
In general, each one of these configurations has a benefit and a potential drawback. For instance, with the compression algorithm, it is a trade-off between disk usage in the server and CPU usage in the producer and the consumer for compression and decompression. Typically, the compression ratio is improved when the payload is large, therefore a larger `batch_size` could be used to improve the compression ratio. A `linger` equals `0` means that each record is sent as soon as possible. A `linger` time larger than zero introduces latency but improves throughput. | ||
|
||
The ideal parameters for the `batch_size`, `linger` and `compression` depend on your application needs. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
--- | ||
sidebar_position: 7 | ||
slug: /concepts/data-consistency | ||
title: "Data Consistency" | ||
--- | ||
|
||
Data in this context is a set of records that producers send to the partition leader. The leader is responsible for receiving | ||
messages from the producer, sending messages to the consumer, and replicating them to followers. There can be only **one leader | ||
per partition** at any point in time. Depending on the cluster and topic configurations, there can be many or zero followers. | ||
|
||
Messages get the **order** that the leader observes, and **the reordering is proscribed**. Records order inside the message | ||
is kept and cannot be changed. Each record gets assigned to a **unique monotonically increased** number called **offset**. | ||
|
||
After a record gets accepted by the leader, it can be in one of two states: COMMITTED or UNCOMMITTED. COMMITTED denotes | ||
that all followers have acknowledged the record. | ||
If there are no followers in the partition, the state is always COMMITTED once | ||
a record gets acknowledged. Records are UNCOMMITTED in all other cases. | ||
See more details about [Synchronization Algorithm]. | ||
|
||
Neither the leader nor the follower waits for **data persistence** (fsync) before sending an acknowledgment of the record. It means that | ||
**uncommitted** records may be lost if Leader crashes. | ||
|
||
The leader does not uphold an **atomicity guarantee** for the entire message. Records are processed one by one. If an error occurs, | ||
the operation aborts, response with an error message is returned, but Fluvio does not roll back previous records from the batch. | ||
|
||
What records state to use is a configurable option for both producers and consumers. | ||
|
||
## Producer Isolation | ||
Isolation is a configuration parameter of Producer that has two values: | ||
|
||
1. `ReadCommitted` - Leader waits for records to get committed before sending acknowledgement to Producer. | ||
```bash | ||
$ fluvio produce greetings --isolation read_committed | ||
``` | ||
<!-- TODO recommend change read-commited to ack_committed --> | ||
|
||
2. `ReadUncommitted` - Leader does not wait for records to get committed before sending acknowledgement to Producer. | ||
```bash | ||
$ fluvio produce greetings --isolation read_uncommitted | ||
``` | ||
|
||
`ReadUncommitted` isolation gives **lower latency** but has **weaker guarantees**. | ||
|
||
If not specified, `ReadUncommitted` isolation is used by default. | ||
|
||
-> Producer Isolation determines when a successful delivery has been made for **at-least-once** delivery semantic. [Read details]. | ||
|
||
## Consumer Isolation | ||
Isolation is a configuration parameter of Consumer that has two values: | ||
|
||
1. `ReadCommitted` - Read COMMITTED records only. Leader doesn't send UNCOMMITTED records to Consumer. | ||
```bash | ||
$ fluvio consume greetings --isolation read_committed | ||
``` | ||
|
||
2. `ReadUncommitted` - Read all records regardless of the state. | ||
```bash | ||
$ fluvio consume greetings --isolation read_uncommitted | ||
``` | ||
|
||
If not specified, `ReadUncommitted` isolation is used by default. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
--- | ||
sidebar_position: 8 | ||
slug: /concepts/delivery-semantics | ||
title: "Delivery Semantics" | ||
--- | ||
|
||
The Internet, as well as other networks, is considered an unreliable communication channel. There can be delays or lost messages, and connections can fail unexpectedly. | ||
This affects the reliability of record delivery between producers and the SPU. | ||
|
||
Fluvio producers can be configued with a `delivery_semantic` configuration option, which allows choosing a delivery mechanism. Each mechanism has | ||
a different trade-off between reliability and performance. There are two delivery semantics currently supported by producers: | ||
`at-most-once` and `at-least-once` (default). | ||
|
||
|
||
Regardless of `delivery_semantic`, Fluvio batches outgoing records so it is necessary to flush the producer once all records have been sent to ensure proper delivery. | ||
|
||
%copy% | ||
```rust | ||
let fluvio = Fluvio::connect().await?; | ||
let config = TopicProducerConfigBuilder::default() | ||
.delivery_semantic(DeliverySemantic::AtMostOnce) | ||
.build()?; | ||
let producer = fluvio.topic_producer_with_config("greetings", config).await?; | ||
producer.send("Hello", "Fluvio!").await?; | ||
producer.flush().await?; | ||
``` | ||
|
||
The mechanism for accessing response metadata for produced records is also the same for both `at-most-once` and `at-least-once` delivery semantics. | ||
|
||
|
||
%copy% | ||
```rust | ||
let output = producer.send("Hello", "Fluvio!").await?; | ||
// Provides response metadata for records such as offset. | ||
// This only returns once batch has been sent and ack is received from SPU. | ||
output.wait().await?; | ||
``` | ||
|
||
|
||
|
||
|
||
### At Most Once | ||
`at-most-once` delivery means that for each record generated by a producer, that record is delivered zero or one times. **This means that messages may be lost.** | ||
|
||
The producer sends the message with records to the SPU and **does not | ||
wait** for the response. This delivery method has higher throughput but no | ||
guarantees that the message was delivered. | ||
|
||
|
||
[Producer Isolation] has no effect if this delivery | ||
semantic is used unless the user explicitly waits for the response, as shown in the following snippet: | ||
<!-- TODO the content contained at this link is in contradiction with the content here --> | ||
|
||
%copy% | ||
```rust | ||
let fluvio = Fluvio::connect().await?; | ||
let config = TopicProducerConfigBuilder::default() | ||
.delivery_semantic(DeliverySemantic::AtMostOnce) | ||
.build()?; | ||
let producer = fluvio.topic_producer_with_config("greetings", config).await?; | ||
let output = producer.send("Hello", "Fluvio!").await?; | ||
output.wait().await?; // Producer isolation has no effect unless wait() is called | ||
``` | ||
|
||
|
||
### At Least Once | ||
`at-least-once` delivery means that for each record handed to the producer potentially **multiple attempts** are made | ||
at delivering it, such that at least one succeeds. **This means that messages may be duplicated | ||
but not lost.** | ||
|
||
The producer sends the message with records to the SPU, **waits** for the response and **resends** in case of | ||
transport errors occur. This delivery method has lower throughput comparing to `at-most-once` but better reliability. | ||
|
||
|
||
[Producer Isolation] determines when the SPU will send the response signifying a successful delivery. | ||
|
||
|
||
There are three main parameters that can be configured for `at-least-once` semantic: maximum amount of retries, the retry backoff strategy (fixed, Fibonacci, or exponential), and maximum timeout for all attempts. | ||
|
||
Example: | ||
|
||
%copy% | ||
```rust | ||
let policy = RetryPolicy { | ||
max_retries: 5, | ||
initial_delay: Duration::from_millis(10), | ||
max_delay: Duration::from_sec(2), | ||
timeout: Duration::from_sec(10), | ||
strategy: RetryStrategy::ExponentialBackoff | ||
}; | ||
let config = TopicProducerConfigBuilder::default() | ||
.delivery_semantic(DeliverySemantic::AtLeastOnce(policy)) | ||
.build()?; | ||
let producer = fluvio.topic_producer_with_config("greetings", config).await?; | ||
``` | ||
In the above example, Fluvio Producer retries at most five times; all retries take a maximum of 10 seconds. The delay between retries increases exponentially. | ||
The first delay is 10ms, the second is 100ms, then 1000ms, and all others are 2000ms as it's defined as a maximum allowed delay. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
--- | ||
sidebar_position: 5 | ||
slug: /concepts/offsets | ||
title: "Offsets" | ||
--- | ||
|
||
When a record is assigned an offset, that offset permanently identifies | ||
that record. The offset that identified that record will never be reused for another record. | ||
|
||
In order to begin consuming records, a consumer must specify the topic and | ||
partition to consume from, as well as the offset into the partition where | ||
it should begin reading. | ||
|
||
The offset of a record is its total | ||
position within its parent partition. | ||
|
||
There are multiple ways that | ||
an offset may be derived in more convenient ways: | ||
|
||
- Directly, as an absolute index into the partition, starting from zero | ||
- As a relative distance from the beginning of the partition | ||
- As a relative distance from the end of the partition | ||
|
||
|
||
There is | ||
a difference between an absolute offset and a relative offset | ||
from the beginning of the partition. | ||
|
||
When consumers specify a relative offset, the offset given by the consumer | ||
is used to calculate the actual total offset into the partition. | ||
|
||
When a record is assigned an offset, that offset permanently identifies | ||
that record, but this does not necessarily mean that the record will always be available. | ||
|
||
If a partition has a retention policy | ||
that causes it to begin deleting records from the beginning, then the | ||
relative-from-beginning offset will count forward from the oldest record | ||
that is still available. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
--- | ||
sidebar_position: 3 | ||
slug: /concepts/partitions | ||
title: "Partitions" | ||
--- | ||
|
||
**Partitions** are the unit of parallelism accessed independently by **producers** and **consumers** within a **topic**. | ||
|
||
Each record stored in a partition is | ||
given an offset, starting from zero and monotonically increasing by one for each new | ||
record. | ||
|
||
Once a record is committed to a partition and an offset is assigned to it, the offset in that partition will _always_ refer to that record. Because of this, all records that are sent to a given partition are | ||
guaranteed to remain ordered in the order they were committed. | ||
|
||
|
||
**Partitions** are configuration objects managed by the system. Topics and partitions are linked through a **parent-child** relationship. The partition generation algorithm is described in the [SC Architecture]. | ||
|
||
[SC Architecture]: | ||
|
||
<img src="/img/docs/concepts/partitions/topic-2-partitions.svg" style={{ justify: 'center', maxWidth: '640px' }} /> | ||
|
||
If a topic is deleted, all associated partitions are automatically removed. | ||
|
||
--- | ||
|
||
#### Producing with Multiple Partitions | ||
|
||
When producing records to a Topic that has multiple partitions, there are two cases to | ||
consider when determining the partitioning behavior. These cases are: | ||
|
||
- When producing a record that **has a key**, and | ||
- When producing a record that **has no key** | ||
|
||
##### Key/value records | ||
|
||
When producing records with keys, the producers will use _hash partitioning_, | ||
where the partition number is derived from the hash of the record's key. This | ||
is used to uphold the golden rule of key-based partitioning: | ||
|
||
> **Records with the same key are always sent to the same partition** | ||
The current implementation of key hashing uses the **sip-2-4** hashing algorithm, | ||
but that is subject to change in the future. | ||
|
||
- [Check out the key partitioning example in the CLI reference] | ||
|
||
##### Records with no keys | ||
|
||
When producing records with no keys, producers will simply assign partition numbers | ||
to incoming records in a round-robin fashion, spreading the load evenly across the | ||
available partitions. | ||
|
||
- [Check out the round-robin partitioning example in the CLI reference] | ||
|
||
#### Consuming with Multiple Partitions | ||
|
||
Currently, consumers are limited to reading from one partition at a time. This means | ||
that in order to read all records from a given topic, it may be necessary to instantiate | ||
one consumer per partition in the topic. | ||
|
||
- [Check out the multi-consumer example in the CLI reference] |
Oops, something went wrong.