Convert MySQL table changes into event streams (Rust)
In this tutorial, we’re going to be talking about Change Data Capture (or CDC).
We’ll give a brief overview of what CDC is, then we’ll talk about the component
steps in a CDC setup. Then, we’ll take a look at how we can implement our own
CDC system using Fluvio.
Roughly speaking, CDC is a way to record updates to a database in a structured way,
so that those updates may be replayed on another database instance to create a
perfect copy. We typically refer to the database being copied as the “leader”, and
any database which is being replicated from it as a “follower”. Note that there can
be only one leader, but potentially many followers.
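The replay idea can be sketched in a few lines of Rust. This is a toy model under stated assumptions, not how MySQL or the demo repo stores changes: the `Database`, `Change`, and `replicate` names are hypothetical, and the "database" is just an in-memory map with an append-only log.

```rust
use std::collections::BTreeMap;

/// One recorded change to a key-value "table" (a tiny stand-in for a real database).
#[derive(Debug, Clone, PartialEq)]
enum Change {
    Insert { key: String, value: String },
    Update { key: String, value: String },
    Delete { key: String },
}

/// A toy database: a sorted map plus an append-only log of every change applied.
#[derive(Debug, Default)]
struct Database {
    rows: BTreeMap<String, String>,
    log: Vec<Change>,
}

impl Database {
    /// Apply a change to the rows and record it in the change log.
    fn apply(&mut self, change: Change) {
        match &change {
            Change::Insert { key, value } | Change::Update { key, value } => {
                self.rows.insert(key.clone(), value.clone());
            }
            Change::Delete { key } => {
                self.rows.remove(key);
            }
        }
        self.log.push(change);
    }
}

/// Replay the leader's change log, in order, on an empty follower.
fn replicate(leader: &Database) -> Database {
    let mut follower = Database::default();
    for change in &leader.log {
        follower.apply(change.clone());
    }
    follower
}
```

Calling `replicate(&leader)` after a series of `leader.apply(...)` calls yields a follower whose `rows` equal the leader's, because the same changes were applied in the same order.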
In this CDC tutorial, we’ll be using a Rust MySQL Binlog library to watch for
updates to our leader database. When we see an update, we’ll send it as a message
on a Fluvio stream. We’ll call the program that does this the “Producer”, since
it “produces” messages to the Fluvio topic. We’ll also create a second program
that watches for messages to appear on our Fluvio topic and then replays those
changes on the follower database. This program will be called the
“Consumer”, since it “consumes” messages from Fluvio.
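To make the two roles concrete, here is a minimal sketch that uses a standard-library channel as a stand-in for the Fluvio topic. It does not use the Fluvio API or read a real binlog. The `run_pipeline` function is hypothetical, and the statements are passed in directly instead of being observed on a database.

```rust
use std::sync::mpsc;
use std::thread;

/// Run a producer thread and a consumer loop over an in-process channel.
/// The channel only stands in for a Fluvio topic; no Fluvio API is used here.
fn run_pipeline(leader_statements: Vec<String>) -> Vec<String> {
    let (topic_tx, topic_rx) = mpsc::channel::<String>();

    // "Producer": forwards each statement observed on the leader to the topic.
    let producer = thread::spawn(move || {
        for statement in leader_statements {
            topic_tx.send(statement).expect("consumer hung up");
        }
        // topic_tx is dropped here, which closes the channel
        // and lets the consumer loop below finish.
    });

    // "Consumer": receives messages in order and applies them to the follower.
    // Here "applying" just means collecting them into a Vec.
    let mut follower_statements = Vec::new();
    for statement in topic_rx {
        follower_statements.push(statement);
    }

    producer.join().expect("producer thread panicked");
    follower_statements
}
```

Because a channel delivers messages in the order they were sent, the follower sees the same statements in the same order as the leader, which is the property the real Producer and Consumer rely on.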
To get the most out of this tutorial, be sure to clone our CDC demo repo and follow
along with the setup and commands.
Requirements
This tutorial requires you to have completed the following prerequisites:
Docker MySQL deployment
This example requires two instances of MySQL to be up and running: One to be a
leader that produces events, and the other to be a follower that consumes events.
We’ll use Docker images to help set up these MySQL instances with the right
configurations.
Start a Docker container for the MySQL Producer:
$ ./docker/install.sh -n mysql-producer -d ~/mysql-cdc/mysql-producer -p 3080
✅ mkdir -p /Users/aj/mysql-cdc/mysql-producer - ok
✅ docker build . -t mysql-80 - ok
f110fc2c66ec599f95fd7b799dd1ea5a20a8abfa5772bee1505dc90af9f0bcb4
✅ docker mysql-producer - running
Start another container for the MySQL Consumer:
$ ./docker/install.sh -n mysql-consumer -d ~/mysql-cdc/mysql-consumer -p 3090
✅ mkdir -p /Users/aj/mysql-cdc/mysql-consumer - ok
✅ docker build . -t mysql-80 - ok
af1c2fa69a06ee922f6f4302655a07721e3a01379e8c750f70e42c6b2e09f943
✅ docker mysql-consumer - running
At this point, you should be able to see both Docker containers running. The leader
database is connected to port 3080, and the follower is connected to port 3090, on
your host machine.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
17c60cbbfc09 mysql-80 "docker-entrypoint.s…" 3 hours ago Up 3 hours 33060/tcp, 0.0.0.0:3090->3306/tcp mysql-consumer
f5ec27a58476 mysql-80 "docker-entrypoint.s…" 3 hours ago Up 3 hours 33060/tcp, 0.0.0.0:3080->3306/tcp mysql-producer
Creating a Fluvio Topic
In Fluvio, every message is sent to a Topic. A topic is a sort of category for events that
are related. For this example, we’re going to create a topic that will receive all of our
MySQL events. Run the following command to create the topic:
$ fluvio topic create rust-mysql-cdc
Start the Producer and Consumer
Now we’ll launch the CDC Producer, which will watch for any SQL commands that are executed
in the leader MySQL instance and produce corresponding Fluvio events.
cdc-mysql$ cargo run --bin cdc-producer -- ./producer_profile.toml
In another terminal window, we’ll launch the CDC Consumer, which listens for new Fluvio
events and replicates them in the follower MySQL instance.
cdc-mysql$ cargo run --bin cdc-consumer -- ./consumer_profile.toml
Connect to MySQL
Now you’re ready to start interacting with your databases. In two separate terminal
windows, we’ll open up the mysql command line. In one of those windows, we’ll start
running queries on the leader database, and in the other window, we’ll connect to the
follower database to see that all the changes get propagated.
Connect to Producer (leader DB)
The leader database is bound to port 3080. You can connect to it using the mysql
command as follows:
$ mysql -h 0.0.0.0 -P 3080 -ufluvio -pfluvio4cdc!
...
mysql>
The producer profile has a filter that registers only changes applied to the “flvDb”
database. Let’s create the database now:
mysql> CREATE DATABASE flvDb;
Query OK, 1 row affected (0.01 sec)
mysql> use flvDb;
Database changed
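The database filter mentioned above can be pictured as a simple allow-list. The sketch below is an assumption for illustration only: the actual format of the producer profile and the way the demo applies its filter are not shown in this tutorial, and the `DbFilter` type and its `include` field are hypothetical. The case-insensitive comparison is also an assumption.

```rust
/// Hypothetical filter: an allow-list of database names whose changes are forwarded.
struct DbFilter {
    include: Vec<String>,
}

impl DbFilter {
    /// Return true if changes to `db_name` should be forwarded.
    /// The comparison ignores ASCII case (an assumption, not confirmed by the demo).
    fn allows(&self, db_name: &str) -> bool {
        self.include
            .iter()
            .any(|name| name.eq_ignore_ascii_case(db_name))
    }
}
```

With `include` set to `["flvDb"]`, changes to the flvDb database would pass the filter, while changes to any other database on the leader would be dropped before reaching the topic.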
Connect to Consumer (follower DB)
The follower database is bound to port 3090. In the second terminal window, connect
to the follower using the mysql command:
$ mysql -h 0.0.0.0 -P 3090 -ufluvio -pfluvio4cdc!
...
mysql>
Since the CDC Consumer is already running, the “flvDb” database we just created on
the leader will also be created on the follower!
mysql> SHOW DATABASES;
+--------------------+
| Database |
+--------------------+
| flvDb |
| information_schema |
+--------------------+
2 rows in set (0.00 sec)
mysql> use flvDb;
Database changed
MySQL Test Commands
In the producer terminal, run MySQL commands and see them propagated to the consumer database:
Producer
mysql> CREATE TABLE pet (name VARCHAR(20), owner VARCHAR(20), species VARCHAR(20), sex CHAR(1), birth DATE);
Query OK, 0 rows affected (0.03 sec)
mysql> show tables;
+-----------------+
| Tables_in_flvDb |
+-----------------+
| pet |
+-----------------+
1 row in set (0.00 sec)
Consumer
The CREATE TABLE statement has been propagated to the consumer:
mysql> show tables;
+-----------------+
| Tables_in_flvDb |
+-----------------+
| pet |
+-----------------+
1 row in set (0.00 sec)
Producer - Other Commands
Let’s add some rows and modify fields in the pet table:
INSERT INTO pet VALUES ('Puffball','Diane','hamster','f','1999-03-30');
INSERT INTO pet VALUES ('Jack','Peter','dog','m','1999-03-30');
UPDATE pet SET birth = '1989-08-31' WHERE name = 'Jack';
ALTER TABLE pet ADD COLUMN last_vaccine DATE;
DELETE from pet where name='Puffball';
INSERT INTO pet VALUES ('Spot', 'Jane', 'dog', 'm', '2010-11-2', Null);
UPDATE pet SET last_vaccine='2020-6-10' WHERE name='Spot';
mysql> select * from pet;
+------+-------+---------+------+------------+--------------+
| name | owner | species | sex | birth | last_vaccine |
+------+-------+---------+------+------------+--------------+
| Jack | Peter | dog | m | 1989-08-31 | NULL |
| Spot | Jane | dog | m | 2010-11-02 | 2020-06-10 |
+------+-------+---------+------+------------+--------------+
Consumer - Other Commands
The commands have been propagated to the consumer:
mysql> select * from pet;
+------+-------+---------+------+------------+--------------+
| name | owner | species | sex | birth | last_vaccine |
+------+-------+---------+------+------------+--------------+
| Jack | Peter | dog | m | 1989-08-31 | NULL |
| Spot | Jane | dog | m | 2010-11-02 | 2020-06-10 |
+------+-------+---------+------+------------+--------------+
Fluvio Events
The CDC producer publishes events to the Fluvio topic defined in the producer profile. By default, the topic name is rust-mysql-cdc.
To view the events generated by the CDC producer, run the fluvio consume command:
$ fluvio consume rust-mysql-cdc -B
{"uri":"flv://mysql-srv1/flvDb","sequence":0,"bn_file":{"fileName":"binlog.000003","offset":233},"operation":{"Query":"CREATE DATABASE flvDb"}}
{"uri":"flv://mysql-srv1/flvdb","sequence":1,"bn_file":{"fileName":"binlog.000003","offset":423},"operation":{"Query":"CREATE TABLE pet (name VARCHAR(20), owner VARCHAR(20), species VARCHAR(20), sex CHAR(1), birth DATE)"}}
{"uri":"flv://mysql-srv1/flvdb/pet","sequence":2,"bn_file":{"fileName":"binlog.000003","offset":832},"columns":["name","owner","species","sex","birth"],"operation":{"Add":{"rows":[{"cols":[{"String":"Puffball"},{"String":"Diane"},{"String":"hamster"},{"String":"f"},{"Date":{"year":1999,"month":3,"day":30}}]}]}}}
{"uri":"flv://mysql-srv1/flvdb/pet","sequence":3,"bn_file":{"fileName":"binlog.000003","offset":1146},"columns":["name","owner","species","sex","birth"],"operation":{"Add":{"rows":[{"cols":[{"String":"Jack"},{"String":"Peter"},{"String":"dog"},{"String":"m"},{"Date":{"year":1999,"month":3,"day":30}}]}]}}}
{"uri":"flv://mysql-srv1/flvdb/pet","sequence":4,"bn_file":{"fileName":"binlog.000003","offset":1461},"columns":["name","owner","species","sex","birth"],"operation":{"Update":{"rows":[{"before_cols":[{"String":"Jack"},{"String":"Peter"},{"String":"dog"},{"String":"m"},{"Date":{"year":1999,"month":3,"day":30}}],"after_cols":[{"String":"Jack"},{"String":"Peter"},{"String":"dog"},{"String":"m"},{"Date":{"year":1989,"month":8,"day":31}}]}]}}}
{"uri":"flv://mysql-srv1/flvdb","sequence":5,"bn_file":{"fileName":"binlog.000003","offset":1647},"operation":{"Query":"ALTER TABLE pet ADD COLUMN last_vaccine DATE"}}
{"uri":"flv://mysql-srv1/flvdb/pet","sequence":6,"bn_file":{"fileName":"binlog.000003","offset":2001},"columns":["name","owner","species","sex","birth","last_vaccine"],"operation":{"Delete":{"rows":[{"cols":[{"String":"Puffball"},{"String":"Diane"},{"String":"hamster"},{"String":"f"},{"Date":{"year":1999,"month":3,"day":30}},"Null"]}]}}}
{"uri":"flv://mysql-srv1/flvdb/pet","sequence":7,"bn_file":{"fileName":"binlog.000003","offset":2316},"columns":["name","owner","species","sex","birth","last_vaccine"],"operation":{"Add":{"rows":[{"cols":[{"String":"Spot"},{"String":"Jane"},{"String":"dog"},{"String":"m"},{"Date":{"year":2010,"month":11,"day":2}},"Null"]}]}}}
{"uri":"flv://mysql-srv1/flvdb/pet","sequence":8,"bn_file":{"fileName":"binlog.000003","offset":2631},"columns":["name","owner","species","sex","birth","last_vaccine"],"operation":{"Update":{"rows":[{"before_cols":[{"String":"Spot"},{"String":"Jane"},{"String":"dog"},{"String":"m"},{"Date":{"year":2010,"month":11,"day":2}},"Null"],"after_cols":[{"String":"Spot"},{"String":"Jane"},{"String":"dog"},{"String":"m"},{"Date":{"year":2010,"month":11,"day":2}},{"Date":{"year":2020,"month":6,"day":10}}]}]}}}
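The events above carry enough information to rebuild SQL on the follower side. The following sketch shows one way that translation could look for an "Add" operation. It is not the demo's actual consumer code: the `Value` enum, `parse_uri`, and `insert_statement` are hypothetical names, JSON parsing is left out so the block needs only the standard library, and the URI is returned as written without normalizing case (the events show both flvDb and flvdb).

```rust
/// A column value, mirroring the three shapes visible in the events above.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    String(String),
    Date { year: u16, month: u8, day: u8 },
    Null,
}

impl Value {
    /// Render the value as a SQL literal (single quotes are doubled to escape them).
    fn to_sql(&self) -> String {
        match self {
            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
            Value::Date { year, month, day } => {
                format!("'{:04}-{:02}-{:02}'", year, month, day)
            }
            Value::Null => "NULL".to_string(),
        }
    }
}

/// Split a URI such as "flv://mysql-srv1/flvdb/pet" into
/// (server, database, optional table).
fn parse_uri(uri: &str) -> Option<(&str, &str, Option<&str>)> {
    let rest = uri.strip_prefix("flv://")?;
    let mut parts = rest.split('/');
    let server = parts.next()?;
    let database = parts.next()?;
    Some((server, database, parts.next()))
}

/// Build an INSERT statement for one row of an "Add" operation.
fn insert_statement(table: &str, columns: &[&str], row: &[Value]) -> String {
    let values: Vec<String> = row.iter().map(Value::to_sql).collect();
    format!(
        "INSERT INTO {} ({}) VALUES ({})",
        table,
        columns.join(", "),
        values.join(", ")
    )
}
```

For the event with sequence 7, `parse_uri` would yield the table name pet, and `insert_statement` with the event's columns and values would produce an INSERT equivalent to the one typed on the leader. "Update" and "Delete" operations could be handled the same way, using the before values to build a WHERE clause.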
Other MySQL Commands
For additional MySQL commands, check out MYSQL-COMMANDS.