MQTT Source Connector
The MQTT connector reads records from an MQTT topic and writes them to a Fluvio topic. The connector supports the MQTT V3.1.1 and V5 protocols.
Check out the tutorial [MQTT to SQL Pipeline] for a running example.
Configuration
| Option | Default | Type | Description |
|---|---|---|---|
| timeout | 60s | Duration | MQTT broker connect timeout, in seconds and nanoseconds |
| url | - | SecretString | MQTT URL, which includes the scheme, domain, port, and credentials such as username and password |
| topic | - | String | MQTT topic to subscribe to and source events from |
| client_id | UUID V4 | String | MQTT client ID. Using the same client ID in different connectors may close the connection |
| payload_output_type | binary | String | Controls how the output of the payload field is produced |
The url option, of type SecretString, can be set as a raw string value:
url: "mqtt://test.mosquitto.org/"
or, as a reference to a secret with the given name:
url:
  secret:
    name: "URL_SECRET_NAME"
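To make the parts of the url value concrete, the following Python sketch splits an MQTT URL into its scheme, host, port, and credentials. The broker host, user name, and password are made-up placeholders, and the connector's own URL parsing may differ in details.

```python
from urllib.parse import urlsplit


def describe_mqtt_url(url: str) -> dict:
    """Split an MQTT URL into the components named in the url option."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,      # e.g. "mqtt"
        "host": parts.hostname,      # broker domain
        "port": parts.port,          # None when the URL has no explicit port
        "username": parts.username,  # None when no credentials are embedded
        "password": parts.password,
    }


# Hypothetical broker and credentials, for illustration only.
info = describe_mqtt_url("mqtt://alice:s3cret@broker.example.com:1883")
print(info)
```

Because such a URL can embed a password, referencing it through a secret rather than a raw string keeps the credentials out of the config file.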
Record Type Output
A JSON-serialized string with the fields mqtt_topic and payload.
Payload Output Type
| Value | Output | 
|---|---|
| binary | Array of bytes | 
| json | UTF-8 JSON Serialized String | 
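As an illustration of how a consumer might handle the two output types, here is a minimal Python sketch. It assumes that with binary the payload field arrives as a JSON array of byte values (0 to 255), and that with json it arrives as an already-parsed JSON value. The function name decode_payload and the sample records are hypothetical.

```python
import json


def decode_payload(record: str, payload_output_type: str = "binary"):
    """Return (mqtt_topic, payload) from a record written by the connector.

    Assumption: "binary" payloads are serialized as a JSON array of ints,
    while "json" payloads are embedded as regular JSON values.
    """
    obj = json.loads(record)
    payload = obj["payload"]
    if payload_output_type == "binary":
        # Turn the list of byte values back into a bytes object.
        payload = bytes(payload)
    return obj["mqtt_topic"], payload


# Hypothetical sample records for each output type.
binary_record = '{"mqtt_topic": "mqtt-to-fluvio", "payload": [104, 105]}'
json_record = '{"mqtt_topic": "mqtt-to-fluvio", "payload": {"device_id": 1}}'

topic_b, payload_b = decode_payload(binary_record, "binary")
topic_j, payload_j = decode_payload(json_record, "json")
print(topic_b, payload_b)
print(topic_j, payload_j)
```

The output type is passed explicitly because a JSON payload could itself be an array of numbers, so the record alone does not reliably reveal which mode produced it.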
Usage Example
All versions are marked with x.y.z. To find the latest version, run:
- fluvio hub connector list
- fluvio hub smartmodule list
This is an example of a connector config file:
# sample-config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "mqtt-to-fluvio"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
Run the connector locally using the cdk tool (from the root directory or any sub-directory):
cdk hub download infinyon/mqtt-source@x.y.z
cdk deploy start --config sample-config.yaml --ipkg infinyon/mqtt-source@x.y.z
cdk deploy list # to see the status
cdk deploy log my-mqtt-connector # to see connector's logs
Install an MQTT client, such as Mosquitto:
# for mac, this takes a while...
brew install mosquitto
Insert records:
mosquitto_pub -h test.mosquitto.org -t mqtt-to-fluvio -m '{"device": {"device_id":1, "name":"device1"}}'
The record produced in the Fluvio topic will be:
{
  "mqtt_topic": "mqtt-to-fluvio",
  "payload": {
    "device": {
      "device_id": 1,
      "name": "device1"
    }
  }
}
Transformations
The Fluvio MQTT Source Connector supports transformations. Records can be modified before they are sent to the Fluvio topic.
The previous example can be extended to add extra transformations to outgoing records:
# sample-config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
  
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "mqtt-to-fluvio"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
transforms:
  - uses: infinyon/jolt@x.y.z
    with:
      spec:
        - operation: shift
          spec:
            payload:
              device: "device"
        - operation: default
          spec:
            source: "mqtt-connector"
The object device in the resulting record will be "unwrapped", and the additional field source with the value mqtt-connector will be added.
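The effect of the two operations on the earlier example record can be approximated in plain Python. This is only a sketch that mimics the spec above, not the jolt SmartModule itself. It assumes the usual Jolt behavior, in which shift keeps only the fields named in its spec and default adds a field only when it is missing. The function name apply_example_transforms is hypothetical.

```python
import copy
import json


def apply_example_transforms(record: dict) -> dict:
    """Mimic the shift and default operations from the config above."""
    result = {}

    # shift: move payload.device to a top-level "device" field.
    # Assumption: fields not named in the shift spec are dropped.
    device = record.get("payload", {}).get("device")
    if device is not None:
        result["device"] = copy.deepcopy(device)

    # default: add "source" only when the record does not already have it.
    result.setdefault("source", "mqtt-connector")
    return result


incoming = {
    "mqtt_topic": "mqtt-to-fluvio",
    "payload": {"device": {"device_id": 1, "name": "device1"}},
}
transformed = apply_example_transforms(incoming)
print(json.dumps(transformed, indent=2))
```

Under these assumptions, the printed record contains the unwrapped device object and the source field, and no longer carries mqtt_topic.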
Read more about [JSON to JSON transformations].