
Building an Event-Driven CDC Pipeline: MySQL to Databend via AWS DMS and S3

Jeremy · Apr 28, 2026

A step-by-step guide to building a near-real-time data pipeline from Amazon RDS MySQL / Aurora MySQL to Databend using AWS DMS and S3 as an intermediary. This approach delivers full-load migration plus ongoing CDC (Change Data Capture) with approximately one to two minutes of end-to-end latency.

Why This Approach?

Replicating production MySQL data into an analytical warehouse such as Databend Cloud is a common requirement. This architecture keeps things simple and reliable:

  • Fully managed — AWS DMS handles binlog parsing and data transport. No self-hosted CDC infrastructure required.
  • Low latency — End-to-end delay of roughly 1–2 minutes.
  • Durable — The raw layer retains a complete change log for auditing and disaster recovery.
  • Easy to scale — Adding a new table is a repeatable, templated process with no architectural changes.

Architecture Overview

┌─────────────────────┐
│ RDS MySQL / Aurora  │
│ (Source Database)   │
└──────────┬──────────┘
           │ Binlog
           ▼
┌─────────────────────┐
│    AWS DMS Task     │
│  (Full Load + CDC)  │
└──────────┬──────────┘
           │ Parquet files
           ▼
┌─────────────────────┐
│      Amazon S3      │
│   (Staging area,    │
│ organized by table) │
└──────────┬──────────┘
           │ COPY INTO (scheduled task)
           ▼
┌──────────────────────┐
│    Databend Cloud    │
│  ┌───────────────┐   │
│  │   Raw Layer   │   │  ← Append-only; retains every change event
│  │  (Change Log) │   │
│  └───────┬───────┘   │
│          │ Stream + MERGE INTO (event-driven)
│          ▼           │
│  ┌───────────────┐   │
│  │   ODS Layer   │   │  ← Latest state; clean business tables
│  │   (Merged)    │   │
│  └───────────────┘   │
└──────────────────────┘

Key design decisions:

  • Raw layer — Append-only, never updated. Acts as a complete binlog replay buffer for auditing and disaster recovery. Retain for at least 15 days before purging.
  • ODS layer — Reflects the current state of each record. Downstream queries hit this layer directly.
  • Event-driven merges — A Stream detects new rows in the raw layer and triggers MERGE INTO only when there is data to process, avoiding unnecessary empty runs.

Prerequisites

| Component | Requirement |
| --- | --- |
| Source database | RDS MySQL 5.7+ / Aurora MySQL 2.x or 3.x with binlog enabled (binlog_format = ROW) |
| AWS DMS | Replication instance in the same VPC / Region as the source database |
| S3 bucket | Dedicated bucket (or prefix) in the same Region as the source |
| IAM role | DMS service role with S3 read/write/delete permissions on the target bucket |
| Databend Cloud | An active Warehouse with permissions to create Stages, Tasks, and Streams |
| Network | Databend Cloud can reach the S3 bucket (same-Region recommended to reduce cost and latency) |

IAM Role Configuration for DMS

The DMS service role requires two policies: a trust policy (who can assume the role) and a permissions policy (what the role can do).

Trust policy (IAM → Role → Trust relationships):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AWSDMSS3BucketPolicyTemplate",
      "Effect": "Allow",
      "Principal": {
        "Service": "dms.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "<your-aws-account-id>"
        }
      }
    }
  ]
}

Permissions policy (IAM → Role → Permissions → Create inline policy):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::<your-dms-bucket>/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::<your-dms-bucket>"
    }
  ]
}

Note: The two statements are separated because object-level actions (PutObject, DeleteObject, GetObject) require bucket/* in the Resource ARN, while bucket-level actions (ListBucket, GetBucketLocation) require just bucket (without /*). If s3:DeleteObject is missing, the DMS task will fail when it tries to clean the target S3 directory at the start of a full load.
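The bucket/* versus bucket distinction is easy to get wrong when editing policies by hand. As an illustration only (a hypothetical lint helper, not part of the pipeline), a few lines of Python can check a policy document for it:

```python
import json

# The DMS permissions policy shape from above, with a placeholder bucket name.
policy = json.loads("""
{
  "Version": "2012-10-17",
  "Statement": [
    {"Effect": "Allow",
     "Action": ["s3:PutObject", "s3:DeleteObject", "s3:GetObject"],
     "Resource": "arn:aws:s3:::my-dms-bucket/*"},
    {"Effect": "Allow",
     "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
     "Resource": "arn:aws:s3:::my-dms-bucket"}
  ]
}
""")

OBJECT_ACTIONS = {"s3:PutObject", "s3:DeleteObject", "s3:GetObject"}

def check_statement(stmt):
    """Object-level actions need 'bucket/*'; bucket-level actions need 'bucket'."""
    needs_wildcard = any(a in OBJECT_ACTIONS for a in stmt["Action"])
    has_wildcard = stmt["Resource"].endswith("/*")
    return needs_wildcard == has_wildcard

assert all(check_statement(s) for s in policy["Statement"])
# s3:DeleteObject must be present, or the full-load directory cleanup fails.
assert any("s3:DeleteObject" in s["Action"] for s in policy["Statement"])
```

Running a check like this in CI catches the most common cause of DMS full-load startup failures before the task is ever created.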


Step 1: Create the S3 Bucket

Create a dedicated S3 bucket (or use a dedicated prefix in an existing bucket) for DMS output.

aws s3 mb s3://your-dms-bucket --region ap-southeast-1

Step 2: Configure AWS DMS

2.1 Create a Replication Instance


In the AWS DMS console:

  1. Navigate to DMS → Replication instances → Create replication instance.
  2. Configure:
    • Instance class: dms.r5.large or higher (choose based on table count and throughput)
    • VPC: Same VPC as the source RDS / Aurora instance
    • Multi-AZ: Recommended for production
    • Allocated storage: 100 GB+ (DMS uses local disk as a buffer during CDC)

2.2 Create the Source Endpoint (RDS MySQL / Aurora)


  1. Navigate to DMS → Endpoints → Create endpoint.
  2. Select Source endpoint.
  3. Configure:
| Field | Value |
| --- | --- |
| Endpoint type | Source |
| Engine | MySQL (select this for both RDS MySQL and Aurora MySQL) |
| Server name | RDS / Aurora endpoint (e.g., mydb.cluster-xxxx.ap-southeast-1.rds.amazonaws.com) |
| Port | 3306 |
| Username | A dedicated DMS replication user (see below) |
| Password | Password for the replication user |
| SSL mode | verify-ca recommended for production |

Create a dedicated replication user on the source database:

CREATE USER 'dms_user'@'%' IDENTIFIED BY 'your_secure_password';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'dms_user'@'%';
-- Aurora requires an additional grant:
GRANT EXECUTE ON PROCEDURE mysql.rds_set_configuration TO 'dms_user'@'%';
FLUSH PRIVILEGES;

Set the binlog retention period (applies to both Aurora and RDS MySQL):

-- Retain binlogs for 24 hours (adjust as needed)
CALL mysql.rds_set_configuration('binlog retention hours', 24);

Verify with SHOW BINARY LOGS; confirm that logs exist and are not being purged prematurely.

2.3 Create the Target Endpoint (S3)


  1. Navigate to DMS → Endpoints → Create endpoint.
  2. Select Target endpoint.
  3. Configure:
| Field | Value |
| --- | --- |
| Endpoint type | Target |
| Engine | Amazon S3 |
| Service access role ARN | ARN of the IAM role with S3 write permissions |
| Bucket name | your-dms-bucket |
| Bucket folder | aurora/ (or your preferred prefix) |
| Data format | Parquet |

Extra connection attributes (paste into the "Extra connection attributes" field):

dataFormat=parquet;parquetVersion=PARQUET_2_0;timestampColumnName=_dms_ingestion_timestamp;includeOpForFullLoad=true;cdcInsertsAndUpdates=true;addColumnName=true;

Parameter reference:

| Parameter | Value | Purpose |
| --- | --- | --- |
| dataFormat | parquet | Output in Parquet format (good compression, self-describing schema) |
| parquetVersion | PARQUET_2_0 | Use Parquet v2 for better type support |
| timestampColumnName | _dms_ingestion_timestamp | Adds an ingestion timestamp column to every row |
| includeOpForFullLoad | true | Adds an Op column (set to I) to full-load rows, keeping the raw layer schema consistent |
| cdcInsertsAndUpdates | true | CDC output includes both INSERT and UPDATE operations |
| addColumnName | true | Embeds column names in Parquet metadata |
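The attribute string itself is just semicolon-separated key=value pairs. A small Python helper (hypothetical, shown only to make the format explicit) can parse it, which is handy if you keep endpoint settings in version control and want to validate them in CI:

```python
def parse_eca(attrs: str) -> dict:
    """Parse a DMS 'extra connection attributes' string:
    'key=value' pairs separated by semicolons, optional trailing ';'."""
    return dict(
        pair.split("=", 1)
        for pair in attrs.strip().rstrip(";").split(";")
        if pair
    )

eca = parse_eca(
    "dataFormat=parquet;parquetVersion=PARQUET_2_0;"
    "timestampColumnName=_dms_ingestion_timestamp;includeOpForFullLoad=true;"
    "cdcInsertsAndUpdates=true;addColumnName=true;"
)
assert eca["dataFormat"] == "parquet"
assert eca["includeOpForFullLoad"] == "true"
```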

2.4 Create the DMS Migration Task

image-20260423105339903

  1. Navigate to DMS → Database migration tasks → Create task.
  2. Configure:
| Field | Value |
| --- | --- |
| Task identifier | aurora-to-s3-full-cdc |
| Replication instance | The instance created in 2.1 |
| Source endpoint | The MySQL source endpoint from 2.2 |
| Target endpoint | The S3 target endpoint from 2.3 |
| Migration type | Migrate existing data and replicate ongoing changes (Full Load + CDC) |
  3. Table mappings — Use the JSON editor to sync all tables in a given schema:
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "sync-all-tables",
      "object-locator": {
        "schema-name": "db",
        "table-name": "%"
      },
      "rule-action": "include"
    }
  ]
}

Replace db with your actual schema name. % matches all tables; you can also specify individual table names.
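In selection rules, % behaves like the SQL LIKE wildcard. A rough Python sketch of that matching semantics (an approximation for illustration, not the DMS implementation):

```python
import re

def dms_name_matches(pattern: str, name: str) -> bool:
    """Approximate DMS selection-rule matching: '%' is a multi-character
    wildcard and '_' a single-character wildcard, as in SQL LIKE."""
    regex = re.escape(pattern).replace("%", ".*").replace("_", ".")
    return re.fullmatch(regex, name) is not None

assert dms_name_matches("%", "orders")          # '%' includes every table
assert dms_name_matches("order%", "orders")     # prefix patterns also work
assert not dms_name_matches("order%", "users")
```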

  4. Task settings — Full configuration (key parameters annotated below):
{
  "TargetMetadata": {
    "TargetSchema": "",
    "SupportLobs": true,
    "FullLobMode": false,
    "LobChunkSize": 32,
    "LimitedSizeLobMode": true,
    "LobMaxSize": 32,
    "InlineLobMaxSize": 0,
    "LoadMaxFileSize": 0,
    "ParallelLoadThreads": 0,
    "ParallelLoadBufferSize": 0,
    "BatchApplyEnabled": false,
    "TaskRecoveryTableEnabled": false,
    "ParallelLoadQueuesPerThread": 0,
    "ParallelApplyThreads": 0,
    "ParallelApplyBufferSize": 0,
    "ParallelApplyQueuesPerThread": 0
  },
  "FullLoadSettings": {
    "CreatePkAfterFullLoad": false,
    "StopTaskCachedChangesApplied": false,
    "StopTaskCachedChangesNotApplied": false,
    "MaxFullLoadSubTasks": 8,
    "TransactionConsistencyTimeout": 600,
    "CommitRate": 10000
  },
  "Logging": {
    "EnableLogging": true,
    "EnableLogContext": false,
    "LogComponents": [
      { "Id": "SOURCE_UNLOAD", "Severity": "LOGGER_SEVERITY_DEFAULT" },
      { "Id": "SOURCE_CAPTURE", "Severity": "LOGGER_SEVERITY_DEFAULT" },
      { "Id": "TARGET_LOAD", "Severity": "LOGGER_SEVERITY_DEFAULT" },
      { "Id": "TARGET_APPLY", "Severity": "LOGGER_SEVERITY_DEFAULT" },
      { "Id": "TASK_MANAGER", "Severity": "LOGGER_SEVERITY_DEFAULT" }
    ],
    "CloudWatchLogGroup": null,
    "CloudWatchLogStream": null
  },
  "ControlTablesSettings": {
    "ControlSchema": "",
    "HistoryTimeslotInMinutes": 5,
    "HistoryTableEnabled": false,
    "SuspendedTablesTableEnabled": false,
    "StatusTableEnabled": false
  },
  "StreamBufferSettings": {
    "StreamBufferCount": 3,
    "StreamBufferSizeInMB": 8,
    "CtrlStreamBufferSizeInMB": 5
  },
  "ChangeProcessingDdlHandlingPolicy": {
    "HandleSourceTableDropped": true,
    "HandleSourceTableTruncated": true,
    "HandleSourceTableAltered": true
  },
  "ErrorBehavior": {
    "DataErrorPolicy": "LOG_ERROR",
    "DataTruncationErrorPolicy": "LOG_ERROR",
    "DataErrorEscalationPolicy": "SUSPEND_TABLE",
    "DataErrorEscalationCount": 0,
    "TableErrorPolicy": "SUSPEND_TABLE",
    "TableErrorEscalationPolicy": "STOP_TASK",
    "TableErrorEscalationCount": 0,
    "RecoverableErrorCount": -1,
    "RecoverableErrorInterval": 5,
    "RecoverableErrorThrottling": true,
    "RecoverableErrorThrottlingMax": 1800,
    "RecoverableErrorStopRetryAfterThrottlingMax": false,
    "ApplyErrorDeletePolicy": "IGNORE_RECORD",
    "ApplyErrorInsertPolicy": "LOG_ERROR",
    "ApplyErrorUpdatePolicy": "LOG_ERROR",
    "ApplyErrorEscalationPolicy": "LOG_ERROR",
    "ApplyErrorEscalationCount": 0,
    "ApplyErrorFailOnTruncationDdl": false,
    "FullLoadIgnoreConflicts": true,
    "FailOnTransactionConsistencyBreached": false,
    "FailOnNoTablesCaptured": true
  },
  "ChangeProcessingTuning": {
    "BatchApplyPreserveTransaction": true,
    "BatchApplyTimeoutMin": 1,
    "BatchApplyTimeoutMax": 30,
    "BatchApplyMemoryLimit": 500,
    "BatchSplitSize": 0,
    "MinTransactionSize": 1000,
    "CommitTimeout": 1,
    "MemoryLimitTotal": 1024,
    "MemoryKeepTime": 60,
    "StatementCacheSize": 50
  },
  "ValidationSettings": {
    "EnableValidation": false,
    "ValidationMode": "ROW_LEVEL",
    "ThreadCount": 5,
    "FailureMaxCount": 10000,
    "TableFailureMaxCount": 1000,
    "HandleCollationDiff": false,
    "ValidationOnly": false,
    "RecordFailureDelayLimitInMinutes": 0,
    "SkipLobColumns": false,
    "ValidationPartialLobSize": 0,
    "ValidationQueryCdcDelaySeconds": 0,
    "PartitionSize": 10000
  },
  "PostProcessingRules": null,
  "CharacterSetSettings": null,
  "LoopbackPreventionSettings": null,
  "BeforeImageSettings": null,
  "FailTaskWhenCleanTaskResourceFailed": false
}

Key settings explained:

| Setting | Value | Why it matters |
| --- | --- | --- |
| BatchApplyEnabled | false | Critical. Must be false so DMS writes a per-row Op column (I/U/D) into the Parquet files. Setting this to true causes the Op column to be missing or incorrect. |
| EnableLogging | true | Enables CloudWatch logging for troubleshooting |
| MaxFullLoadSubTasks | 8 | Number of parallel threads for the full load phase. Increase for large table counts. |
| HandleSourceTableAltered | true | Automatically handles source-side DDL changes (add/drop columns, etc.) |
| TableErrorPolicy | SUSPEND_TABLE | On error, only the affected table is suspended — the rest of the task continues |
| RecoverableErrorCount | -1 | Retries recoverable errors (network blips, throttling) indefinitely |
| FullLoadIgnoreConflicts | true | Skips duplicate-key errors during full load (safe for S3 targets) |
| LimitedSizeLobMode | true | Truncates LOBs to 32 KB for better performance. Increase LobMaxSize if you have large TEXT/BLOB columns. |
  5. In the S3 target endpoint's extra connection attributes, add CDC tuning parameters:

| Parameter | Value | Purpose |
| --- | --- | --- |
| cdcMinFileSize | 32 (KB) | Lowers the threshold so smaller files are flushed to S3 sooner |
| cdcMaxBatchInterval | 30 (seconds) | Maximum interval before DMS flushes a CDC file to S3 |

DMS CDC end-to-end latency is typically 30–60 seconds. This is a DMS design trade-off, not a Databend bottleneck. Adjust these parameters based on your real-time requirements.

  6. Click Create task. The task will automatically begin the full load and switch to CDC mode once complete.

2.5 S3 Directory Structure

DMS automatically organizes files by schema and table name:

s3://dms-databend-bucket/
├── databend/
│   ├── orders/
│   │   ├── 20260416-124049492.parquet   ← Full-load file
│   │   └── 20260416-124049752.parquet
│   ├── table_b/
│   │   ├── 20260416-xxxxxxxxx.parquet
│   │   └── ...
│   └── ...
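Since the key layout is schema/table/file, the originating table can be recovered from any object key, e.g. when auditing which tables are receiving files. A minimal sketch (hypothetical helper, not part of the pipeline):

```python
def parse_dms_key(key: str):
    """Split a DMS object key like 'databend/orders/20260416-124049492.parquet'
    into its (schema, table, file_name) parts."""
    schema, table, file_name = key.split("/", 2)
    return schema, table, file_name

assert parse_dms_key("databend/orders/20260416-124049492.parquet") == (
    "databend", "orders", "20260416-124049492.parquet")
```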

Columns automatically added by DMS in CDC Parquet files:

| Column | Type | Description |
| --- | --- | --- |
| Op | STRING | Operation type: I (Insert), U (Update), D (Delete) |
| _dms_ingestion_timestamp | TIMESTAMP | The time DMS captured the change |
| (all source columns) | (original types) | Your business data columns |

Step 3: Configure Databend Cloud

3.1 Create an External Stage Pointing to S3

-- Option A: Using Access Key / Secret Key
CREATE STAGE IF NOT EXISTS dms_s3_stage
URL = 's3://dms-databend-bucket/databend/'
CONNECTION = (
    AWS_KEY_ID = '<your-access-key>',
    AWS_SECRET_KEY = '<your-secret-key>',
    REGION = 'ap-southeast-1'
);

-- Option B: Using an IAM Role ARN (recommended for production)
CREATE STAGE IF NOT EXISTS dms_s3_stage
URL = 's3://dms-databend-bucket/databend/'
CONNECTION = (
    ROLE_ARN = 'arn:aws:iam::123456789012:role/databend-s3-access',
    REGION = 'ap-southeast-1'
);

Verify the Stage can list files:


LIST @dms_s3_stage;

3.2 Create the Raw Layer Tables

The raw layer stores every change event from the DMS Parquet files as-is. Each source table maps to one raw table.

Design principles:

  • Retain all original business columns plus DMS metadata columns (Op, _dms_ingestion_timestamp)
  • Enable ENABLE_SCHEMA_EVOLUTION so new columns added at the source are picked up automatically
  • This layer is append-only — never update or delete rows here

Example for source table orders:

CREATE DATABASE IF NOT EXISTS raw;

CREATE TABLE IF NOT EXISTS raw.orders (
    -- DMS metadata columns
    Op VARCHAR NULL,
    _dms_ingestion_timestamp TIMESTAMP NULL,

    -- Business columns (match the source table schema)
    order_id BIGINT NULL,
    user_id BIGINT NULL,
    product_id BIGINT NULL,
    quantity INT NULL,
    amount DECIMAL(12,2) NULL,
    status VARCHAR NULL,
    created_at TIMESTAMP NULL,
    updated_at TIMESTAMP NULL
) ENABLE_SCHEMA_EVOLUTION = true;

Repeat this step for each source table. Column names must exactly match the column names in the Parquet files.

3.3 Create the ODS Layer Tables

The ODS layer holds the latest state of each record — this is the clean business table that downstream queries should target.

CREATE DATABASE IF NOT EXISTS ods;

CREATE TABLE IF NOT EXISTS ods.orders (
    order_id BIGINT NOT NULL,
    user_id BIGINT NULL,
    product_id BIGINT NULL,
    quantity INT NULL,
    amount DECIMAL(12,2) NULL,
    status VARCHAR NULL,
    is_deleted BOOLEAN NOT NULL DEFAULT false, -- Soft-delete flag
    created_at TIMESTAMP NULL,
    updated_at TIMESTAMP NULL
);

Use soft deletes (the is_deleted flag) rather than physical DELETEs. This preserves history and prevents data loss when a D operation arrives from the binlog.


Step 4: Initial Full Load

Full-load files land at s3://dms-databend-bucket/databend/orders/. Load them into the raw layer first, then merge into the ODS layer.

4.1 Load Full-Load Files into the Raw Layer

COPY INTO raw.orders
FROM @dms_s3_stage/orders/
PATTERN = '.*\.parquet'
FILE_FORMAT = (TYPE = PARQUET)
PURGE = TRUE;

4.2 Initial Merge into the ODS Layer

During the full load, every row has Op = 'I', so a simple INSERT is sufficient:


INSERT INTO ods.orders
SELECT
    order_id, user_id, product_id, quantity, amount, status,
    false AS is_deleted,
    created_at, updated_at
FROM raw.orders
WHERE Op = 'I';

Once the full load is complete and the ODS layer is populated, proceed to Step 5 to set up the CDC pipeline.


Step 5: Set Up the CDC Incremental Pipeline

5.1 Create a Stream on the Raw Layer Table

A Stream tracks newly appended rows in the raw table. After a MERGE INTO task consumes the Stream, the cursor advances automatically.

CREATE STREAM IF NOT EXISTS raw.stream_orders
ON TABLE raw.orders
APPEND_ONLY = TRUE;

5.2 Create Task 1 — COPY INTO the Raw Layer (Scheduled)

This task polls S3 on a schedule for new CDC Parquet files and loads them into the raw table. You can group multiple tables from the same business domain into a single task.

-- Single-table task
CREATE TASK load_orders_raw
WAREHOUSE = 'default'
SCHEDULE = 1 MINUTE
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
COPY INTO raw.orders
FROM @dms_s3_stage/orders/
PATTERN = '.*\.parquet'
FILE_FORMAT = (TYPE = PARQUET)
PURGE = TRUE
ON_ERROR = CONTINUE;

Multi-table task example (grouped by business domain):

CREATE TASK load_databend_domain_raw
WAREHOUSE = 'default'
SCHEDULE = 1 MINUTE
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
BEGIN
COPY INTO raw.orders
FROM @dms_s3_stage/orders/
PATTERN = '.*\.parquet'
FILE_FORMAT = (TYPE = PARQUET)
PURGE = TRUE
ON_ERROR = CONTINUE;

-- Add more tables from the same domain here ...
END;

PURGE = TRUE deletes S3 files after a successful load, preventing duplicate ingestion.
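The idempotency argument can be made concrete with a toy simulation. FakeStage and copy_into below are hypothetical in-memory stand-ins for the S3 prefix and the COPY INTO task, not Databend APIs:

```python
class FakeStage:
    """In-memory stand-in for the S3 staging prefix: file name -> rows."""
    def __init__(self, files):
        self.files = dict(files)

def copy_into(raw_table: list, stage: FakeStage, purge: bool):
    """Load every staged file into the raw table; delete files when purge=True."""
    for name in list(stage.files):
        raw_table.extend(stage.files[name])
        if purge:
            del stage.files[name]

raw = []
stage = FakeStage({"f1.parquet": [("I", 1)], "f2.parquet": [("U", 1)]})
copy_into(raw, stage, purge=True)
copy_into(raw, stage, purge=True)    # second run is a no-op: files are gone
assert len(raw) == 2

raw_dup = []
stage2 = FakeStage({"f1.parquet": [("I", 1)]})
copy_into(raw_dup, stage2, purge=False)
copy_into(raw_dup, stage2, purge=False)  # without purge, rows load twice
assert len(raw_dup) == 2
```

Without the purge, every scheduled run would re-ingest the same files, which is exactly the duplicate-data failure mode listed in the Troubleshooting section.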

5.3 Create Task 2 — MERGE INTO the ODS Layer (Event-Driven)

This task fires after Task 1 completes and only runs when the Stream contains new data.

Important: the QUALIFY ROW_NUMBER() = 1 deduplication logic. Within a single CDC batch, the same primary key may appear multiple times (e.g., two consecutive UPDATEs). You must keep only the latest change per key; otherwise the MERGE result is non-deterministic.

CREATE TASK merge_orders
WAREHOUSE = 'default'
AFTER 'load_orders_raw'
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
MERGE INTO ods.orders AS target
USING (
    SELECT * FROM (
        SELECT
            order_id,
            user_id,
            product_id,
            quantity,
            amount,
            status,
            created_at,
            updated_at,
            Op AS _op,
            _dms_ingestion_timestamp AS _ts
        FROM raw.stream_orders
    )
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY _ts DESC
    ) = 1
) AS source
ON target.order_id = source.order_id

WHEN MATCHED AND source._op = 'D' THEN
    UPDATE SET target.is_deleted = true, target.updated_at = source.updated_at

WHEN MATCHED AND source._op IN ('U', 'I') THEN
    UPDATE SET
        target.user_id = source.user_id,
        target.product_id = source.product_id,
        target.quantity = source.quantity,
        target.amount = source.amount,
        target.status = source.status,
        target.is_deleted = false,
        target.created_at = source.created_at,
        target.updated_at = source.updated_at

WHEN NOT MATCHED AND source._op != 'D' THEN
    INSERT (order_id, user_id, product_id, quantity, amount, status, is_deleted, created_at, updated_at)
    VALUES (source.order_id, source.user_id, source.product_id, source.quantity,
            source.amount, source.status, false, source.created_at, source.updated_at);

MERGE logic breakdown:

| Condition | Action | Scenario |
| --- | --- | --- |
| MATCHED + Op='D' | Soft delete (is_deleted = true) | The source row was deleted |
| MATCHED + Op='U'/'I' | Update all columns, reset is_deleted = false | The source row was updated (or re-inserted after a delete) |
| NOT MATCHED + Op!='D' | Insert a new row | A new row that doesn't yet exist in the ODS layer |
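The dedup-then-merge semantics can be sketched in plain Python. This is an illustration of the logic only, not of Databend internals: latest_per_key mirrors the QUALIFY deduplication, and merge_into mirrors the three WHEN branches above.

```python
def latest_per_key(events):
    """Keep only the newest change per order_id
    (the QUALIFY ROW_NUMBER() ... = 1 step)."""
    latest = {}
    for ev in sorted(events, key=lambda e: e["ts"]):
        latest[ev["order_id"]] = ev
    return latest.values()

def merge_into(ods: dict, events):
    """Apply a deduplicated CDC batch to the ODS table (dict keyed by order_id)."""
    for ev in latest_per_key(events):
        key, op = ev["order_id"], ev["op"]
        if key in ods:
            if op == "D":
                ods[key]["is_deleted"] = True            # soft delete
            else:
                ods[key] = {**ev, "is_deleted": False}   # update / re-insert
        elif op != "D":
            ods[key] = {**ev, "is_deleted": False}       # brand-new row

ods = {}
merge_into(ods, [
    {"order_id": 1, "op": "I", "ts": 1, "status": "pending"},
    {"order_id": 1, "op": "U", "ts": 2, "status": "paid"},   # same key twice
    {"order_id": 2, "op": "I", "ts": 1, "status": "pending"},
])
assert ods[1]["status"] == "paid"     # only the latest change per key applied

merge_into(ods, [{"order_id": 2, "op": "D", "ts": 3}])
assert ods[2]["is_deleted"] is True   # delete arrives as a soft delete
```

Dropping the dedup step (applying both order_id = 1 events in arbitrary order) is exactly the non-determinism the MERGE's QUALIFY clause guards against.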

5.4 Start the Tasks

Start the child task first, then the parent task. This ensures the downstream task is ready before the upstream task begins producing data.

-- Step 1: Start the child task (MERGE)
ALTER TASK merge_orders RESUME;

-- Step 2: Start the parent task (COPY INTO)
ALTER TASK load_orders_raw RESUME;

To add a new table, repeat steps 3.2 → 3.3 → 5.1 → 5.2 → 5.3 → 5.4.


Step 6: Verification

6.1 Verify the DMS Task Status

In the AWS DMS console, check that:

  • Task status is Running (after the full load completes, it should show Load complete, replication ongoing)
  • The Table statistics tab shows loaded row counts and CDC change counts
  • CloudWatch logs contain no errors


# CLI check
aws dms describe-table-statistics \
--replication-task-arn <your-task-arn> \
--query 'TableStatistics[*].{Table:TableName,Inserts:Inserts,Updates:Updates,Deletes:Deletes,FullLoad:FullLoadRows}'

6.2 Verify S3 Files

# Confirm files are landing
aws s3 ls s3://dms-databend-bucket/databend/orders/ --recursive

6.3 Verify Databend Cloud Tasks

-- Check task status
SHOW TASKS;

-- View task execution history
SELECT *
FROM TABLE(TASK_HISTORY())
ORDER BY scheduled_time DESC
LIMIT 20;

-- Verify raw layer data
SELECT COUNT(*) FROM raw.orders;
SELECT * FROM raw.orders ORDER BY _dms_ingestion_timestamp DESC LIMIT 10;

-- Verify ODS layer data
SELECT COUNT(*) FROM ods.orders WHERE is_deleted = false;
SELECT * FROM ods.orders ORDER BY updated_at DESC LIMIT 10;

-- Check Stream status (should be empty after a successful merge)
SELECT COUNT(*) FROM raw.stream_orders;

6.4 End-to-End Validation

Run the following test operations on the source database in sequence. Wait 2–3 minutes between each step to allow DMS to flush to S3 and the Databend tasks to consume the data.

Step 1: INSERT — Run on the source RDS MySQL / Aurora

INSERT INTO db.orders (user_id, product_id, quantity, amount, status, created_at, updated_at)
VALUES (99999, 88888, 3, 4.4, 'pending', NOW(), NOW());

-- Note the auto-generated order_id for subsequent steps
SELECT LAST_INSERT_ID();

Verify on Databend Cloud (wait 2–3 minutes):


SELECT * FROM ods.orders WHERE order_id = 100000004;
-- Expected: status = 'pending', amount = 4.4, is_deleted = false

Step 2: UPDATE — Run on the source RDS MySQL / Aurora

UPDATE db.orders
SET status = 'paid', amount = 2.2, updated_at = NOW()
WHERE order_id = 100000004;

Verify on Databend Cloud (wait 2–3 minutes):

SELECT * FROM ods.orders WHERE order_id = 100000004;
-- Expected: status = 'paid', amount = 2.2, is_deleted = false

Step 3: DELETE — Run on the source RDS MySQL / Aurora

DELETE FROM db.orders WHERE order_id = 100000004;

Verify on Databend Cloud (wait 2–3 minutes):

SELECT * FROM ods.orders WHERE order_id = 100000004;
-- Expected: is_deleted = true

Clean up test data:

DELETE FROM ods.orders WHERE order_id = 100000004;
DELETE FROM raw.orders WHERE order_id = 100000004;

Step 7: Operations and Maintenance

7.1 Common Operations

-- Suspend tasks (parent first, then child)
ALTER TASK load_orders_raw SUSPEND;
ALTER TASK merge_orders SUSPEND;

-- Resume tasks (child first, then parent)
ALTER TASK merge_orders RESUME;
ALTER TASK load_orders_raw RESUME;

-- Manually trigger a task (useful for debugging)
EXECUTE TASK load_orders_raw;

-- Inspect unconsumed Stream data
SELECT * FROM raw.stream_orders LIMIT 10;

7.2 Raw Layer Data Retention

The raw layer grows continuously. Set up a scheduled cleanup task (retain at least 15 days):

-- Manual cleanup
DELETE FROM raw.orders
WHERE _dms_ingestion_timestamp < DATEADD(DAY, -15, NOW());

-- Or create a scheduled cleanup task
CREATE TASK cleanup_raw_orders
WAREHOUSE = 'default'
SCHEDULE = USING CRON '0 3 * * *' -- Runs daily at 3:00 AM
AS
DELETE FROM raw.orders
WHERE _dms_ingestion_timestamp < DATEADD(DAY, -15, NOW());

ALTER TASK cleanup_raw_orders RESUME;
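The DATEADD(DAY, -15, NOW()) cutoff can be mirrored outside SQL, e.g. in a monitoring script that alerts if raw-layer rows older than the retention window still exist. A minimal Python equivalent (hypothetical helper):

```python
from datetime import datetime, timedelta

def retention_cutoff(now: datetime, days: int = 15) -> datetime:
    """Equivalent of DATEADD(DAY, -15, NOW()):
    rows with _dms_ingestion_timestamp older than this are purged."""
    return now - timedelta(days=days)

now = datetime(2026, 4, 28, 3, 0, 0)   # the 3:00 AM cleanup run
assert retention_cutoff(now) == datetime(2026, 4, 13, 3, 0, 0)
```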

Troubleshooting

| Problem | Symptom | Solution |
| --- | --- | --- |
| Missing QUALIFY ROW_NUMBER() dedup | Dirty or inconsistent data in the ODS layer | The USING subquery in MERGE must deduplicate by primary key, keeping only the latest change per key |
| Physical DELETE instead of soft delete | Historical data lost, no audit trail | Use an is_deleted flag in the ODS layer; set the flag to true when Op='D' instead of deleting the row |
| Skipping the raw layer and writing directly to ODS | No way to replay or recover from a bad merge | Always keep the raw layer as the source of truth. Retain for at least 15 days. |
| DDL changes not propagated | COPY INTO fails to parse new Parquet columns | Enable ENABLE_SCHEMA_EVOLUTION = true on raw tables; manually add new columns to ODS tables |
| Timezone mismatch | updated_at values differ between source and Databend | DMS defaults to UTC. Ensure the source database also uses UTC, or handle timezone conversion explicitly. |
| Character encoding issues | Garbled text in Databend | Ensure the source database uses UTF-8. Parquet format handles encoding automatically in DMS. |
| BatchApplyEnabled = true | Op column missing or incorrect in CDC files | Set BatchApplyEnabled to false in the DMS task settings |
| S3 files not being cleaned up | Duplicate data in the raw layer | Ensure PURGE = TRUE is set in COPY INTO; verify the IAM role has s3:DeleteObject permission |
| Full-load and CDC paths overlap | Full-load files reprocessed as CDC | Use cdcPath=cdc/ in the S3 endpoint settings to separate full-load and CDC directories |

Appendix A: Adding a New Table — Checklist

For each new source table you want to sync, repeat the following steps:

| Step | What to customize |
| --- | --- |
| 3.2 — Raw table | Column names and types to match the source table |
| 3.3 — ODS table | Column names and types to match the source table, plus is_deleted |
| 4.1 — Full-load COPY INTO | S3 path to match the table's directory |
| 5.1 — Stream | Stream name and target raw table |
| 5.2 — Task 1 (COPY INTO) | S3 path and raw table name |
| 5.3 — Task 2 (MERGE INTO) | All column references, the ON clause, and the PARTITION BY primary key |
| 5.4 — Start tasks | Task names |

Appendix B: DMS Type Mapping to Databend Cloud

| MySQL Type | DMS Parquet Type | Databend Type |
| --- | --- | --- |
| TINYINT | INT32 | TINYINT |
| SMALLINT | INT32 | SMALLINT |
| INT | INT32 | INT |
| BIGINT | INT64 | BIGINT |
| FLOAT | FLOAT | FLOAT |
| DOUBLE | DOUBLE | DOUBLE |
| DECIMAL(p,s) | FIXED_LEN_BYTE_ARRAY | DECIMAL(p,s) |
| VARCHAR(n) | BYTE_ARRAY (UTF8) | VARCHAR |
| TEXT | BYTE_ARRAY (UTF8) | VARCHAR |
| DATE | INT32 (DATE) | DATE |
| DATETIME | INT64 (TIMESTAMP_MILLIS) | TIMESTAMP |
| TIMESTAMP | INT64 (TIMESTAMP_MILLIS) | TIMESTAMP |
| BOOLEAN / TINYINT(1) | BOOLEAN | BOOLEAN |
| JSON | BYTE_ARRAY (UTF8) | VARIANT |
| BLOB | BYTE_ARRAY | VARCHAR (base64) |

Appendix C: Latency Estimates by Stage

| Stage | Typical Latency |
| --- | --- |
| Source database → DMS (binlog read) | 1–5 seconds |
| DMS → S3 (CDC file flush) | 30–60 seconds (controlled by cdcMaxBatchInterval) |
| S3 → Databend raw layer (COPY INTO task) | 1 minute (task schedule interval, adjustable) |
| Raw → ODS (MERGE INTO task) | Triggers immediately after COPY INTO completes |
| End-to-end | ~1–2 minutes |

The primary source of latency is the DMS CDC batch interval. To reduce latency, lower cdcMaxBatchInterval and cdcMinFileSize.
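As a sanity check, the end-to-end figure is just the sum of the per-stage budgets. A quick Python check using the typical values above (the merge_into budget is an assumption, since that step is event-triggered rather than scheduled):

```python
# Typical per-stage latency budgets, in seconds.
stages = {
    "binlog_read": (1, 5),
    "dms_s3_flush": (30, 60),        # controlled by cdcMaxBatchInterval
    "copy_into_schedule": (0, 60),   # 1-minute schedule: 0-60 s of waiting
    "merge_into": (0, 5),            # assumed: fires right after COPY INTO
}

low = sum(lo for lo, hi in stages.values())
high = sum(hi for lo, hi in stages.values())
print(f"end-to-end: {low}-{high} s")   # lands in the ~1-2 minute range
assert high <= 150
```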


Conclusion

This architecture combines AWS DMS, S3, and Databend to deliver near-real-time replication from MySQL/Aurora to a cloud data warehouse. The core design principles are:

  1. Layered storage — The raw layer retains a complete change log; the ODS layer maintains the latest state.
  2. Event-driven processing — Streams avoid unnecessary empty runs by triggering merges only when new data arrives.
  3. Idempotent design — PURGE = TRUE prevents duplicate ingestion; QUALIFY ROW_NUMBER() guarantees deduplication.
  4. Soft deletes — Preserve data integrity and support auditing and historical lookups.

Once the pipeline is in place, onboarding a new table is a simple, repeatable process with minimal operational overhead.
