We are hiring! Apply here
Rust Examples
  • The Rust client is the core client for all language clients.
    • New features arrive in the Rust client before any of the other clients.
  • Full support for the Admin API.
  • This client uses async Rust for all blocking calls.

Refer to the fluvio docs.rs page for full detail.


Example Workflow

Follow the installation instructions to run this example.

Download Cargo.toml

[package]
name = "fluvio-rust-example"
version = "0.0.0"
edition = "2021"
publish = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fluvio = "0.12"
async-std = { version = "1", features = ["attributes"] }
chrono = "0.4"

Download main.rs

use async_std::stream::StreamExt;
use chrono::Local;
use fluvio::metadata::topic::TopicSpec;
use fluvio::{Fluvio, RecordKey};

/// Name of the topic that `main` creates, produces to, and consumes from.
const TOPIC_NAME: &str = "hello-rust";
/// Partition argument passed to `fluvio::consumer` when reading the topic back.
const PARTITION_NUM: i32 = 0;
/// First argument to `TopicSpec::new_computed` (the partition count, going by the name).
const PARTITIONS: i32 = 1;
/// Second argument to `TopicSpec::new_computed` (the replica count, going by the name).
const REPLICAS: i32 = 1;

/// This is an example of a basic Fluvio workflow in Rust
/// 1. Establish a connection to the Fluvio cluster
/// 2. Create a topic to store data in
/// 3. Create a producer and send some bytes
/// 4. Create a consumer, and stream the data back
async fn main() {
   // Connect to Fluvio cluster
   let fluvio = Fluvio::connect().await.unwrap();

   // Create a topic
   let admin = fluvio.admin().await;
   let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
   let _topic_create = admin
       .create(TOPIC_NAME.to_string(), false, topic_spec)

   // Create a record
   let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

   // Produce to a topic
   let producer = fluvio::producer(TOPIC_NAME).await.unwrap();
   producer.send(RecordKey::NULL, record).await.unwrap();

   // Consume last record from topic
   let consumer = fluvio::consumer(TOPIC_NAME, PARTITION_NUM).await.unwrap();
   let mut stream = consumer.stream(fluvio::Offset::from_end(0)).await.unwrap();
   if let Some(Ok(record)) = stream.next().await {
       let string = String::from_utf8_lossy(record.value());
       println!("{}", string);



$ cargo run

Additional Producer options

Alternatively, we can create a producer with custom configuration:


This is how to configure a Producer with a batch_size of 500 bytes, a linger of 500ms, and Gzip compression.

// Build a producer configuration with a batch_size of 500 bytes, a linger of
// 500ms, and Gzip compression, then create a producer that uses it.
// Fully qualified paths are used so the snippet needs no extra `use` lines.
let config = TopicProducerConfigBuilder::default()
    .batch_size(500)
    .linger(std::time::Duration::from_millis(500))
    .compression(fluvio::Compression::Gzip)
    .build()
    .expect("Failed to create topic producer config");
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", config)
    .await
    .expect("Failed to create a producer");

Using a SmartModule with the Rust Consumer

Below is an example of how to use a SmartModule filter with the Rust consumer.

use std::io::Read;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio::{Fluvio, Offset, PartitionConsumer};
use fluvio::consumer::{
use async_std::stream::StreamExt;

let raw_buffer = std::fs::read("/my_projects/example_filter/target/wasm32-unknown-unknown/release/example_filter.wasm").expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
encoder.read_to_end(&mut buffer);

let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
    wasm: SmartModuleInvocationWasm::AdHoc(buffer),
    kind: SmartModuleKind::Filter,
    params: Default::default()
let filter_config = builder.build().expect("Failed to create config");

// create partition consumer
let consumer = fluvio.partition_consumer("my-topic", 0).await.expect("failed to create consumer");
// stream from beginning
let mut stream = consumer.stream_with_config(Offset::beginning(),filter_config).await.expect("Failed to create stream");

while let Some(Ok(record)) = stream.next().await {
    let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value()).to_string();
    println!("Got filter event: key={:?}, value={}", key, value);

Refer to the fluvio docs.rs page for full detail.


Links to Docs: