Produce
 

fluvio produce

The fluvio produce command sends records to the leader of a partition. Produce records by specifying the destination topic.

Write messages to a topic/partition

Usage: fluvio produce [OPTIONS] <topic>

Arguments:
  <topic>  The name of the Topic to produce to

Options:
  -v, --verbose
          Print progress output when sending records
      --key <KEY>
          Sends key/value records with this value as key
      --key-separator <KEY_SEPARATOR>
          Sends key/value records split on the first instance of the separator
      --raw
          Send all input as one record. Use this when producing binary files
      --compression <COMPRESSION>
          Compression algorithm to use when sending records. Supported values: none, gzip, snappy, zstd and lz4
  -f, --file <FILE>
          Path to a file to produce to the topic. Default: Each line treated as single record unless `--raw` specified. If absent, producer will read stdin
      --linger <LINGER>
          Time to wait before sending Ex: '150ms', '20s'
      --batch-size <BATCH_SIZE>
          Max amount of bytes accumulated before sending
      --isolation <ISOLATION>
          Isolation level that producer must respect. Supported values: read_committed (ReadCommitted) - wait for records to be committed before response, read_uncommitted (ReadUncommitted) - just wait for leader to accept records
      --delivery-semantic <DELIVERY_SEMANTIC>
          Delivery guarantees that producer must respect. Supported values: at_most_once (AtMostOnce) - send records without waiting from response, at_least_once (AtLeastOnce) - send records and retry if error occurred [default: at-least-once]
      --smartmodule <SMARTMODULE>
          Name of the smartmodule
      --smartmodule-path <SMARTMODULE_PATH>
          Path to the smart module
      --aggregate-initial <AGGREGATE_INITIAL>
          (Optional) Value to use as an initial accumulator for aggregate SmartModules
  -e, --params <PARAMS>
          (Optional) Extra input parameters passed to the smartmodule. They should be passed using key=value format Eg. fluvio produce topic-name --smartmodule my_filter -e foo=bar -e key=value -e one=1
      --transforms-file <TRANSFORMS_FILE>
          (Optional) Path to a file with transformation specification
  -t, --transform <TRANSFORM>
          (Optional) Transformation specification as JSON formatted string. E.g. fluvio produce topic-name --transform='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'
  -h, --help
          Print help
 

Examples

 

Produce records from stdin

The quickest way to send a record using the producer is to just type your record into standard input:

$ fluvio produce my-topic
> This is my first record ever
Ok!
> This is my second record ever
Ok!
> ^C

As the message says, each line that you type is sent as a new record to the topic.

The producer prints Ok! after each record to let us know the record was sent successfully.
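
Since the producer reads from standard input, you can also pipe records in from another command. For example:

$ echo "This record came from a pipe" | fluvio produce my-topic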

 

Produce key/value records from stdin

Fluvio supports key/value records out-of-the-box. In a key/value record, the key is used to decide which partition the record is sent to. Let’s try sending some simple key/value records:

$ fluvio produce my-topic --key-separator=":"
> alice:Alice In Wonderland
Ok!
> batman:Bruce Wayne
Ok!
> ^C

So our records are being sent, but how do we know that the producer recognized each key properly? We can use the --verbose (-v) flag to tell the producer to print back the keys and values it recognizes. That way, we can be confident that our records are being sent as intended.

$ fluvio produce my-topic -v --key-separator=":"
> santa:Santa Claus
[santa] Santa Claus
Ok!
> ^C

The producer splits the key from the value and prints it in a [key] value format.
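
If every record in a session should share the same key, you can skip the separator and set the key directly with the --key option. For example:

$ echo "Alice In Wonderland" | fluvio produce my-topic --key alice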

 

Produce using a SmartModule

Fluvio’s SmartModules can be applied to the producer to edit the contents of a stream after the records are sent but before they are committed. One way to supply a WASM SmartModule to the fluvio produce command is with the --smartmodule-path option.

Below is an example of how to apply a Map type SmartModule to transform each record in a stream. This particular SmartModule can be used to capitalize every letter in a string.
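
If you are building the SmartModule from source, it first needs to be compiled to WebAssembly. A minimal sketch, assuming the SmartModule Development Kit (smdk) is installed and you are inside the SmartModule’s project directory:

$ smdk build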

Once compiled, provide the .wasm file to the producer:

$ fluvio produce my-topic --smartmodule-path="fluvio_smartmodule_map.wasm"

To avoid sending the SmartModule binary to the cluster with every producer session, you can ask the cluster to store it for you:

$ fluvio smartmodule create --wasm-file="fluvio_smartmodule_map.wasm" my_map

Then just use the name you provided to apply it:

$ fluvio produce my-topic --smartmodule="my_map"
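
To confirm that the cluster stored it, you can list the registered SmartModules:

$ fluvio smartmodule list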
 

Produce key/value records to multiple partitions

When producing to a topic with multiple partitions, the producer will send all records with the same key to the same partition. Let’s test this out by making a multi-partition topic, then sending some key/value records.

First, we’ll use fluvio topic create to create a topic called multi-keys with 5 partitions:

$ fluvio topic create multi-keys -p 5

Next, let’s create a text file with the records we want to send:

# Put the following records into a text file using your favorite editor
$ cat records.txt
rafael:create account
rafael:validate account
samuel:create account
tabitha:create account
rafael:add item 1234 to cart
tabitha:validate account
samuel:validate account
rafael:add item 2345 to cart
tabitha:add item 9876 to cart
rafael:complete purchase

Then, we’ll send the key/value records from the file, using the --key-separator flag to separate our keys from our values. In this example, the keys are unique usernames.

$ fluvio produce multi-keys --key-separator=":" -f records.txt

Looking at this sample input, we can see that rafael generated 5 events, samuel generated 2 events, and tabitha generated 3 events. When we look at the partitions, we should see the records distributed in groups of 5, 2, and 3. We can use the fluvio partition list command to view the distribution of records in our partitions:

$ fluvio partition list
 TOPIC       PARTITION  LEADER  REPLICAS  RESOLUTION  SIZE   HW  LEO  LRS  FOLLOWER OFFSETS 
 multi-keys  0          0       []        Online      0 B    0   0    0    [] 
 multi-keys  1          0       []        Online      0 B    0   0    0    [] 
 multi-keys  2          0       []        Online      157 B  3   3    0    [] 
 multi-keys  3          0       []        Online      220 B  5   5    0    [] 
 multi-keys  4          0       []        Online      119 B  2   2    0    [] 

By looking at the high watermark (HW) and log-end-offset (LEO) of our partitions, we can see how many records have landed in each partition. As we expected, they were distributed in groups of 5, 3, and 2. Let’s dig a little further: we know that rafael was the key used by the group of 5 records, so we should be able to see those records by using fluvio consume to consume from partition 3.

$ fluvio consume multi-keys -B -p3 --key-value
[rafael] create account
[rafael] validate account
[rafael] add item 1234 to cart
[rafael] add item 2345 to cart
[rafael] complete purchase
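
The remaining keys can be checked the same way. In this run, partition 4 held the group of 2 records, so consuming from it should print samuel’s events (the exact key-to-partition assignment may differ on your cluster):

$ fluvio consume multi-keys -B -p4 --key-value
[samuel] create account
[samuel] validate account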
 

Producing to multiple partitions using Round-Robin

When we produce to a topic with multiple partitions, records that have no key are assigned to partitions in a round-robin fashion. This ensures an even load distribution among the partitions.

To see this in action, let’s create a topic with multiple partitions using fluvio topic create.

$ fluvio topic create multi-no-keys -p 5

Let’s produce some data to our topic. We’ll use the same data as in the previous example, but this time we won’t tell the producer to interpret our input as key/value records (we’ll do this by omitting the --key-separator flag).

# Put the following records into a text file using your favorite editor
$ cat records.txt
rafael:create account
rafael:validate account
samuel:create account
tabitha:create account
rafael:add item 1234 to cart
tabitha:validate account
samuel:validate account
rafael:add item 2345 to cart
tabitha:add item 9876 to cart
rafael:complete purchase

Next, we’ll produce the records into the multi-no-keys stream.

$ fluvio produce multi-no-keys -f records.txt

Since records with no keys use round-robin partitioning, we should expect the records to be evenly distributed among the partitions. This differs from the previous example in that the records are not grouped by any kind of key. Let’s take a look at our partitions using fluvio partition list.

$ fluvio partition list
 TOPIC          PARTITION  LEADER  REPLICAS  RESOLUTION  SIZE   HW  LEO  LRS  FOLLOWER OFFSETS 
 multi-no-keys  0          0       []        Online      120 B  2   2    0    [] 
 multi-no-keys  1          0       []        Online      121 B  2   2    0    [] 
 multi-no-keys  2          0       []        Online      124 B  2   2    0    [] 
 multi-no-keys  3          0       []        Online      126 B  2   2    0    [] 
 multi-no-keys  4          0       []        Online      127 B  2   2    0    [] 

Notice how the high watermark (HW) and log-end-offset (LEO) tell us that there are exactly 2 records in each partition. Our ten records have been evenly distributed!
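
As before, you can spot-check any single partition with fluvio consume. Since no keys were used, each partition simply holds whichever records were assigned to it in round-robin order (your exact distribution may vary):

$ fluvio consume multi-no-keys -B -p0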

 

Producing using a compression algorithm (GZIP)

Fluvio supports several compression algorithms for sending records. Compression generally improves throughput in exchange for some CPU cost to compress and decompress the data.

Let’s try the gzip algorithm in the CLI.

First, we’ll use fluvio topic create to create a topic called compressed and another topic called uncompressed:

$ fluvio topic create compressed
$ fluvio topic create uncompressed

Next, let’s create a text file called records.txt with the following contents:

{"ts":"2020-06-18T10:44:12","started":{"pid":45678}}
{"ts":"2020-06-18T10:44:13","logged_in":{"username":"foo"},"connection":{"addr":"1.2.3.4","port":5678}}
{"ts":"2020-06-18T10:44:15","registered":{"username":"bar","email":"bar@example.com"},"connection":{"addr":"2.3.4.5","port":6789}}
{"ts":"2020-06-18T10:44:16","logged_out":{"username":"foo"},"connection":{"addr":"1.2.3.4","port":5678}}
{"ts":"2020-06-18T10:49:29","logged_in":{"username":"foo"},"connection":{"addr":"1.2.3.4","port":5678}}
{"ts":"2020-06-18T10:50:13","logged_in":{"username":"bar"},"connection":{"addr":"2.3.4.5","port":6789}}
{"ts":"2020-06-18T10:51:13","logged_out":{"username":"bar"},"connection":{"addr":"2.3.4.5","port":6789}}

Next, we’ll produce the records from that file into the compressed stream using the --compression gzip option.

$ fluvio produce compressed -f records.txt --compression gzip

Let’s also produce the same records to the uncompressed stream, using no compression.

$ fluvio produce uncompressed -f records.txt # when no --compression flag is passed, `none` is used as the compression algorithm

Since records are compressed by the producer before they are sent to the SPU, their disk usage on the SPU should be lower than without compression. Let’s take a look at the disk usage of the partitions using fluvio partition list.

$ fluvio partition list
 TOPIC          PARTITION  LEADER  REPLICAS  RESOLUTION  SIZE   HW  LEO  LRS  FOLLOWER OFFSETS 
 compressed     0          0       []        Online      328 B  7   7    0    [] 
 uncompressed   0          0       []        Online      821 B  7   7    0    [] 

Notice how the SIZE field tells us that the compressed topic is using less disk space than the uncompressed topic for the same number of records.

Also note that consuming from topics works the same way for both compressed and uncompressed data.
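
For example, consumers read both topics identically; decompression happens transparently:

$ fluvio consume compressed -B
$ fluvio consume uncompressed -B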