Fluvio stores the source code for its connectors in the fluvio-connectors repository. When a new connector is released, it is packaged as a Docker image and published on Docker Hub. In addition, the connector catalog lists all available source and sink connectors.
At the moment, Fluvio has two official connectors:
- The test source connector produces a new record every 1000 ms to the topic of your choice. Use this connector to test the infrastructure and as a starting point for your custom connectors.
- The MQTT source connector is an MQTT client: it reads messages from an MQTT server and produces them to a Fluvio topic.
The Fluvio cluster offers a connector command-line interface (CLI) to start, stop, and get the status of a connector. A cluster may run many instances of the same or different connectors simultaneously. Fluvio manages the connector infrastructure through Kubernetes. If you run a local installation of Fluvio, make sure it runs
Fluvio uses configuration files to instantiate connectors. The configuration file has two sections:
- connector definition
- connector properties
The connector definition section has the following parameters:
version: v1
name: unique_identifying_name
type: official_connector_type   # pick from connector catalog
direction: source_or_sink
topic: my_fluvio_topic
create_topic: true              # optional
Connector properties are fields required by the external service.
parameters:
  connector_arg_key1: connector_arg_val1
  connector_arg_key2: connector_arg_val2
For a list of parameters, check connector properties in the connector catalog.
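Because the file is a flat set of definition keys followed by one nested parameters mapping, it can be generated from a script. The sketch below shows one way to do that in Python without a YAML library. `render_connector_config` is a hypothetical helper, not part of Fluvio, and the field values are placeholders; it relies on the fact that JSON scalars are also valid YAML scalars.

```python
import json


def render_connector_config(definition: dict, parameters: dict) -> str:
    """Render the two config sections as YAML text.

    Hypothetical helper, not part of Fluvio. Each scalar is written with
    json.dumps, which yields a valid YAML scalar (quoted string, number,
    or true/false).
    """
    lines = [f"{key}: {json.dumps(value)}" for key, value in definition.items()]
    lines.append("parameters:")
    # Connector properties are nested one level under `parameters`.
    lines += [f"  {key}: {json.dumps(value)}" for key, value in parameters.items()]
    return "\n".join(lines) + "\n"


# Placeholder values, for illustration only.
config_text = render_connector_config(
    {
        "version": "v1",
        "name": "my-connector",
        "type": "test-connector",
        "direction": "source",
        "topic": "my-topic",
        "create_topic": True,
    },
    {"topic": "my-topic"},
)
print(config_text)
```

Writing `config_text` to a file gives a config that can be passed to the create connector command.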
Starting a connector is a two-step process: create the configuration file, then run the create connector command.
Configuration file for an MQTT connector:
version: latest
name: my-test-mqtt
type: mqtt
direction: source
topic: my-mqtt
create_topic: true
parameters:
  mqtt-url: "mqtt.hsl.fi"
  mqtt-topic: "/hfp/v2/journey/#"
  fluvio-topic: my-mqtt
In the future, the Fluvio connector catalog will describe and verify the arguments before connector creation. For now, you must ensure the configuration parameters are accurate:
- version is the version of the connector from the connector catalog.
- name is the unique identifier of the connector.
- type is the connector type as defined in the connector catalog.
- direction defines whether the connector is a sink or a source.
- topic is the Fluvio topic the connector will publish to.
- parameters is the list of parameters as defined by the connector:
  - mqtt-url defines the MQTT server URL.
  - mqtt-topic defines the MQTT topic.
  - timeout controls reconnection logic (optional).
  - qos manages quality of service (optional).
For additional information, check out the MQTT connector on GitHub.
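Since the catalog does not yet verify arguments, a small client-side check can catch obvious mistakes before the connector is created. The following is a minimal sketch, not a Fluvio feature: `find_config_problems` is a hypothetical helper, the config is assumed to be already loaded into a dict, and treating mqtt-url and mqtt-topic as required is an assumption drawn from the fact that only timeout and qos are marked optional above.

```python
REQUIRED_FIELDS = ("version", "name", "type", "direction", "topic")
VALID_DIRECTIONS = ("source", "sink")
# Assumed required for the mqtt connector; timeout and qos are optional.
REQUIRED_MQTT_PARAMETERS = ("mqtt-url", "mqtt-topic")


def find_config_problems(config: dict) -> list:
    """Return a list of human-readable problems; empty means none found."""
    problems = [
        f"missing field: {name}" for name in REQUIRED_FIELDS if name not in config
    ]
    direction = config.get("direction")
    if direction is not None and direction not in VALID_DIRECTIONS:
        problems.append(f"direction must be source or sink, got: {direction}")
    if config.get("type") == "mqtt":
        parameters = config.get("parameters") or {}
        problems += [
            f"missing mqtt parameter: {name}"
            for name in REQUIRED_MQTT_PARAMETERS
            if name not in parameters
        ]
    return problems


# The MQTT example from this page, expressed as a dict.
mqtt_config = {
    "version": "latest",
    "name": "my-test-mqtt",
    "type": "mqtt",
    "direction": "source",
    "topic": "my-mqtt",
    "create_topic": True,
    "parameters": {
        "mqtt-url": "mqtt.hsl.fi",
        "mqtt-topic": "/hfp/v2/journey/#",
        "fluvio-topic": "my-mqtt",
    },
}
print(find_config_problems(mqtt_config))  # prints []
```

The check only covers field presence and the direction value; it cannot tell whether the MQTT server is reachable or the version exists in the catalog.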
Configuration file for a test connector:
version: latest
name: my-test-connector
type: test-connector
topic: my-test-connector
create_topic: true
direction: source
parameters:
  topic: my-test-connector
The connector definition section is described above and not repeated here. The parameters specific to the test connector are:
- timeout is the interval for sending records in milliseconds (default: 1000 milliseconds).
- count is the number of records to produce (defaults to i64 max).
For additional information, check out the test connector on GitHub.
To show a list of available connector commands, run the following command:
$ fluvio cluster connector -h
Use the following CLI command to create a connector:
$ fluvio cluster connector create --config my-connector-config.yaml
If create_topic is configured, a topic is created; if the topic already exists, the create request is ignored. If create_topic is not configured and the topic does not exist, the connector returns an error.
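The topic handling described above can be written out as a small decision function. This is only a model of the documented behavior, not Fluvio's implementation; `resolve_topic` is a hypothetical name, and the case where create_topic is not set but the topic already exists is inferred (the existing topic is used).

```python
def resolve_topic(create_topic: bool, topic_exists: bool) -> str:
    """Model of the documented topic handling (not Fluvio's code)."""
    if topic_exists:
        # An existing topic is reused; create_topic has no further effect.
        return "use existing topic"
    if create_topic:
        return "create topic"
    raise LookupError("topic does not exist and create_topic is not set")


# Walk through all four combinations.
for create_topic in (True, False):
    for topic_exists in (True, False):
        try:
            outcome = resolve_topic(create_topic, topic_exists)
        except LookupError as err:
            outcome = f"error: {err}"
        print(f"create_topic={create_topic} topic_exists={topic_exists} -> {outcome}")
```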
Use the following CLI command to retrieve the status of the connectors:
$ fluvio cluster connector list
-------------
NAME                STATUS
my-test-connector   Running
my-test-mqtt        Running
Use the following CLI command to delete a connector:
$ fluvio cluster connector delete my-test-connector
Deleting a connector does not impact the topic. Hence, the records are available for reading after the connector is deleted. Recreating the same connector will resume publishing to the same topic.
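A delete-and-recreate cycle can be scripted around the CLI. The sketch below assumes the fluvio binary is on the PATH and that my-connector-config.yaml holds the test connector configuration; `connector_command` and `run_connector_command` are hypothetical helper names, and only subcommands shown on this page are used. The demo at the end prints the commands rather than running them.

```python
import shutil
import subprocess


def connector_command(action: str, *args: str) -> list:
    """Build the argv for `fluvio cluster connector <action> ...`."""
    return ["fluvio", "cluster", "connector", action, *args]


def run_connector_command(action: str, *args: str) -> str:
    """Run the command and return its standard output; raises on failure."""
    if shutil.which("fluvio") is None:
        raise FileNotFoundError("fluvio CLI not found on PATH")
    result = subprocess.run(
        connector_command(action, *args),
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout


# Print the delete, recreate, and status commands in order.
for step in (
    connector_command("delete", "my-test-connector"),
    connector_command("create", "--config", "my-connector-config.yaml"),
    connector_command("list"),
):
    print("$ " + " ".join(step))
```

Replacing the print call with `run_connector_command(...)` executes each step against a running cluster.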