
This Week in Fluvio #22

Feb 9, 2022

Welcome to This Week in Fluvio, our weekly newsletter for development updates to Fluvio open source. Fluvio is a distributed, programmable streaming platform written in Rust.


New Release - Fluvio v0.9.19


Connector versions

Now when you run fluvio connector list, the output includes the version of each connector.

Example connector config:

# test-connector-config.yaml 
version: 0.1.1
name: my-test-connector
type: test-connector
topic: my-test-connector-topic
direction: source
$ fluvio connector create --config test-connector-config.yaml
$ fluvio connector list
 NAME               VERSION  STATUS 
 my-test-connector  0.1.1    Running 

SmartModule debugging support using WASI

This is for advanced users who are willing to compile Fluvio locally. Please follow the Fluvio Developer guide to get set up for local development.

SmartModule developers can now compile Fluvio with WASI support. This gives SmartModules access to stdout and stderr for debugging purposes.

$ git clone https://github.com/infinyon/fluvio.git

Build the development Fluvio cluster image with WASI support enabled

$ DEBUG_SMARTMODULE=true make build_k8_image

Build the development Fluvio CLI.

$ make build-cli

Start our development Fluvio cluster with WASI support

$ ./target/debug/fluvio cluster start --develop

Here’s our example SmartModule. It is a slight modification of our filter example. For debugging purposes, we print the record to stdout before checking the contents of the record and applying filtering.

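use fluvio_smartmodule::{smartmodule, Record, Result};

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
    // Print every record to SPU logs
    println!("DEBUG: {record:#?}");
    // Interpret the record value as UTF-8 text
    let string = std::str::from_utf8(record.value.as_ref())?;
    // Keep only records containing the letter 'a', as in the basic filter example
    Ok(string.contains('a'))
}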

Before you build the SmartModule, you need to add the wasm32-wasi target with rustup.

$ rustup target add wasm32-wasi

Build the SmartModule with the wasm32-wasi target so that it can run against our WASI-enabled cluster.

$ cargo build --release --target wasm32-wasi

Load the WASI SmartModule into the cluster as wasi-sm

$ ./target/debug/fluvio smart-module create wasi-sm --wasm-file ./target/wasm32-wasi/release/fluvio_wasm_filter.wasm

Create the testing topic twif-22

$ ./target/debug/fluvio topic create twif-22
topic "twif-22" created

For our example producer input, we’ll send 2 records to demonstrate the SmartModule output.

$ ./target/debug/fluvio produce twif-22
> a
> b

When we consume with our WASI SmartModule as the filter, only the first record is printed, which is the expected behavior.

$ ./target/debug/fluvio consume twif-22 --filter wasi-sm
Consuming records from the end of topic 'twif-22'. This will wait for new records

To view our SmartModule debug output, we look at the SPU pod logs in Kubernetes. At the bottom of the log we can verify that the contents of each record were printed.

$ kubectl logs -f fluvio-spg-main-0
2022-02-15T00:45:25.502747Z  INFO accept_incoming{self=FluvioApiServer("")}: fluvio_service::server: Received connection, spawning request handler
DEBUG: Record {
    preamble: RecordHeader {
        attributes: 0,
        timestamp_delta: 0,
        offset_delta: 0,
    },
    key: None,
    value: a,
    headers: 0,
}
DEBUG: Record {
    preamble: RecordHeader {
        attributes: 0,
        timestamp_delta: 0,
        offset_delta: 0,
    },
    key: None,
    value: b,
    headers: 0,
}
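
If you want to sanity-check the filter logic before building for a cluster, it can be tried as plain Rust on your own machine. The sketch below is not the SmartModule API: keep_record is a hypothetical stand-in that takes the raw record bytes, and it assumes the predicate is "the value contains the letter a", which matches the produce and consume session above. It logs every value it sees and then decides whether to keep it, much like the SmartModule does.

```rust
use std::str::Utf8Error;

/// Hypothetical stand-in for the SmartModule body: log the raw value, then
/// keep the record only if its UTF-8 text contains 'a' (assumed predicate).
fn keep_record(value: &[u8]) -> Result<bool, Utf8Error> {
    // Debug output for every record, whether or not it passes the filter.
    eprintln!("DEBUG: {:?}", value);
    let text = std::str::from_utf8(value)?;
    Ok(text.contains('a'))
}

fn main() {
    // The same two inputs we produced to the twif-22 topic.
    for input in ["a", "b"] {
        match keep_record(input.as_bytes()) {
            Ok(true) => println!("{input}: kept"),
            Ok(false) => println!("{input}: filtered out"),
            Err(err) => println!("{input}: invalid UTF-8 ({err})"),
        }
    }
}
```

Both inputs produce a debug line, but only the first passes the filter, which mirrors what we saw in the SPU logs and the consumer.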




We will provide a more hands-on blog post in the future, but for now we’ll summarize the release.


Postgres Source connector

The Postgres source connector allows you to connect to an external Postgres database and implement Change Data Capture (CDC) patterns by recording all database updates into a Fluvio topic.

There is a little bit of required configuration on the Postgres database side, but the Postgres source connector config looks like this:

# example-pg-source-connect.yml
version: 0.1.0
name: my-postgres-source
type: postgres-source
topic: postgres-topic
parameters:
  publication: fluvio
  slot: fluvio
secrets:
  FLUVIO_PG_DATABASE_URL: postgres://postgres:mysecretpassword@localhost:5432/postgres

Postgres Sink connector

The Postgres sink connector consumes the CDC event data from the Postgres source connector and runs the corresponding SQL against the sink connector’s Postgres database.

The Postgres sink connector config looks like this:

# connect.yml
version: 0.1.0
name: my-postgres-sink
type: postgres-sink
topic: postgres-topic
parameters:
  url: postgres://postgres:mysecretpassword@localhost:5432/postgres
secrets:
  FLUVIO_PG_DATABASE_URL: postgres://postgres:mysecretpassword@localhost:5432/postgres
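
To make the source-to-sink flow a bit more concrete, here is a rough, std-only sketch of the idea behind CDC replay. Everything in it is hypothetical: the Change enum, its fields, and the apply and replay helpers are made up for illustration and are not the record format or code the connectors actually use. The replica "table" is just an in-memory map standing in for the sink database.

```rust
use std::collections::BTreeMap;

/// Hypothetical change event; the real connector record format is not shown here.
#[derive(Debug, Clone, PartialEq)]
enum Change {
    Insert { id: i64, name: String },
    Update { id: i64, name: String },
    Delete { id: i64 },
}

/// Apply one change to the replica table (modelled as id -> name).
/// Inserts and updates are both treated as upserts in this sketch.
fn apply(replica: &mut BTreeMap<i64, String>, change: &Change) {
    match change {
        Change::Insert { id, name } | Change::Update { id, name } => {
            replica.insert(*id, name.clone());
        }
        Change::Delete { id } => {
            replica.remove(id);
        }
    }
}

/// Replay events in topic order to rebuild the table state.
fn replay(events: &[Change]) -> BTreeMap<i64, String> {
    let mut replica = BTreeMap::new();
    for change in events {
        apply(&mut replica, change);
    }
    replica
}

fn main() {
    // Events in the order the source side would have written them to the topic.
    let topic = vec![
        Change::Insert { id: 1, name: "alice".to_string() },
        Change::Insert { id: 2, name: "bob".to_string() },
        Change::Update { id: 1, name: "alicia".to_string() },
        Change::Delete { id: 2 },
    ];
    let replica = replay(&topic);
    println!("{replica:?}");
}
```

The point of the sketch is ordering: because the topic preserves the order of changes, replaying them one by one on the sink side converges to the same state as the source.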

Postgres Connector Docs available now

A lot of work went into the release of our new Postgres connectors that we couldn’t cover in depth here.

We encourage you to visit the docs, and expect a walkthrough using the Source and Sink connectors together in the future.



Our HTTP source connector has two new options, output_type and output_parts, for formatting its output.

Example HTTP connector config

# connect.yml
version: 0.2.0
name: cat-facts
type: http
topic: cat-facts
direction: source
parameters:
  endpoint: https://catfact.ninja/fact
  interval: 10
  output_parts: body # default
  output_type: text  # default

For example, our endpoint returns a JSON object in the body of the HTTP response.

$ fluvio consume cat-facts
Consuming records from the end of topic 'cat-facts'. This will wait for new records
{"fact":"In 1987 cats overtook dogs as the number one pet in America.","length":60}

If you want the full HTTP response, you can set output_parts: full.

# connect.yml
- output_parts: body # default
+ output_parts: full
  output_type: text  # default
$ fluvio consume cat-facts
Consuming records from the end of topic 'cat-facts'. This will wait for new records
HTTP/1.1 200 OK
server: nginx
date: Wed, 16 Feb 2022 00:53:04 GMT
content-type: application/json
transfer-encoding: chunked
connection: keep-alive
vary: Accept-Encoding
cache-control: no-cache, private
x-ratelimit-limit: 100
x-ratelimit-remaining: 96
access-control-allow-origin: *
set-cookie: XSRF-TOKEN=REDACTED expires=Wed, 16-Feb-2022 02:53:04 GMT; path=/; samesite=lax
set-cookie: cat_facts_session=REDACTED expires=Wed, 16-Feb-2022 02:53:04 GMT; path=/; httponly; samesite=lax
x-frame-options: SAMEORIGIN
x-xss-protection: 1; mode=block
x-content-type-options: nosniff

{"fact":"Cats only use their meows to talk to humans, not each other. The only time they meow to communicate with other felines is when they are kittens to signal to their mother.","length":170}
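
With the full response delivered as text, a consumer has to pull the status line, headers, and body apart by hand. Here is a minimal std-only sketch of what that might look like. ParsedResponse and parse_full_response are hypothetical names for this example, and the sketch assumes the record has the shape shown above: a status line, header lines, a blank line, then the body.

```rust
/// Parts of a raw HTTP response as emitted with `output_parts: full`.
#[derive(Debug, PartialEq)]
struct ParsedResponse {
    status_code: u16,
    headers: Vec<(String, String)>,
    body: String,
}

/// Split a raw response into status code, headers, and body.
/// Returns `None` if the status line is missing or malformed.
fn parse_full_response(raw: &str) -> Option<ParsedResponse> {
    // `lines()` accepts both "\n" and "\r\n" line endings.
    let mut lines = raw.lines();

    // Status line, e.g. "HTTP/1.1 200 OK": the code is the second token.
    let status_code: u16 = lines.next()?.split_whitespace().nth(1)?.parse().ok()?;

    // Header lines run until the first empty line.
    let mut headers = Vec::new();
    for line in lines.by_ref() {
        if line.is_empty() {
            break;
        }
        if let Some((name, value)) = line.split_once(':') {
            headers.push((name.trim().to_ascii_lowercase(), value.trim().to_string()));
        }
    }

    // Everything after the blank line is the body.
    let body = lines.collect::<Vec<_>>().join("\n");

    Some(ParsedResponse { status_code, headers, body })
}

fn main() {
    let raw = "HTTP/1.1 200 OK\ncontent-type: application/json\n\n{\"fact\":\"In 1987 cats overtook dogs as the number one pet in America.\",\"length\":60}";
    let parsed = parse_full_response(raw).expect("well-formed response");
    println!("status: {}", parsed.status_code);
    println!("headers: {:?}", parsed.headers);
    println!("body: {}", parsed.body);
}
```

This works, but it is exactly the kind of hand-rolled parsing that gets brittle once you care about repeated headers such as set-cookie.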

If you plan to process the HTTP response details, it might be more useful to use output_type: json.

# connect.yml
  output_parts: full
- output_type: text  # default
+ output_type: json
$ fluvio consume cat-facts
Consuming records from the end of topic 'cat-facts'. This will wait for new records
{"status":{"version":"HTTP/1.1","code":200,"string":"OK"},"header":{"set-cookie":["XSRF-TOKEN=REDACTED expires=Wed, 16-Feb-2022 02:56:22 GMT; path=/; samesite=lax","cat_facts_session=REDACTED expires=Wed, 16-Feb-2022 02:56:22 GMT; path=/; httponly; samesite=lax"],"content-type":"application/json","x-frame-options":"SAMEORIGIN","x-content-type-options":"nosniff","x-xss-protection":"1; mode=block","vary":"Accept-Encoding","server":"nginx","x-ratelimit-remaining":"94","date":"Wed, 16 Feb 2022 00:56:22 GMT","transfer-encoding":"chunked","cache-control":"no-cache, private","x-ratelimit-limit":"100","access-control-allow-origin":"*","connection":"keep-alive"},"body":"{\"fact\":\"There are more than 500 million domestic cats in the world, with approximately 40 recognized breeds.\",\"length\":100}"}

Updated docs for the HTTP Connector are available.

Get in touch with us on GitHub Discussions or join our Discord channel and come say hello!

For the full list of changes this week, be sure to check out our CHANGELOG.

Until next week!