In this code sample we will show how would you implement a Change Data Capture scenario with MySQL and a Hazelcast Jet cluster inside Docker environment with Docker Compose.
The Docker Compose file (hazelcast.yml
) contains following services:
mysql
- This image starts a MySQL server with example data and necessary configuration for Debezium.hazelcast-jet
- This image starts plain Hazelcast Jet Member.hazelcast-jet-submit
- This image submits a Jet job that was packaged as a self-contained JAR file to the Hazelcast Jet Cluster via CLI.
- Docker with Docker Compose: Installation guide
Docker service/daemon must be running for this application to work.
To build and package the application, run:
mvn clean package
After building the application, run the application with:
make up
This will start the services and the job. The job will listen for changes on the
configured inventory
database and logs the events as they arrive to the
standard out. The source for the job can be found here
After running the application, see the Hazelcast Jet logs with:
make tailServer
It should print logs similar to below after printing huge amount of logs regarding initial database snapshot:
....
hazelcast-jet_1 | 2020-01-07 16:26:31,106 INFO [Threads] [hz.elated_clarke.jet.blocking.thread-0] - Creating thread debezium-mysqlconnector-dbserver1-binlog-client
hazelcast-jet_1 | 2020-01-07 16:26:31,114 INFO [Threads] [blc-mysql:3306] - Creating thread debezium-mysqlconnector-dbserver1-binlog-client
hazelcast-jet_1 | [2020-01-07 16:26:31.269] [INFO ] com.github.shyiko.mysql.binlog.BinaryLogClient connect - Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:4)
hazelcast-jet_1 | 2020-01-07 16:26:31,272 INFO [BinlogReader] [blc-mysql:3306] - Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows
hazelcast-jet_1 | 2020-01-07 16:26:31,274 INFO [Threads] [blc-mysql:3306] - Creating thread debezium-mysqlconnector-dbserver1-binlog-client
Optionally if you would like to see the logs for other services use the following:
-
For MySQL Database
make tailDb
-
For Hazelast Jet Bootstrap Application
make tailClient
We will insert a new row to one of the tables that we are listening and see that a new event will be propagated by the Debezium source and logged in the Jet pipeline.
To make changes, log into the database by using the command below:
make connectDb
You should see the mysql command prompt:
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
You can run an example query to see the contents of the customers
table:
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | [email protected] |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
| 1004 | Anne | Kretchmar | [email protected] |
+------+------------+-----------+-----------------------+
4 rows in set (0.01 sec)
Now, we can update a record in the table with following query:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Switching back to the Hazelcast Jet logs, we should see the update record like following:
make tailServer
...
...
...
hazelcast-jet_1 | 2020-01-07 16:32:19,871 INFO [loggerSink#0] [hz.elated_clarke.jet.blocking.thread-1] - [172.30.0.3]:5701 [cdc-demo] [4.0-SNAPSHOT]
({"id":1004},
{
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "[email protected]"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "[email protected]"
},
"source": {
"version": "1.0.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1578414739000,
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"thread": 5,
"query": null
},
"op": "u",
"ts_ms": 1578414739610
})
We could also insert a record like following:
mysql> INSERT INTO customers (id, first_name, last_name, email) VALUES (1005, 'Jane', 'Doe', '[email protected]');
Query OK, 1 row affected (0.00 sec)
Running the above would yield a log line like below:
make tailServer
...
...
...
hazelcast-jet_1 | 2020-01-07 16:38:30,114 INFO [loggerSink#0] [hz.elated_clarke.jet.blocking.thread-1] - [172.30.0.3]:5701 [cdc-demo] [4.0-SNAPSHOT]
({"id":1005},
{
"before": null,
"after": {
"id": 1005,
"first_name": "Jane",
"last_name": "Doe",
"email": "[email protected]"
},
"source": {
"version": "1.0.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1578415109000,
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 725,
"row": 0,
"thread": 5,
"query": null
},
"op": "c",
"ts_ms": 1578415109778
}
)
After you are done with the cluster, you can kill it with:
make down