Python Examples

To see the full docs, visit our pdoc page.

 

Example Workflow

Follow the installation instructions to run this example.

 

Prerequisites

Create the topic used to produce and consume records:

fluvio topic create python-data
 

Login

Login to Infinyon Cloud using username/password

from fluvio import cloud
cloud.login(email="my@email.com", password="mypassword")

You can also use the oauth method to log in. However this is only for interactive sessions.

from fluvio import cloud
cloud.login(Oauth2=true)
 

Producer

Create a file called python-produce.py:

#!/usr/bin/env python
from datetime import datetime
from fluvio import Fluvio

TOPIC_NAME = "python-data"
PARTITION = 0

if __name__ == "__main__":
   # Connect to cluster
   fluvio = Fluvio.connect()

   # Produce 10 records to topic
   producer = fluvio.topic_producer(TOPIC_NAME)
   for x in range(10):
       producer.send_string("{}: timestamp: {}".format(x, datetime.now()))

   # Flush the last entry
   producer.flush()

Let’s run the file:

$ python python-produce.py
 

Consumer

Create a file called python-consume.py:

#!/usr/bin/env python
from fluvio import Fluvio, Offset

TOPIC_NAME = "python-data"
PARTITION = 0

if __name__ == "__main__":
   # Connect to cluster
   fluvio = Fluvio.connect()

   # Consume last 10 records from topic
   consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
   for idx, record in enumerate( consumer.stream(Offset.from_end(10)) ):
       print("{}".format(record.value_string()))
       
       if idx >= 9:
           break

Let’s run the file:

$ python python-consume.py
 

Limitations

  • Fluvio cluster administration is not supported.
  • Python async is not supported.
 

Example with a SmartModule

#!/usr/bin/env python
import os
from datetime import datetime
from fluvio import Fluvio, Offset, ConsumerCoonfig

TOPIC_NAME = "hello-python-smartmodule"
PARTITION = 0

# This is an example of a basic Fluvio workflow in Python
#
# 1. Create a topic to store data in via CLI
# 2. Establish a connection to the Fluvio cluster
# 3. Create a producer and send some bytes
# 4. Create a consumer, and stream the data back
if __name__ == "__main__":
   # Currently the Python client does not support creating topics
   # Using the fluvio CLI
   os.popen("fluvio topic create {}".format(TOPIC_NAME))

   # Connect to cluster
   fluvio = Fluvio.connect()

   # Produce to topic
   producer = fluvio.topic_producer(TOPIC_NAME)
   producer.send_string("Hello World! - Time is: {}".format(datetime.now()))

   # Consume from topic
   # We're just going to get the last record
   consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)


   # Create a ConsumerConfig using your "uppercase-map" smartmodule
   config = ConsumerConfig()
   config.smartmodule(name="uppercase-map")

   for record in consumer.stream_with_config(Offset.from_end(0), config):
       print("{}".format(record.value_string()))
       break
 

Links to Docs: