Recently, while using SeaTunnel CDC to synchronize real-time data from Oracle, MySQL, and SQL Server to other relational databases, I spent time reading and modifying the source code of SeaTunnel and Debezium. Through this process, I gained an initial understanding of how the SeaTunnel CDC Source is implemented.
While everything was still fresh, I decided to organize the questions I ran into and clear up some common confusions. I will try to explain things in an approachable way. These are purely my personal understandings, so if anything is incorrect, corrections are welcome.
The main topics covered in this article are:
The overall CDC data reading process can be divided into three phases:
Snapshot (full) → Backfill → Incremental
As the name implies, the snapshot phase captures a snapshot of the current state of the database and reads all existing data. In SeaTunnel’s current implementation, this is done through pure JDBC reads.
During snapshot reading, SeaTunnel records the current binlog position. For MySQL, it executes:
SHOW MASTER STATUS;
which returns results such as:
File          | Position   | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set
--------------+------------+--------------+------------------+------------------
binlog.000011 | 1001373553 |              |                  |
These values are stored as the low watermark.
Note that this operation is not performed only once: it is repeated for every split.
To improve performance, SeaTunnel has designed its own split mechanism. You can refer to my other article for details. Assume the global parallelism is 10. SeaTunnel initializes 10 channels to execute tasks in parallel.
SeaTunnel first analyzes the number of tables, then splits each table based on the minimum and maximum values of the primary key. The default split size is 8096 rows.
For tables with large data volumes, this can result in more than 100 splits, which are randomly distributed across the 10 channels. At this stage, no data is actually read; SeaTunnel only prepares the SQL queries with WHERE conditions and stores them.
After all tables are split, each split is executed in parallel.
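The splitting step can be sketched as follows. This is a simplified model, not SeaTunnel's splitter: it assumes a single numeric primary key with evenly spread values, uses hypothetical names (PkSplit, EvenPkSplitter), and assigns splits to channels round-robin rather than randomly. Leaving the last split open-ended is a design choice of the sketch so that rows above the observed maximum key are not missed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical split of one table over a numeric primary key: [start, endExclusive).
// endExclusive == null means the split has no upper bound.
record PkSplit(String table, long start, Long endExclusive) {
    // Only builds the SQL text; nothing is read from the database at this stage.
    String toSql(String pkColumn) {
        String where = pkColumn + " >= " + start;
        if (endExclusive != null) {
            where += " AND " + pkColumn + " < " + endExclusive;
        }
        return "SELECT * FROM " + table + " WHERE " + where;
    }
}

class EvenPkSplitter {
    // Cuts [minPk, maxPk] into chunks of splitSize keys; the last chunk is open-ended.
    static List<PkSplit> split(String table, long minPk, long maxPk, long splitSize) {
        if (splitSize <= 0) throw new IllegalArgumentException("splitSize must be positive");
        List<PkSplit> splits = new ArrayList<>();
        long start = minPk;
        while (start + splitSize <= maxPk) {
            splits.add(new PkSplit(table, start, start + splitSize));
            start += splitSize;
        }
        splits.add(new PkSplit(table, start, null));
        return splits;
    }

    // Round-robin stand-in for distributing splits across `parallelism` channels.
    static List<List<PkSplit>> assign(List<PkSplit> splits, int parallelism) {
        List<List<PkSplit>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            channels.get(i % parallelism).add(splits.get(i));
        }
        return channels;
    }

    public static void main(String[] args) {
        for (PkSplit s : split("user_orders", 1, 25_000, 10_000)) {
            System.out.println(s.toSql("order_id"));
        }
    }
}
```

With keys 1 to 25,000 and a split size of 10,000, main prints three queries; the first is the same user_orders query the article uses as its example.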
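When each split (for example, SELECT * FROM user_orders WHERE order_id >= 1 AND order_id < 10001) begins execution:
Record the low watermark by executing SHOW MASTER STATUS
Execute the split's SELECT query to read its rows
Record the high watermark by executing SHOW MASTER STATUS again
Once one split finishes, the next split begins execution.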
The corresponding code is shown below:
// MySqlSnapshotSplitReadTask.doExecute()
protected SnapshotResult doExecute(...) {
    // ① Record low watermark
    BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    dispatcher.dispatchWatermarkEvent(..., lowWatermark, WatermarkKind.LOW);

    // ② Read snapshot data
    createDataEvents(ctx, snapshotSplit.getTableId());

    // ③ Record high watermark
    BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    dispatcher.dispatchWatermarkEvent(..., highWatermark, WatermarkKind.HIGH);
}
Notes:
It is recommended to configure a larger split size, such as 100,000 rows. Practical experience shows that more splits do not necessarily lead to better performance.
The backfill phase has two modes, controlled by the exactly_once parameter.
exactly_once = false (default)
If exactly_once is disabled, SeaTunnel waits until all snapshot splits are completed. It then compares the watermarks of all splits and selects the minimum watermark.
From that point onward, SeaTunnel switches from JDBC reads to CDC log consumption:
Log entries are parsed, and corresponding INSERT, UPDATE, or DELETE events are generated.
Each emitted record carries its own position or SCN offset. For each incoming record, SeaTunnel compares its offset with the high watermark. Once the offset exceeds the high watermark, the system transitions into the pure incremental phase.
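The at-least-once switch described above can be sketched like this. It is a minimal model assuming MySQL-style positions (binlog file name plus byte offset) and hypothetical types BinlogPos and SplitWatermarks, not SeaTunnel's BinlogOffset. The article does not say which high watermark the offsets are compared against, so using the maximum across all splits is an assumption of this sketch.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical simplified binlog position (not SeaTunnel's BinlogOffset class).
record BinlogPos(String file, long position) implements Comparable<BinlogPos> {
    // "binlog.000011" -> 11; MySQL numbers binlog files sequentially.
    long fileIndex() {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPos other) {
        int byFile = Long.compare(fileIndex(), other.fileIndex());
        return byFile != 0 ? byFile : Long.compare(position, other.position);
    }
}

// Low/high watermarks recorded around one snapshot split.
record SplitWatermarks(BinlogPos low, BinlogPos high) {}

class AtLeastOnceSwitch {
    // Log consumption starts at the minimum low watermark across all splits.
    static BinlogPos startOffset(List<SplitWatermarks> splits) {
        return splits.stream().map(SplitWatermarks::low)
                .min(Comparator.naturalOrder()).orElseThrow();
    }

    // Assumption: the switch point is the maximum high watermark across all splits.
    static BinlogPos switchOffset(List<SplitWatermarks> splits) {
        return splits.stream().map(SplitWatermarks::high)
                .max(Comparator.naturalOrder()).orElseThrow();
    }

    // Records at or before the switch point are still backfill; later ones are pure incremental.
    static String phaseOf(BinlogPos recordOffset, BinlogPos switchOffset) {
        return recordOffset.compareTo(switchOffset) > 0 ? "INCREMENTAL" : "BACKFILL";
    }
}
```

For Oracle or SQL Server the comparison would use SCNs or LSNs instead of file/position pairs; the switching logic stays the same.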
exactly_once = true
When exactly_once is enabled, SeaTunnel reads the log between each split's low watermark and high watermark.
All change events within this range are parsed and cached.
Snapshot data and log data are then merged in memory. Records are compared by primary key. For example, if a row is inserted during snapshot and later updated during log reading, only the updated version is retained.
This guarantees exactly-once semantics, but it is very memory-intensive.
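The in-memory merge for a single split can be sketched as below, following the example above: a row inserted in the snapshot and later updated in the log keeps only the updated version. The types (Op, ChangeEvent, SplitNormalizer) are hypothetical, the sketch assumes a single numeric primary key and events already parsed from the split's watermark range, and it ignores events whose key belongs to another split. Keeping the whole split in a map is exactly where the memory cost comes from.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum Op { INSERT, UPDATE, DELETE }

// Hypothetical parsed change event; row is null for DELETE.
record ChangeEvent(Op op, long pk, String row) {}

class SplitNormalizer {
    // Merges a split's snapshot rows with the log events read between its low and high watermarks.
    // Only keys inside [startPk, endPkExclusive) belong to this split.
    static Map<Long, String> merge(Map<Long, String> snapshotRows, List<ChangeEvent> eventsInRange,
                                   long startPk, long endPkExclusive) {
        Map<Long, String> merged = new LinkedHashMap<>(snapshotRows); // whole split held in memory
        for (ChangeEvent e : eventsInRange) {
            if (e.pk() < startPk || e.pk() >= endPkExclusive) {
                continue; // belongs to a different split
            }
            switch (e.op()) {
                case INSERT, UPDATE -> merged.put(e.pk(), e.row()); // latest version wins
                case DELETE -> merged.remove(e.pk());
            }
        }
        return merged;
    }
}
```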
The incremental phase consists of pure log consumption.
If exactly_once is enabled, SeaTunnel starts a new unbounded stream beginning from the high watermark. If it is disabled, incremental reading continues directly from the backfill phase.
Conceptually, backfill reads from the low watermark, while incremental reading starts from the high watermark. The only difference lies in the starting offset.
exactly_once enabled (Exactly-Once)
Costs:
Note:
The mechanism of LogMiner
LogMiner is an internal Oracle process running inside the production database instance.
Each LogMiner session requires:
~1 CPU core for parsing Redo Logs
~500MB–1GB memory for caching and parsing
Continuous reading of Redo Log files (I/O operations)
Without enabling exactly_once (semantic: At-Least-Once)
Snapshot: still reading historical data
Incremental: directly consume binlog from the low watermark (no separate backfill)
Differences:
No separate "backfill" step
Snapshot and incremental are in the same stream, but not mixed:
Backfill and incremental are linked in the same stream, which can be considered as "backfill + incremental as one".
Will there be duplicate data when exactly_once is not enabled? Yes, under certain conditions.
How can duplicates be avoided?
Option 1: enable upsert on the Sink (primary-key idempotent). With enable_upsert, the Sink uses statements like MERGE INTO / REPLACE INTO instead of plain INSERT.
Option 2: enable exactly_once + in-memory filtering.
Recommendation: either disable exactly_once, enable Sink upsert, and deduplicate by primary key; or enable exactly_once, but evaluate memory and source DB pressure (especially in Oracle LogMiner scenarios).
How is startup.mode = timestamp implemented internally?
The timestamp mode specifies a point in time from which to sync data. Each database's CDC mechanism differs, so the way the timestamp is resolved also differs.
MySQL principle:
User specifies a millisecond timestamp (e.g., 1734494400000)
Executes SHOW BINARY LOGS to get all binlog files
Oracle principle:
User specifies a millisecond timestamp (e.g., 1763058616003)
SeaTunnel converts it to java.sql.Timestamp, formatted as YYYY-MM-DD HH24:MI:SS.FF3
Executes SQL: SELECT TIMESTAMP_TO_SCN(TO_TIMESTAMP('2024-12-18 09:00:00.003', 'YYYY-MM-DD HH24:MI:SS.FF3')) FROM DUAL
TIMESTAMP_TO_SCN returns the corresponding SCN (System Change Number)
Returns a RedoLogOffset containing that SCN and reads the redo log from that SCN
To check the current SCN and the SCN-to-time mapping, you can run SELECT current_scn FROM v$database; and SELECT SCN_TO_TIMESTAMP('240158979') FROM DUAL;
Additionally, since the Oracle connector reads redo logs directly, troubleshooting is difficult. The following SQL simulates Debezium starting a LogMiner session and is useful for problem diagnosis:
-- Enable DBMS_OUTPUT (SQL*Plus / SQL Developer)
SET SERVEROUTPUT ON

-- Clean previous LogMiner session
BEGIN
  DBMS_LOGMNR.END_LOGMNR;
EXCEPTION
  WHEN OTHERS THEN NULL;
END;
/

SELECT * FROM V$LOGFILE;

-- Add current online log files
DECLARE
  v_first BOOLEAN := TRUE;
BEGIN
  FOR rec IN (SELECT MEMBER FROM V$LOGFILE WHERE TYPE = 'ONLINE' AND ROWNUM <= 3) LOOP
    IF v_first THEN
      DBMS_LOGMNR.ADD_LOGFILE(rec.MEMBER, DBMS_LOGMNR.NEW);
      v_first := FALSE;
    ELSE
      DBMS_LOGMNR.ADD_LOGFILE(rec.MEMBER, DBMS_LOGMNR.ADDFILE);
    END IF;
    DBMS_OUTPUT.PUT_LINE('Added log file: ' || rec.MEMBER);
  END LOOP;
END;
/

-- Start LogMiner session
BEGIN
  DBMS_LOGMNR.START_LOGMNR(
    OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.COMMITTED_DATA_ONLY
  );
  DBMS_OUTPUT.PUT_LINE('LogMiner started successfully');
END;
/

-- Query parsed content
SELECT SCN, OPERATION, OPERATION_CODE, TABLE_NAME,
       TO_CHAR(TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS') AS TIMESTAMP,
       CSF, INFO,
       SUBSTR(SQL_REDO, 1, 200) AS SQL_REDO_PREVIEW
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'XML_DEBUG_TEST'
  AND SEG_OWNER = USER
ORDER BY SCN, SEQUENCE#;

-- Clean LogMiner session
BEGIN
  DBMS_LOGMNR.END_LOGMNR;
EXCEPTION
  WHEN OTHERS THEN NULL;
END;
/
PostgreSQL principle:
The timestamp mode is not supported; only the INITIAL, EARLIEST, and LATEST modes are available.
SQL Server principle (based on the sys.fn_cdc_map_time_to_lsn function):
User specifies millisecond timestamp (e.g., 1734494400000)
SeaTunnel converts to java.sql.Timestamp
Executes SQL: SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', ?) AS lsn
The SQL Server built-in function returns the smallest LSN whose time is >= the specified time
SeaTunnel returns an LsnOffset containing that LSN's byte array and reads the CDC log from that LSN
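Both the Oracle and SQL Server paths start by turning the user's millisecond timestamp into a java.sql.Timestamp. The sketch below shows that conversion and the Oracle FF3 text rendering, under two assumptions: the class name StartupTimestampSketch is hypothetical, and the database's time zone is known and passed in explicitly (the wall-clock text is otherwise ambiguous). In a real job the query strings would be run through a PreparedStatement with setString (Oracle) or setTimestamp (SQL Server); that JDBC wiring is left out so the sketch runs without a database.

```java
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class StartupTimestampSketch {
    // Oracle lookup from the article, with the literal replaced by a bind parameter.
    static final String ORACLE_SCN_SQL =
            "SELECT TIMESTAMP_TO_SCN(TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS.FF3')) FROM DUAL";
    // SQL Server lookup quoted in the article; the Timestamp is bound directly.
    static final String SQLSERVER_LSN_SQL =
            "SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', ?) AS lsn";

    // Java pattern equivalent to Oracle's 'YYYY-MM-DD HH24:MI:SS.FF3'.
    private static final DateTimeFormatter FF3 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Step 1: the user's millisecond timestamp becomes a java.sql.Timestamp.
    static Timestamp toTimestamp(long epochMillis) {
        return new Timestamp(epochMillis);
    }

    // Step 2 (Oracle): render it as wall-clock text in the database's time zone (assumed known).
    static String toOracleText(long epochMillis, ZoneId dbZone) {
        return FF3.withZone(dbZone).format(toTimestamp(epochMillis).toInstant());
    }

    public static void main(String[] args) {
        System.out.println(toOracleText(1734494400000L, ZoneId.of("UTC")));           // 2024-12-18 04:00:00.000
        System.out.println(toOracleText(1734494400000L, ZoneId.of("Asia/Shanghai"))); // 2024-12-18 12:00:00.000
    }
}
```

The two printed lines show why the zone matters: the same instant maps to different wall-clock text, and therefore to a different SCN, depending on which zone is assumed.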
Checkpoint enables resume-from-failure. How does it work, and what should be noted?
First, let's look at how checkpoints (CK) are implemented. SeaTunnel triggers checkpoints asynchronously at fixed intervals; on the source side this is handled by SourceFlowLifeCycle.triggerBarrier():
// SourceFlowLifeCycle.triggerBarrier()
public void triggerBarrier(Barrier barrier) throws Exception {
    log.debug("source trigger barrier [{}]", barrier);
    // Key: acquire checkpoint lock to ensure state consistency
    synchronized (collector.getCheckpointLock()) {
        // Step 1: check if prepare to close
        if (barrier.prepareClose(this.currentTaskLocation)) {
            this.prepareClose = true;
        }
        // Step 2: snapshot state
        if (barrier.snapshot()) {
            List<byte[]> states =
                    serializeStates(splitSerializer, reader.snapshotState(barrier.getId()));
            runningTask.addState(barrier, ActionStateKey.of(sourceAction), states);
        }
        // Step 3: acknowledge barrier
        runningTask.ack(barrier);
        // Step 4: Key! send barrier as Record downstream
        collector.sendRecordToNext(new Record<>(barrier));
    }
}
A checkpoint works by injecting a barrier record, a special marker that travels with the data along the flow: source → transform → sink. Each stage handles the Barrier as follows:
Source stops reading and stores the state in CK
Transform passes Barrier without processing
Sink flushes buffered batch data upon receiving Barrier
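The barrier flow can be modeled in a few lines. This is a toy, single-threaded sketch with hypothetical classes (CountingSource, UpperCaseTransform, BufferingSink), not SeaTunnel's engine: the source saves its offset and then emits the barrier (the same order as triggerBarrier()), the transform forwards it untouched, and the sink flushes its buffer before acknowledging. It also shows why a slow sink delays checkpoints: the barrier cannot be acknowledged until everything queued ahead of it has been written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stream elements: data rows and checkpoint barriers share one channel.
interface Element {}
record Row(String value) implements Element {}
record Barrier(long checkpointId) implements Element {}

class CountingSource {
    long offset = 0;                                  // position of the next row to read
    final List<Long> savedStates = new ArrayList<>(); // stands in for checkpoint storage

    Row next() {
        return new Row("row-" + offset++);
    }

    // Save state first, then emit the barrier downstream.
    Barrier triggerBarrier(long checkpointId) {
        savedStates.add(offset);
        return new Barrier(checkpointId);
    }
}

class UpperCaseTransform {
    // Rows are transformed; barriers pass through untouched.
    static Element apply(Element e) {
        return e instanceof Row r ? new Row(r.value().toUpperCase()) : e;
    }
}

class BufferingSink {
    private final List<String> buffer = new ArrayList<>();
    final List<String> written = new ArrayList<>();     // stands in for the target table
    final List<Long> ackedCheckpoints = new ArrayList<>();

    void accept(Element e) {
        if (e instanceof Row r) {
            buffer.add(r.value());                       // batched, not yet written
        } else if (e instanceof Barrier b) {
            written.addAll(buffer);                      // flush everything before the barrier
            buffer.clear();
            ackedCheckpoints.add(b.checkpointId());      // sink's part of the checkpoint is done
        }
    }

    int buffered() {
        return buffer.size();
    }
}
```

Feeding two rows, a barrier, and one more row through the pipeline leaves the source with saved offset 2, the sink with ROW-0 and ROW-1 written and checkpoint 1 acknowledged, and the third row still buffered.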
In the snapshot phase, the saved content is:
public class SnapshotSplit {
    private final Object[] splitStart;   // [1000]
    private final Object[] splitEnd;     // [2000]
    private final Offset lowWatermark;   // binlog.000011:1234
    private final Offset highWatermark;  // binlog.000011:5678
}
Restore logic:
Key code:
// IncrementalSourceReader.addSplits()
for (SourceSplitBase split : splits) {
    if (split.isSnapshotSplit()) {
        SnapshotSplit snapshotSplit = split.asSnapshotSplit();
        if (snapshotSplit.isSnapshotReadFinished()) {
            finishedUnackedSplits.put(splitId, snapshotSplit); // completed, skip
        } else {
            unfinishedSplits.add(split); // not completed, read again
        }
    }
}
In the incremental phase, the saved content is:
public class IncrementalSplit {
    private final Offset startupOffset; // current Binlog position
    private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos; // backfill state
    private final Map<TableId, byte[]> historyTableChanges; // Debezium history
}
Restore logic:
// IncrementalSourceReader.initializedState()
if (split.isIncrementalSplit()) {
    IncrementalSplit incrementalSplit = split.asIncrementalSplit();
    // restore table schema
    debeziumDeserializationSchema.restoreCheckpointProducedType(
            incrementalSplit.getCheckpointTables());
    // continue consuming from startupOffset
    return new IncrementalSplitState(incrementalSplit);
}
| Phase | Saved Content | Restore Method | Duplicate Risk |
|----|----|----|----|
| Snapshot | Split range + Watermarks | Re-execute unfinished Splits | Yes (Sink must be idempotent; repeated Select may query different snapshot points) |
| Incremental | Binlog Offset + Table Schema | Continue from Offset | No |
Thus, it is recommended to avoid restoring or pausing tasks during the full snapshot and backfill phases, as this can lead to many unexpected issues.
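Why the snapshot phase needs an idempotent Sink can be shown with a toy replay. In this sketch (hypothetical classes, in-memory collections instead of a database), the same unfinished split is read twice, once before a failure and once after restore. The insert-only target ends up with four rows, while the upsert target, keyed by primary key in the spirit of MERGE INTO / REPLACE INTO, keeps two.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

record OrderRow(long orderId, String status) {}

// Hypothetical in-memory targets standing in for a Sink table.
class InsertOnlyTable {
    final List<OrderRow> rows = new ArrayList<>();

    // Plain INSERT: a replayed row is appended again (a real table with a primary key would reject it instead).
    void write(OrderRow r) {
        rows.add(r);
    }
}

class UpsertTable {
    final Map<Long, OrderRow> rows = new LinkedHashMap<>();

    // Upsert style: the last write for a key wins.
    void write(OrderRow r) {
        rows.put(r.orderId(), r);
    }
}

class SplitReplay {
    // Rows returned by one snapshot split's SELECT.
    static List<OrderRow> readSplit() {
        return List.of(new OrderRow(1, "NEW"), new OrderRow(2, "PAID"));
    }

    // Simulates the same unfinished split being read `attempts` times (first run, then after restore).
    static void replay(int attempts, InsertOnlyTable insertOnly, UpsertTable upsert) {
        for (int i = 0; i < attempts; i++) {
            for (OrderRow r : readSplit()) {
                insertOnly.write(r);
                upsert.write(r);
            }
        }
    }
}
```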
In practice, a checkpoint (CK) can run for 10–20 minutes and still time out. Why?
Analyzing CK and CDC tasks: long CK timeouts are usually due to insufficient write performance or misconfiguration on the Sink. Source only triggers CK to save minimal metadata quickly; the Sink must process all pending writes before the CK Barrier passes.
Checkpoint Timeout Mechanism Analysis
Checkpoint ensures Exactly-Once semantics; CK Barrier must propagate from source to sink, with all operators saving state.
Conclusion: prolonged timeout almost always means Sink cannot handle the backlog in time.
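A back-of-the-envelope estimate makes this concrete. Assuming, hypothetically, that 2,400,000 rows are queued ahead of the barrier and the Sink writes 2,000 rows per second, the barrier needs about 20 minutes to reach the end of the pipeline, well beyond a 10-minute checkpoint.timeout. The model ignores source-side work and parallelism, so treat it as a rough lower bound for planning rather than a prediction. The class name BarrierDelayEstimate is illustrative.

```java
import java.time.Duration;

class BarrierDelayEstimate {
    // Rough model: the barrier is queued behind backlogRows, which the sink drains at sinkRowsPerSecond.
    static Duration timeUntilBarrierPasses(long backlogRows, long sinkRowsPerSecond) {
        if (sinkRowsPerSecond <= 0) {
            throw new IllegalArgumentException("sink throughput must be positive");
        }
        long seconds = (backlogRows + sinkRowsPerSecond - 1) / sinkRowsPerSecond; // round up
        return Duration.ofSeconds(seconds);
    }

    static boolean willTimeOut(long backlogRows, long sinkRowsPerSecond, Duration checkpointTimeout) {
        return timeUntilBarrierPasses(backlogRows, sinkRowsPerSecond).compareTo(checkpointTimeout) > 0;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMillis(600_000); // checkpoint.timeout = 600000
        System.out.println(timeUntilBarrierPasses(2_400_000, 2_000).toMinutes()); // 20
        System.out.println(willTimeOut(2_400_000, 2_000, timeout));             // true
        System.out.println(willTimeOut(2_400_000, 8_000, timeout));             // false
    }
}
```

Quadrupling Sink throughput to 8,000 rows per second brings the same backlog down to 5 minutes, which is why the tuning below focuses on the Sink.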
MySQL
# JDBC URL: add batch rewrite parameters
jdbc:mysql://host:port/db?rewriteBatchedStatements=true&cachePrepStmts=true
Doris/StarRocks
# Use stream load mode with tuning parameters
sink {
  Doris {
    sink.enable-2pc = true
    sink.buffer-size = 1048576
    sink.buffer-count = 3
  }
}
PostgreSQL
sink {
  Jdbc {
    # Use COPY mode instead of INSERT
    use_copy_statement = true
  }
}
env {
  job.mode = STREAMING
  # Limit read speed to give Sink time
  read_limit.rows_per_second = 4000
  read_limit.bytes_per_second = 7000000
  # Increase checkpoint timeout
  checkpoint.interval = 30000
  checkpoint.timeout = 600000
}
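To see what read_limit.rows_per_second does conceptually, here is a generic fixed-window limiter. It is an illustration only, not SeaTunnel's implementation (the article does not describe how the limit is enforced); it ignores read_limit.bytes_per_second and takes the current time as a parameter so its behavior is deterministic. With a limit of 4,000 rows per second, the source never hands the Sink more than 4,000 rows per second, which caps how fast a backlog can build up in front of a barrier.

```java
class FixedWindowRowLimiter {
    private final long rowsPerSecond;
    private long windowStartMs;
    private long rowsInWindow;

    FixedWindowRowLimiter(long rowsPerSecond, long startMs) {
        if (rowsPerSecond <= 0) throw new IllegalArgumentException("rowsPerSecond must be positive");
        this.rowsPerSecond = rowsPerSecond;
        this.windowStartMs = startMs;
    }

    // Returns 0 if one more row may be emitted at nowMs (and counts it),
    // otherwise the milliseconds to wait until the next one-second window opens.
    long tryAcquire(long nowMs) {
        if (nowMs - windowStartMs >= 1000) {
            windowStartMs = nowMs; // start a new window
            rowsInWindow = 0;
        }
        if (rowsInWindow < rowsPerSecond) {
            rowsInWindow++;
            return 0;
        }
        return windowStartMs + 1000 - nowMs;
    }

    public static void main(String[] args) {
        FixedWindowRowLimiter limiter = new FixedWindowRowLimiter(4000, 0);
        int emitted = 0;
        while (limiter.tryAcquire(0) == 0) {
            emitted++;
        }
        System.out.println(emitted);                  // 4000
        System.out.println(limiter.tryAcquire(250));  // 750
        System.out.println(limiter.tryAcquire(1000)); // 0
    }
}
```

A fixed window is the simplest choice for illustration; a token bucket would smooth out bursts at window boundaries.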
CDC technology is indeed complex, involving many aspects of distributed systems (parallelism control, state management, fault tolerance, and exactly-once semantics) as well as a deep understanding of databases. Building on Debezium, SeaTunnel has implemented numerous engineering optimizations and fixed many bugs, and its architecture is friendly to newcomers. Whether you are fixing documentation or debugging code directly, it is relatively easy to get started.
We warmly welcome contributors to join the community!
This article aims to help you better understand SeaTunnel CDC's internal mechanisms, avoid pitfalls in production, and tune your jobs more effectively. Questions and corrections are welcome.
Finally, wishing all CDC tasks run stably without interruption, and checkpoints never time out again!


