State CLI SQL access
This tutorial continues from the previous state example and shows how to use the SQL interface in the CLI to access state data.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install one, follow the instructions here.
Dataflow
Overview
The previous examples showed how to create and manipulate a state. This example does the same, and then uses the CLI to access the state.
Collect-sensor-data
1. Define the state
For this state, we will track the latitude, longitude, fuel consumption, sensor status, and engine temperature reported by the sensors.
    states:
      vehicle-data:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              latitude:
                type: f64
              longitude:
                type: f64
              fuel_consumption:
                type: u32
              sensor_status:
                type: string
              engine_temperature:
                type: i32
Here, the key is a string, and the value is stored as an arrow-row, which can contain multiple properties that act like columns.
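To make the shape of this state concrete, here is a minimal plain-Rust sketch of the same layout: one row of typed columns per string key. The `VehicleRow` struct, the `VehicleState` alias, and the `sample_state` helper are hypothetical names used only for illustration. They are not part of the SDF API, and SDF does not necessarily store the state this way.

```rust
use std::collections::HashMap;

/// Illustrative row type: each field mirrors one column declared under
/// `value.properties` in the state definition above.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct VehicleRow {
    pub latitude: f64,
    pub longitude: f64,
    pub fuel_consumption: u32,
    pub sensor_status: String,
    pub engine_temperature: i32,
}

/// Illustrative stand-in for a keyed state: exactly one row per string key.
pub type VehicleState = HashMap<String, VehicleRow>;

/// Build a small state holding a single vehicle.
pub fn sample_state() -> VehicleState {
    let mut state = VehicleState::new();
    state.insert(
        "V001".to_string(),
        VehicleRow {
            latitude: 40.7128,
            longitude: -74.0060,
            fuel_consumption: 10,
            sensor_status: "ok".to_string(),
            engine_temperature: 90,
        },
    );
    state
}
```

In this model, looking up `"V001"` returns the whole row, which corresponds to one line of a table whose columns are the declared properties.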
2. Assign key
We will use the vehicle_id from the sensor data as the key that partitions the state.
partition:
  assign-key:
    run: |
        fn key_by_id(data: VehicleDataType) -> Result<String> {
          Ok(data.vehicle_id)
        }
  update-state:
    (...)
3. Updating State
To update an arrow-row state, we need to set each column of the row manually and then call update().
partition:
  assign-key:
    (...)
  update-state:
    run: |
        fn update_temperature(data: VehicleDataType) -> Result<()> {
            let mut vd = vehicle_data();
            vd.latitude = data.latitude;
            vd.longitude = data.longitude;
            vd.fuel_consumption = data.fuel_consumption;
            vd.engine_temperature = data.engine_temperature;
            vd.sensor_status = data.sensor_status;
            vd.update()?;
            Ok(())
        }
Updating a state is a terminal operation, so no other action runs after it.
In this example, no other service consumes the state, so we will use the SQL interface to access it from the CLI.
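The combined effect of assign-key and update-state can be modeled in plain Rust as an upsert into a map. This is only a sketch of the behavior, assuming that vehicle_data() returns the row for the current record's key and that a new key starts from default values. The `SensorRecord` and `Row` structs and the `apply` function are hypothetical, and only two of the columns are kept to stay short.

```rust
use std::collections::HashMap;

/// Hypothetical plain-Rust mirror of a subset of the incoming sensor record.
#[derive(Debug, Clone)]
pub struct SensorRecord {
    pub vehicle_id: String,
    pub engine_temperature: i32,
    pub sensor_status: String,
}

/// Hypothetical row kept per key (two columns only, for brevity).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Row {
    pub engine_temperature: i32,
    pub sensor_status: String,
}

/// Mirrors `assign-key`: choose the partition key for a record.
pub fn key_by_id(record: &SensorRecord) -> String {
    record.vehicle_id.clone()
}

/// Mirrors `update-state`: fetch (or create) the row for the record's key,
/// then overwrite its columns with the latest values.
pub fn apply(state: &mut HashMap<String, Row>, record: &SensorRecord) {
    let row = state.entry(key_by_id(record)).or_default();
    row.engine_temperature = record.engine_temperature;
    row.sensor_status = record.sensor_status.clone();
}
```

Under these assumptions, applying several records for the same vehicle_id leaves a single row holding the most recent values, which is why the state ends up with one row per vehicle.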
Running the Example
Full Code
Copy and paste the following config and save it as dataflow.yaml.
apiVersion: 0.5.0
meta:
  name: sql-example
  version: 0.1.0
  namespace: examples
config:
  converter: json
types:
  vehicle-data-type:
    type: object
    properties:
      vehicle_id:
        type: string
      latitude:
        type: f64
      longitude:
        type: f64
      sensor_status:
        type: string
      fuel_consumption:
        type: u32
      engine_rpm:
        type: u32
      engine_temperature:
        type: i32
      speed:
        type: f32
topics:
  vehicle-sensor:
    schema:
      value:
        type: vehicle-data-type
services:
  collect-sensor-data:
    sources:
      - type: topic
        id: vehicle-sensor
    states:
      vehicle-data:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              latitude:
                type: f64
              longitude:
                type: f64
              fuel_consumption:
                type: u32
              sensor_status:
                type: string
              engine_temperature:
                type: i32
    partition:
      assign-key:
        run: |
          fn key_by_id(data: VehicleDataType) -> Result<String> {
            Ok(data.vehicle_id)
          }
      update-state:
        run: |
          fn update_temperature(data: VehicleDataType) -> Result<()> {
            let mut vd = vehicle_data();
            vd.latitude = data.latitude;
            vd.longitude = data.longitude;
            vd.fuel_consumption = data.fuel_consumption;
            vd.engine_temperature = data.engine_temperature;
            vd.sensor_status = data.sensor_status;
            vd.update()?;
            Ok(())
          }
Running SDF
To run the example:
$ sdf run
Produce data
We will produce some data to mimic sensor behavior.
$ echo '{ "timestamp": "2023-11-22T12:34:56Z", "vehicle_id": "V001", "latitude": 40.7128, "longitude": -74.0060, "speed": 60, "engine_temperature": 90, "engine_rpm": 2000, "fuel_consumption": 10, "sensor_status": "ok" }' | fluvio produce vehicle-sensor
$ echo '{ "timestamp": "2023-11-22T12:35:01Z", "vehicle_id": "V002", "latitude": 34.0522, "longitude": -118.2437, "speed": 30, "engine_temperature": 85, "engine_rpm": 1500, "fuel_consumption": 8, "sensor_status": "failed"}' | fluvio produce vehicle-sensor
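If you want more test records than the two above, a small helper can render them as single-line JSON. The sketch below uses only the Rust standard library. The `Reading` struct and its `to_json_line` method are hypothetical names, the field names follow vehicle-data-type, and the timestamp field is left out because the type does not declare it.

```rust
/// Hypothetical helper type; field names follow `vehicle-data-type`
/// in dataflow.yaml.
pub struct Reading<'a> {
    pub vehicle_id: &'a str,
    pub latitude: f64,
    pub longitude: f64,
    pub speed: f32,
    pub engine_temperature: i32,
    pub engine_rpm: u32,
    pub fuel_consumption: u32,
    pub sensor_status: &'a str,
}

impl Reading<'_> {
    /// Render the reading as a single-line JSON object.
    /// String fields are not escaped, so this only suits simple
    /// identifiers and status values such as "V001" or "ok".
    pub fn to_json_line(&self) -> String {
        format!(
            "{{\"vehicle_id\": \"{}\", \"latitude\": {}, \"longitude\": {}, \"speed\": {}, \
             \"engine_temperature\": {}, \"engine_rpm\": {}, \"fuel_consumption\": {}, \
             \"sensor_status\": \"{}\"}}",
            self.vehicle_id,
            self.latitude,
            self.longitude,
            self.speed,
            self.engine_temperature,
            self.engine_rpm,
            self.fuel_consumption,
            self.sensor_status
        )
    }
}
```

Each line returned by to_json_line can be printed and piped into fluvio produce vehicle-sensor in the same way as the echo commands above.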
Enter SQL Mode
In the SDF interactive shell, use the sql command to enter SQL mode:
>> sql
SDF SQL version sdf-beta5
Type .help for help.
sql >> 
Run queries in SQL mode
In SQL mode, we can access the dataframe states of the dataflow.
We can list the available tables with:
sql >> show tables
shape: (1, 1)
┌──────────────┐
│ name         │
│ ---          │
│ str          │
╞══════════════╡
│ vehicle_data │
└──────────────┘
The vehicle-data state appears as the vehicle_data table. We can also run regular SQL queries against it:
sql >> select * from vehicle_data
shape: (2, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ ---  ┆ ---                ┆ ---              ┆ ---      ┆ ---       ┆ ---           │
│ str  ┆ i32                ┆ u32              ┆ f64      ┆ f64       ┆ str           │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V001 ┆ 90                 ┆ 10               ┆ 40.7128  ┆ -74.006   ┆ ok            │
│ V002 ┆ 85                 ┆ 8                ┆ 34.0522  ┆ -118.2437 ┆ failed        │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
sql >> select * from vehicle_data where sensor_status = 'failed'
shape: (1, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ ---  ┆ ---                ┆ ---              ┆ ---      ┆ ---       ┆ ---           │
│ str  ┆ i32                ┆ u32              ┆ f64      ┆ f64       ┆ str           │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V002 ┆ 85                 ┆ 8                ┆ 34.0522  ┆ -118.2437 ┆ failed        │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
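For readers more familiar with Rust than SQL, the filter query above can be read as a scan over the keyed rows that keeps only those whose sensor_status matches. The following is a rough plain-Rust model of that logic, not how the SQL engine executes it. The `Row` struct and `keys_with_status` function are hypothetical, and a `BTreeMap` is used only to get a deterministic key order.

```rust
use std::collections::BTreeMap;

/// Hypothetical row with two of the state's columns.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub engine_temperature: i32,
    pub sensor_status: String,
}

/// Rough equivalent of
/// `select _key from vehicle_data where sensor_status = '<status>'`:
/// return the keys of all rows whose status matches, in key order.
pub fn keys_with_status(state: &BTreeMap<String, Row>, status: &str) -> Vec<String> {
    state
        .iter()
        .filter(|(_, row)| row.sensor_status.as_str() == status)
        .map(|(key, _)| key.clone())
        .collect()
}
```

With the two records produced earlier, asking for the status failed would return only the key V002, matching the single row in the query result.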
Exit SQL mode
Use .quit or .exit to exit SQL mode.
sql >> .quit
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
We just implemented an example that accesses arrow-row states through the SQL interface. The following link contains another example that uses data from multiple states to perform a JOIN query.