Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add example for Rust API #330

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/_embeds/apis/rust/_example-cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"

[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
flate2 = "1.0.35"
fluvio = "0.24"
1 change: 1 addition & 0 deletions docs/_embeds/apis/rust/_fluvio-crate-version.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fluvio = "0.24"
35 changes: 35 additions & 0 deletions docs/_embeds/apis/rust/consumer-auto-offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-auto";

#[async_std::main]
ajhunyady marked this conversation as resolved.
Show resolved Hide resolved
async fn main() {
ajhunyady marked this conversation as resolved.
Show resolved Hide resolved
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Auto)
.build()
.expect("Failed to build consumer config");


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
36 changes: 36 additions & 0 deletions docs/_embeds/apis/rust/consumer-manual-offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-manual";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Manual)
.build()
.expect("Failed to build consumer config");


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
stream.offset_commit().expect("offset commit failed");
stream.offset_flush().await.expect("offset flush failed");
}
}
28 changes: 28 additions & 0 deletions docs/_embeds/apis/rust/consumer-simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use async_std::stream::StreamExt;

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::from_end(1))
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
69 changes: 69 additions & 0 deletions docs/_embeds/apis/rust/consumer-wasm-file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::io::Read;
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use flate2::{bufread::GzEncoder, Compression};

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_file(
SmartModuleKind::Map,
"regex_text.wasm",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation from a wasm file
fn build_smartmodule_from_file(
ajhunyady marked this conversation as resolved.
Show resolved Hide resolved
kind: SmartModuleKind,
file_path: &str,
spec: &str
) -> SmartModuleInvocation {
// Read smartmodule wasm file
let raw_buffer = std::fs::read(file_path).expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
encoder.read_to_end(&mut buffer).expect("failed to read encoded wasm file");

// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(buffer),
kind: kind,
params: params,
}
}
61 changes: 61 additions & 0 deletions docs/_embeds/apis/rust/consumer-wasm-name.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::BTreeMap;
use async_std::stream::StreamExt;

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_name(
SmartModuleKind::Map,
"fluvio/[email protected]",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation using smartmodule name
fn build_smartmodule_from_name(
kind: SmartModuleKind,
smartmodule_name: &str,
spec: &str
) -> SmartModuleInvocation {
// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined(smartmodule_name.to_string()),
kind: kind,
params: params,
}
}
25 changes: 25 additions & 0 deletions docs/_embeds/apis/rust/create-list-topics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use fluvio::metadata::topic::TopicSpec;
use fluvio::Fluvio;

const TOPIC_NAME: &str = "hello-rust";
const PARTITIONS: u32 = 1;
const REPLICAS: u32 = 1;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Create a topic
let admin = fluvio.admin().await;
let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
let _topic_create = admin
.create(TOPIC_NAME.to_string(), false, topic_spec)
.await;

// List topics
let topics = admin.all::<TopicSpec>().await.expect("Failed to list topics");
let topic_names = topics.iter().map(|topic| topic.name.clone()).collect::<Vec<String>>();

println!("Topics:\n - {}", topic_names.join("\n - "));
}
15 changes: 15 additions & 0 deletions docs/_embeds/apis/rust/producer-kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Create key and value
let key = "Hello";
let value = "Fluvio";

// create producer & send key/value
let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
producer.send(key, value).await.expect("Failed to send record");
producer.flush().await.expect("Failed to flush");

println!("Sent [{}] {}", key, value);
}
30 changes: 30 additions & 0 deletions docs/_embeds/apis/rust/producer-performance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::time::Duration;

use fluvio::{Fluvio, TopicProducerConfigBuilder, Compression, RecordKey};

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Use config builder to create a topic producer config
let producer_config = TopicProducerConfigBuilder::default()
.batch_size(500)
.linger(Duration::from_millis(500))
ajhunyady marked this conversation as resolved.
Show resolved Hide resolved
.compression(Compression::Gzip)
.build()
.expect("Failed to create topic producer config");

// Connet to fluvio cluster & create a producer
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
let producer = fluvio.topic_producer_with_config(TOPIC_NAME, producer_config)
.await.expect("Failed to create a producer");

// Send 10 records
for i in 1..=10 {
let record = format!("Record-{}", i);
producer.send(RecordKey::NULL, record.as_str()).await.expect("Failed to send record");
producer.flush().await.expect("Failed to flush");
}

println!("Sent 10 records successfully.");
}
21 changes: 21 additions & 0 deletions docs/_embeds/apis/rust/producer-simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use chrono::Local;

use fluvio::RecordKey;

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Create a record
let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

// Produce to a topic
let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
producer.send(RecordKey::NULL, record.clone()).await.expect("Failed to send record");

// Fluvio batches outgoing records by default,
// call flush to ensure the record is sent
producer.flush().await.expect("Failed to flush");

println!("Sent record: {}", record);
}
Loading
Loading