Skip to main content

Building Real-time Data Synchronization with Lightweight CDC debezium-server-databend

Debezium Server Databend is a lightweight CDC project developed based on the Debezium Engine. It is designed to capture real-time database changes and transmit them as event streams, ultimately writing the data to Databend. It provides a simple way to monitor and capture changes in relational databases and supports transforming these changes into consumable events.

Using Debezium Server Databend for CDC does not require reliance on large-scale Data Infrastructure such as Flink, Kafka, Spark, and more. It only requires a startup script to enable real-time data synchronization.

This post will demonstrate how to quickly achieve real-time data synchronization from MySQL to Databend using Debezium Server Databend, assuming we have an e-commerce business with product data stored in MySQL.

Before You Start

Prepare a Linux or MacOS machine with Docker, docker-compose, and Java 11 already installed.

Preparing Required Components

The following sections will set up the necessary components using docker-compose.

debezium-MySQL

docker-compose.yaml
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw

Debezium Server Databend

  1. Clone the project:
git clone https://github.com/databendcloud/debezium-server-databend.git`
  1. Build and package the Debezium server from the project's root directory:
mvn -Passembly -Dmaven.test.skip package
  1. After the build is complete, unzip the server distribution package:
unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
  1. Navigate to the extracted folder:
cd databendDist
  1. Create a file named application.properties in the folder nano conf and adjust the configuration according to your actual needs. For parameters and their explanations, see https://github.com/databendcloud/debezium-server-databend. See an example below:
application.properties
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
  1. Run the service using the provided script:
bash run.sh

We also provide a corresponding Docker image for easy containerized deployment:

version: '2.1'
services:
debezium:
image: ghcr.io/databendcloud/debezium-server-databend:pr-2
ports:
- "8080:8080"
- "8083:8083"
volumes:
- $PWD/conf:/app/conf
- $PWD/data:/app/data

NOTE: When starting in a container, please pay attention to the network connectivity to the connected databases.

Preparing Data

Preparing Data for MySQL

To access the MySQL container, run the following command:

docker-compose exec mysql mysql -uroot -p123456

Once you are inside the MySQL container, you can create a database named mydb, a table named products, and insert data using the following commands:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

Creating Database in Databend

NOTE: You do not need to create tables in Databend beforehand. The system will automatically create tables when the need is detected.

CREATE DATABASE debezium

Running Debezium Server Databend

bash run.sh

The initial startup will enter the "init snapshot" mode, and the data from MySQL will be synchronized to Databend in full using the configured batch size. As a result, you will be able to see that the data from MySQL has been successfully synchronized to Databend.

Synchronizing INSERT Data

We continue to insert 5 records into MySQL:

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");

Debezium server databend log:

You can see that 5 records have been successfully synchronized in Databend:

Synchronizing UPDATE Data

With the configuration debezium.sink.databend.upsert=true in the settings, we can also handle Update/Delete events.

In MySQL, if you update data with id=10 using the following SQL command:

update products set name="from debezium" where id=10;

You can see in Databend that the data with id 10 has been updated:

Synchronizing DELETE Data

In the configuration file, the following settings can be used to enable the handling of Delete events:

debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

Debezium Server handles Delete operations in a complex way. Under a DELETE operation, it generates two event records:

  • One with "op": "d", along with other row data and fields.
  • A tombstones record with the same key as the deleted row, but with a null value.

Both of these events are emitted simultaneously. In Debezium Server Databend, we choose to perform a soft delete for Delete data. This requires us to have a __deleted field in the target table. When a Delete event arrives, we set this field to TRUE and insert it into the target table.

The advantage of this design is that it provides users with an optional solution. Users may want to retain the data for now but consider deleting it in the future. When they decide to delete this data in the future, they can simply run delete from table where __deleted=true.

For more information about Debezium's explanation and handling of delete events, please refer to this link.

In MySQL, if you delete data with id=12 using the following SQL command:

delete from products where id=12;

You can observe in Databend that the __deleted field for the id=12 value has been set to true.

Cleaning Up the Environment

After you have completed the operations, execute the following command in the directory where the docker-compose.yml file is located to stop all containers:

docker-compose down

Conclusion

The process described above covers the entire procedure for building real-time data synchronization from MySQL to Databend using the lightweight CDC Debezium Server Databend. This approach eliminates the need for relying on large components such as Flink and Kafka, making it easy to start and manage.