The fluvio consume command reads the contents of records in a Fluvio topic
from a command-line environment.
A topic may have multiple partitions. Unless told otherwise, the consume command
reads from only one of them, defaulting to partition 0.
Read messages from a topic/partition

Usage: fluvio consume [OPTIONS] <topic>

Arguments:
  <topic>  Topic name

Options:
  -p, --partition <integer>
          Partition id [default: 0]
  -A, --all-partitions
          Consume records from all partitions
  -d, --disable-continuous
          Disable continuous processing of messages
      --disable-progressbar
          Disable the progress bar and wait spinner
  -k, --key-value
          Print records in "[key] value" format, with "[null]" for no key
  -F, --format <FORMAT>
          Provide a template string to print records with a custom format. See --help for details
      --table-format <TABLE_FORMAT>
          Consume records using the formatting rules defined by TableFormat name
  -B, --beginning
          Consume records from the beginning of the log
  -H, --head <integer>
          Consume records starting <integer> from the beginning of the log
  -T, --tail <integer>
          Consume records starting <integer> from the end of the log
      --start <integer>
          The absolute offset of the first record to begin consuming from
      --end <integer>
          Consume records until end offset (inclusive)
  -b, --maxbytes <integer>
          Maximum number of bytes to be retrieved
      --suppress-unknown
          Suppress items that have an unknown output type
  -O, --output <type>
          Output [possible values: dynamic, text, binary, json, raw, table, full-table]
      --smartmodule <SMARTMODULE>
          Name of the smart module
      --smartmodule-path <SMARTMODULE_PATH>
          Path to the smart module
      --aggregate-initial <AGGREGATE_INITIAL>
          (Optional) Path to a file to use as an initial accumulator value with --aggregate
  -e, --params <PARAMS>
          (Optional) Extra input parameters passed to the smartmodule module. They should be passed using key=value format Eg. fluvio consume topic-name --filter filter.wasm -e foo=bar -e key=value -e one=1
      --isolation <ISOLATION>
          Isolation level that consumer must respect. Supported values: read_committed (ReadCommitted) - consume only committed records, read_uncommitted (ReadUncommitted) - consume all records accepted by leader
      --transforms-file <TRANSFORMS_FILE>
          (Optional) Path to a file with transformation specification
  -t, --transform <TRANSFORM>
          (Optional) Transformation specification as JSON formatted string. E.g. fluvio consume topic-name --transform='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'
  -h, --help
          Print help information (use `--help` for more detail)
The fluvio consume examples below come after the fluvio produce examples.
When consuming, we need to specify a starting offset from which to begin reading.
We can use the --beginning (-B) flag to read everything from the very
beginning. Here we’ll also use the --disable-continuous (-d) flag to exit
after all the records have been read:
$ fluvio consume my-topic -B -d
This is my first record ever
This is my second record ever
Alice In Wonderland
Bruce Wayne
Santa Claus
Notice that all the records are printed by value only: the records with keys have not had their keys printed! This is the default behavior of the consumer. To see how to print the keys of key/value records, see the next example!
If we want to see both the keys and values of the records in the topic, we can use
the --key-value flag:
$ fluvio consume my-topic -dB --key-value
[null] This is my first record ever
[null] This is my second record ever
[alice] Alice In Wonderland
[batman] Bruce Wayne
[santa] Santa Claus
Records that were not given a key are printed with [null].
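Because the "[key] value" layout is regular, it is easy to post-process with standard tools. The following is a minimal sketch, not part of Fluvio itself: it uses the output shown above as a stand-in for a live consumer, and the helper names `sample_output` and `keyed_records` are hypothetical. It assumes keys never contain a "]" character.

```shell
#!/bin/sh
# Stand-in for the output of `fluvio consume my-topic -dB --key-value`,
# copied from the example above so the sketch runs without a cluster.
sample_output() {
cat <<'EOF'
[null] This is my first record ever
[null] This is my second record ever
[alice] Alice In Wonderland
[batman] Bruce Wayne
[santa] Santa Claus
EOF
}

# Print the key of every record that has one.
# Assumption: keys contain no "]" character.
# Caveat: a record whose real key is the literal text "null" is
# indistinguishable from a keyless record here and is dropped too.
keyed_records() {
  sed -n 's/^\[\([^]]*\)] .*/\1/p' | grep -v '^null$'
}

sample_output | keyed_records
```

With a running cluster, the same helper could read from the real command instead, for example `fluvio consume my-topic -dB --key-value | keyed_records`.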
Fluvio SmartModules are WASM modules that can edit the contents of a stream
inline, before the records of that stream are delivered to a consumer. In order
to use SmartModules, you must supply the WASM module to the fluvio consume
command using one of the SmartModule options: --filter, --map, --aggregate,
--filter-map, or --array-map.
With Fluvio CLI 0.9.32 or later, you can instead use the --smartmodule flag to
specify the SmartModule name.
This defers the resolution of the SmartModule type to the SPU.
The simplest SmartModule is the filter example, which
filters records from the stream based on whether they contain the letter a.
You can find the full example code in our GitHub repo and compile
it to test it out yourself.
Once you have compiled your SmartModule Filter and have a .wasm file for it, you
can apply it to the consumer as follows:
$ fluvio consume my-topic -B --filter="fluvio_wasm_filter.wasm"
# or with the --smartmodule flag for fluvio 0.9.32 or later
$ fluvio consume my-topic -B --smartmodule="fluvio_wasm_filter.wasm"
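To get a feel for what such a filter keeps, the rule can be approximated locally with grep. This is only a sketch under stated assumptions: it does not run the SmartModule, it assumes the match is on a lowercase "a", and it assumes my-topic holds the five records from the earlier examples. The helper names `sample_records` and `contains_a` are hypothetical.

```shell
#!/bin/sh
# Sample input: the five record values shown in the earlier my-topic examples.
sample_records() {
cat <<'EOF'
This is my first record ever
This is my second record ever
Alice In Wonderland
Bruce Wayne
Santa Claus
EOF
}

# Local approximation of the filter rule: keep only lines that contain
# a lowercase "a". Assumption: the real filter is also case-sensitive.
contains_a() {
  grep 'a'
}

sample_records | contains_a
```

Under these assumptions, only the last three sample records pass, since the first two contain no lowercase "a". The real filter runs before the records are delivered to the consumer, so the records it drops never appear in the consumer output.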
By default, the Fluvio CLI Consumer reads records from a single
partition at a time. When running fluvio consume topic-name, the CLI reads
records from partition 0 unless told otherwise. Let’s look at how we can read
records from the different partitions in a topic by using the --partition (-p)
flag.
Start out by creating a new topic with multiple partitions using fluvio topic create.
$ fluvio topic create consume-multi -p 3
Let’s create a text file with some records we would like to send. Each line of the text file will be treated as one record.
# Put the following records into a text file using your favorite editor
$ cat records.txt
one
two
three
four
five
six
seven
eight
nine
Then, produce the test data to the topic.
$ fluvio produce "consume-multi" -f records.txt
After producing some data, let’s take a look at how the records got distributed
among our partitions using fluvio partition list.
$ fluvio partition list
TOPIC          PARTITION  LEADER  REPLICAS  RESOLUTION  HW  LEO  LSR  FOLLOWER OFFSETS
consume-multi  0          5001    []        Online      3   3    0    []
consume-multi  1          5001    []        Online      3   3    0    []
consume-multi  2          5001    []        Online      3   3    0    []
We can see by the high watermark (HW) and log-end-offset (LEO) that 3 records were sent to each partition. Let’s look at how to consume from each partition.
To consume from a specific partition, use the --partition (-p) flag on fluvio consume.
$ fluvio consume "consume-multi" -B --partition 0
one
four
seven
To consume from partition 1:
$ fluvio consume "consume-multi" -B --partition 1
two
five
eight
And from partition 2:
$ fluvio consume "consume-multi" -B --partition 2
three
six
nine
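The per-partition output above is consistent with the keyless records having been spread across the three partitions in round-robin order. The sketch below models that distribution locally with awk. It is an illustration of the pattern seen in this example, not a statement of how Fluvio assigns partitions in general, and the helper name `assign_partitions` is hypothetical.

```shell
#!/bin/sh
# Model of a round-robin assignment: record number i (counting from 0)
# goes to partition i mod N. Reads one record per line on stdin.
# $1 = number of partitions (N).
assign_partitions() {
  awk -v n="$1" '{ printf "%d %s\n", (NR - 1) % n, $0 }'
}

# The same nine records as records.txt, each printed with its modeled partition.
printf '%s\n' one two three four five six seven eight nine | assign_partitions 3
```

Each output line is the modeled partition id followed by the record. Grouping the lines by partition id reproduces the three listings above: one, four, seven for partition 0, then two, five, eight, then three, six, nine.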
At times, it is useful to see all records from all partitions from a single consumer. Using the example above:
$ fluvio partition list
TOPIC          PARTITION  LEADER  REPLICAS  RESOLUTION  HW  LEO  LSR  FOLLOWER OFFSETS
consume-multi  0          5001    []        Online      3   3    0    []
consume-multi  1          5001    []        Online      3   3    0    []
consume-multi  2          5001    []        Online      3   3    0    []
Each partition has 3 records. Now let’s consume from all partitions:
$ fluvio consume "consume-multi" -B -A
one
four
seven
two
three
five
six
eight
nine
Sometimes, the default Consumer printout might not work for your needs. As of Fluvio 0.9.6
you can use the --format option with a template string to describe how the Consumer
should print your records.
The format string replaces placeholders such as {{key}}, {{value}},
{{partition}} (added in Fluvio 0.9.9), {{offset}}, and {{time}} (added in Fluvio 0.9.25)
with the actual contents of each record. One possible use for this is formatting each record
as a CSV row:
$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}"
2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever
2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever
2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland
2022-05-04T15:52:28.875Z,0,3,batman,Bruce Wayne
2022-05-04T15:53:37.099Z,0,4,santa,Santa Claus
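Once records are printed as CSV rows, they can be sliced with ordinary text tools. The following is a minimal sketch that uses the rows above as sample input. The helper names `sample_csv` and `rows_in_offset_range` are hypothetical, and the naive comma split assumes that neither keys nor values contain commas.

```shell
#!/bin/sh
# Stand-in for the CSV output shown above
# (columns: time, partition, offset, key, value).
sample_csv() {
cat <<'EOF'
2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever
2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever
2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland
2022-05-04T15:52:28.875Z,0,3,batman,Bruce Wayne
2022-05-04T15:53:37.099Z,0,4,santa,Santa Claus
EOF
}

# Print "key=value" for rows whose offset lies in [lo, hi], both inclusive.
# $1 = lo, $2 = hi.
# Assumption: no commas inside keys or values, so a plain split on "," is safe.
rows_in_offset_range() {
  awk -F, -v lo="$1" -v hi="$2" '$3 >= lo && $3 <= hi { print $4 "=" $5 }'
}

sample_csv | rows_in_offset_range 2 3
```

For records that may contain commas, a different separator in the format string or a real CSV parser would be a safer choice than this split.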