Consume

The Consumer is responsible for reading messages from data streams in a Fluvio or a Kafka deployment. The messages are written to the topic/partitions by the Producers.

Consume Messages 

Consume command can operate in two modes:

  • read-one,
  • read-continuously.

Consume CLI command has the following operations:

fluvio consume [FLAGS] [OPTIONS] --partition <integer> --topic <string>

FLAGS:
    -g, --from-beginning      Start reading from this offset
    -C, --continuous          Read messages in a infinite loop
    -s, --suppress-unknown    Suppress items items that have an unknown output type

OPTIONS:
    -t, --topic <string>         Topic name
    -p, --partition <integer>    Partition id
    -b, --maxbytes <integer>     Maximum number of bytes to be retrieved
    -c, --sc <host:port>         Address of Streaming Controller
    -u, --spu <host:port>        Address of Streaming Processing Unit
    -k, --kf <host:port>         Address of Kafka Controller
    -P, --profile <profile>      Profile name
    -O, --output <type>          Output [possible values: dynamic, text, binary, json, raw]

The flags and options are defined as follows:

  • --from-beginning: is a flag that instructs the SPU to read from the beginning of the topic/partition. This is an optional flag; if blank the CLI will wait for the Producer to write to the data stream.

  • --continuous: is a flag that instructs the CLI to read from the data stream in an infinite loop. Press Ctrl-C, or send SIGINT, to exit loop.

  • --suppress-unknown: is a flag that instructs the CLI to skip messages that were not parsed correctly. Suppress-unknown is used with data streams that contain messages with mixed types where some messages cannot be successfully parsed. This is an optional flag.

  • --topic <string>: is the name of the topic from which to read the messages. The topic is a mandatory option and it is used in combination with --partition to uniquely identify a data stream.

  • --partition <integer>: is the partition index of a topic from which to read the messages. The partition is a mandatory option and it is used in combination with --topic to uniquely identify a data stream.

  • --maxbytes <integer>: is the maximum number of bytes of a message retrieved. The maxbytes field is optional.

  • --sc <host:port>: is the public interface of the Streaming Controller. The SC is optional and mutually exclusive with --spu and --kf. The SC is used in combination with CLI Profiles to compute a target service.

  • --spu <host:port>: is the public interface of the Streaming Processing Unit. The SPU is optional and mutually exclusive with --sc and --kf. The SPU is used in combination with CLI Profiles to compute a target service.

  • --kf <host:port>: is the public interface of the Kafka Controller. The KF is optional and mutually exclusive with --sc and --spu. The KF is used in combination with CLI Profiles to compute a target service.

  • --profile <profile>: is the custom-defined profile file. The profile is an optional field used to compute a target service. For additional information, see Target Service section.

  • --output <type>: is the format to be used to display the messages. The output is an optional field and it defaults to dynamic, where the parser will attempt to guess the message type. Known formats are: text, binary, json, and raw.

Consume Messages Examples 

Consume Messages from Fluvio SC

Consume all my-topic messages from the beginning of the Fluvio SC queue:

$ fluvio consume -t my-topic -p 0 --sc `SC`:9003 -g
hello world
test
hello World!
one 
two
three

Consume Messages for Kafka

Consume all kf-topic messages from the beginning of the Kafka queue:

$ fluvio consume -t kf-topic -p 0 --kf 0.0.0.0:9092 -g
Hello World
one
two
three
^C
Related Topics