Rust Examples
  • The Rust client is the core client for all language clients.
    • New features arrive in the Rust client before any of the other clients.
  • Full support for the Admin API.
  • This client uses async Rust for all blocking calls.

Refer to the fluvio docs.rs page for full detail.

 

Example Workflow

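Follow the installation instructions to run this example. The listing below shows the project's Cargo.toml first, followed immediately by the Rust source (for example src/main.rs), which begins at the first `use` line.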

[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
fluvio = "0.21"
use async_std::stream::StreamExt;
use chrono::Local;
use fluvio::metadata::topic::TopicSpec;
use fluvio::{Fluvio, RecordKey};

/// Name of the topic that `main` creates, produces to, and consumes from.
const TOPIC_NAME: &str = "hello-rust";
/// Partition index handed to `fluvio::consumer` when reading the record back.
const PARTITION_NUM: u32 = 0;
/// First argument to `TopicSpec::new_computed`: the partition count for the new topic.
const PARTITIONS: u32 = 1;
/// Second argument to `TopicSpec::new_computed`: the replica count for the new topic.
const REPLICAS: u32 = 1;

/// This is an example of a basic Fluvio workflow in Rust.
///
/// 1. Establish a connection to the Fluvio cluster
/// 2. Create a topic to store data in
/// 3. Create a producer and send some bytes
/// 4. Create a consumer, and stream the data back
///
/// Topic creation is best-effort: a failure is reported on stderr and the
/// example carries on. A failed or missing record on the consumer stream is
/// likewise reported instead of being silently ignored.
///
/// # Panics
///
/// Panics if connecting to the cluster, creating the producer, sending,
/// flushing, creating the consumer, or opening the stream fails.
#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.unwrap();

    // Create a topic
    let admin = fluvio.admin().await;
    let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
    // Do not abort on failure (presumably the topic may already exist from an
    // earlier run), but surface the reason rather than discarding it.
    if let Err(err) = admin
        .create(TOPIC_NAME.to_string(), false, topic_spec)
        .await
    {
        eprintln!("Topic \"{}\" was not created: {:?}", TOPIC_NAME, err);
    }

    // Create a record
    let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

    // Produce to a topic
    let producer = fluvio::producer(TOPIC_NAME).await.unwrap();
    producer.send(RecordKey::NULL, record).await.unwrap();
    // Fluvio batches outgoing records by default, so flush producer to ensure all records are sent
    producer.flush().await.unwrap();

    // Consume last record from topic
    let consumer = fluvio::consumer(TOPIC_NAME, PARTITION_NUM).await.unwrap();
    let mut stream = consumer.stream(fluvio::Offset::from_end(1)).await.unwrap();
    match stream.next().await {
        Some(Ok(record)) => {
            let string = String::from_utf8_lossy(record.value());
            println!("{}", string);
        }
        // Report the failure so an empty output is not mistaken for success.
        Some(Err(err)) => eprintln!("Failed to read record: {:?}", err),
        None => eprintln!("Stream ended before a record was received"),
    }
}
 

Run

$ cargo run
 

Additional Producer options

Alternatively, we can create a producer with custom configuration:

Example:

This is how to configure a Producer with a batch_size of 500 bytes, a linger of 500 ms, and Gzip compression.

// Producer settings: 500-byte batch size, 500 ms linger, Gzip compression.
let linger = std::time::Duration::from_millis(500);
let producer_config = TopicProducerConfigBuilder::default()
    .batch_size(500)
    .linger(linger)
    .compression(Compression::Gzip)
    .build()
    .expect("Failed to create topic producer config");

// Build the producer for the topic using the custom configuration above.
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", producer_config)
    .await
    .expect("Failed to create a producer");
 

Using a SmartModule with the Rust Consumer

Below is an example of how to use a SmartModule filter with the Rust consumer.

use std::io::Read;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio::{Fluvio, Offset, PartitionConsumer};
use fluvio::consumer::{
    SmartModuleInvocation,
    SmartModuleInvocationWasm,
    SmartModuleKind,
    ConsumerConfig
};
use async_std::stream::StreamExt;

// Load the compiled SmartModule and gzip-compress it into `buffer`.
let raw_buffer = std::fs::read("/my_projects/example_filter/target/wasm32-unknown-unknown/release/example_filter.wasm").expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
// `read_to_end` returns an `io::Result`; check it so a failed compression
// cannot silently hand a truncated module to the consumer.
encoder
    .read_to_end(&mut buffer)
    .expect("failed to gzip-compress the wasm file");

// Attach the compressed module to the consumer configuration as an ad-hoc filter.
let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
    wasm: SmartModuleInvocationWasm::AdHoc(buffer),
    kind: SmartModuleKind::Filter,
    params: Default::default()
}));
let filter_config = builder.build().expect("Failed to create config");

// create partition consumer
let consumer = fluvio.partition_consumer("my-topic", 0).await.expect("failed to create consumer");
// stream from beginning
let mut stream = consumer.stream_with_config(Offset::beginning(), filter_config).await.expect("Failed to create stream");

while let Some(next) = stream.next().await {
    match next {
        Ok(record) => {
            let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
            let value = String::from_utf8_lossy(record.value()).to_string();
            println!("Got filter event: key={:?}, value={}", key, value);
        }
        // Stop on the first stream error, as before, but say why.
        Err(err) => {
            eprintln!("Stream error, stopping consumer: {:?}", err);
            break;
        }
    }
}

Refer to the fluvio docs.rs page for full detail.

 

Links to Docs: