[Bug][sqlserver] SqlServer incremental source cannot support exactly-once  #2853

@loserwang1024

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.18

Flink CDC version

3.0

Reason

overview

First, let's look at what the SqlServer incremental source does to guarantee exactly-once semantics when reading in parallel.
The table is split into multiple chunks based on the key, and each chunk becomes a snapshot split. These splits can be read in parallel.

In the snapshot phase, for each snapshot split covering [low_key, high_key]:

  1. Use SqlServerDialect#displayCurrentOffset to get the current LSN as low_watermark.
  2. Read the snapshot data in [low_key, high_key] with a JDBC query and keep it as a temporary state, state1.
  3. Use SqlServerDialect#displayCurrentOffset to get the current LSN as high_watermark.
  4. Read the log between (low_watermark, high_watermark), apply it to state1 to produce the final state as of high_watermark, then emit that state downstream.

Then, in the stream phase, we read the log in [high_watermark, +∞) for the split covering [low_key, high_key].
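
The four snapshot steps can be sketched with a small in-memory model. This is only an illustration, not the connector's code: FakeDb, Change and readSplit are hypothetical names, the LSN is a plain counter, and boundary handling is simplified so that the backfill covers (low_watermark, high_watermark].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SnapshotSplitSketch {

    /** One log entry: an upsert (value != null) or a delete (value == null). */
    static final class Change {
        final long lsn;
        final int key;
        final String value;

        Change(long lsn, int key, String value) {
            this.lsn = lsn;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical in-memory database: a keyed table plus an append-only log. */
    static final class FakeDb {
        final TreeMap<Integer, String> table = new TreeMap<>();
        final List<Change> log = new ArrayList<>();
        long lsn = 0; // assumed to be the real latest LSN of the whole database

        void write(int key, String value) {
            lsn++;
            if (value == null) {
                table.remove(key);
            } else {
                table.put(key, value);
            }
            log.add(new Change(lsn, key, value));
        }
    }

    /** Reads one split [lowKey, highKey]; the two hooks simulate concurrent writers. */
    static Map<Integer, String> readSplit(
            FakeDb db, int lowKey, int highKey, Runnable postLowWatermark, Runnable preHighWatermark) {
        long lowWatermark = db.lsn; // step 1
        postLowWatermark.run();
        // step 2: snapshot query, copied into the temporary state
        Map<Integer, String> state = new TreeMap<>(db.table.subMap(lowKey, true, highKey, true));
        preHighWatermark.run();
        long highWatermark = db.lsn; // step 3
        // step 4: replay the log in (lowWatermark, highWatermark] on top of the snapshot
        for (Change c : db.log) {
            boolean inLsnRange = c.lsn > lowWatermark && c.lsn <= highWatermark;
            boolean inKeyRange = c.key >= lowKey && c.key <= highKey;
            if (inLsnRange && inKeyRange) {
                if (c.value == null) {
                    state.remove(c.key);
                } else {
                    state.put(c.key, c.value);
                }
            }
        }
        return state;
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        db.write(1, "a");
        db.write(2, "b");
        Map<Integer, String> result =
                readSplit(db, 1, 10, () -> db.write(3, "c"), () -> db.write(2, null));
        System.out.println(result); // prints {1=a, 3=c}, the table content at high_watermark
    }
}
```

Because the replay is idempotent (upserts and deletes by key), the write that lands before the snapshot query and the one that lands after it both end up reflected exactly once, but only as long as the two watermarks really bracket the snapshot query.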

problem

However, the call chain SqlServerDialect#displayCurrentOffset → SqlServerUtils#currentLsn → SqlServerConnection#getMaxTransactionLsn returns the max LSN recorded in the system table cdc.lsn_time_mapping, which is not the real latest LSN of the whole database system. In this incremental source framework, only the real latest LSN of the whole database system can guarantee exactly-once semantics.

As shown in https://stackoverflow.com/questions/29477391/cdc-data-only-shows-up-after-5-minutes, a developer found that CDC data only shows up after 5 minutes, and the reason is:

Because the capture process extracts change data from the transaction log, there is a built in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table.

For example, low_watermark and high_watermark may be 5 minutes behind the LSN at step 2 (reading the snapshot data). Then, in the streaming phase, log entries that should be ignored because the snapshot already reflects them are read again.
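
A toy model makes the effect of the lag visible. It is a sketch under assumptions: FakeSqlServer, capturedLsn and reReadLsns are hypothetical names, LSNs are simple counters, and capturedLsn stands in for the max LSN that the capture job has written to cdc.lsn_time_mapping so far.

```java
import java.util.ArrayList;
import java.util.List;

public class StaleWatermarkSketch {

    /** Hypothetical server whose capture job lags behind committed changes. */
    static final class FakeSqlServer {
        final List<Long> log = new ArrayList<>(); // LSNs of committed changes
        long capturedLsn = 0; // max LSN the capture job has copied so far

        void commit() {
            log.add((long) log.size() + 1);
        }

        long trueMaxLsn() {
            return log.size();
        }

        void runCaptureJob() {
            capturedLsn = trueMaxLsn();
        }
    }

    /**
     * Returns the LSNs that the stream phase reads after high_watermark although
     * the snapshot query, which sees every committed change, already reflected them.
     */
    static List<Long> reReadLsns(FakeSqlServer db, boolean useCapturedLsn) {
        long snapshotSeesUpTo = db.trueMaxLsn();
        long highWatermark = useCapturedLsn ? db.capturedLsn : db.trueMaxLsn();
        List<Long> reRead = new ArrayList<>();
        for (long lsn : db.log) {
            if (lsn > highWatermark && lsn <= snapshotSeesUpTo) {
                reRead.add(lsn);
            }
        }
        return reRead;
    }

    public static void main(String[] args) {
        FakeSqlServer db = new FakeSqlServer();
        db.commit();
        db.commit();
        db.runCaptureJob(); // capture job has seen LSN 1 and 2
        db.commit();        // three more changes commit before the job runs again
        db.commit();
        db.commit();
        System.out.println(reReadLsns(db, true));  // prints [3, 4, 5]
        System.out.println(reReadLsns(db, false)); // prints []
    }
}
```

With the lagging value as high_watermark, the three changes committed after the last capture run are streamed again even though the snapshot already contains them. With the real latest LSN, nothing is re-read.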

How to verify

Modify the test com.ververica.cdc.connectors.oracle.source.OracleSourceITCase#testEnableBackfillWithDMLPostLowWaterMark; we can see that the three DML operations are read twice.

    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {

        List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);

        List<String> expectedRecords =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        // these operations were already applied in the snapshot phase, but are read again in the streaming phase
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "-U[2000, user_21, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        "-D[1019, user_20, Shanghai, 123567891234]");
        assertEqualsInAnyOrder(expectedRecords, records);
    }

    private List<String> testBackfillWhenWritingEvents(
            boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
        createAndInitialize("customer.sql");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(200L);
        env.setParallelism(1);

        ResolvedSchema customersSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                physical("ID", BIGINT().notNull()),
                                physical("NAME", STRING()),
                                physical("ADDRESS", STRING()),
                                physical("PHONE_NUMBER", STRING())),
                        new ArrayList<>(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
        TestTable customerTable =
                new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
        String tableId = customerTable.getTableId();

        OracleSourceBuilder.OracleIncrementalSource source =
                OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
                        .hostname(ORACLE_CONTAINER.getHost())
                        .port(ORACLE_CONTAINER.getOraclePort())
                        .username(CONNECTOR_USER)
                        .password(CONNECTOR_PWD)
                        .databaseList(ORACLE_DATABASE)
                        .schemaList(ORACLE_SCHEMA)
                        .tableList("DEBEZIUM.CUSTOMERS")
                        .skipSnapshotBackfill(skipSnapshotBackfill)
                        .startupOptions(StartupOptions.initial())
                        .deserializer(customerTable.getDeserializer())
                        .build();

        // Do some database operations during hook in snapshot period.
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        String[] statements =
                new String[] {
                    String.format(
                            "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                            tableId),
                    String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
                    String.format("DELETE FROM %s WHERE id=1019", tableId)
                };
        SnapshotPhaseHook snapshotPhaseHook =
                (sourceConfig, split) -> {
                    // database update operations use TEST_USER rather than CONNECTOR_USER
                    JdbcConfiguration configuration =
                            JdbcConfiguration.copy(
                                            ((JdbcSourceConfig) sourceConfig)
                                                    .getDbzConnectorConfig()
                                                    .getJdbcConfig())
                                    .withUser("debezium")
                                    .withPassword("dbz")
                                    .build();
                    try (OracleConnection oracleConnection =
                            OracleConnectionUtils.createOracleConnection(configuration)) {
                        oracleConnection.setAutoCommit(false);
                        oracleConnection.execute(statements);
                        oracleConnection.commit();
                    }
                };

        if (hookType == USE_POST_LOWWATERMARK_HOOK) {
            hooks.setPostLowWatermarkAction(snapshotPhaseHook);
        } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
            hooks.setPreHighWatermarkAction(snapshotPhaseHook);
        }
        source.setSnapshotHooks(hooks);

        List<String> records = new ArrayList<>();
        try (CloseableIterator<RowData> iterator =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                        .executeAndCollect()) {
            records = fetchRowData(iterator, fetchSize, customerTable::stringify);
            env.close();
        }
        return records;
    }
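
To spot re-read events in a collected result like the one above, one option is a small consistency check over the changelog strings. This is a hypothetical helper, not part of the test suite: inconsistentEvents assumes the "+I[...]", "-U[...]", "+U[...]", "-D[...]" string form used in the expected records and processes events in list order (snapshot rows first, streamed events after).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ChangelogCheckSketch {

    /** Returns the events that are inconsistent with the rows emitted before them. */
    static List<String> inconsistentEvents(List<String> events) {
        Set<String> liveRows = new HashSet<>();
        List<String> suspicious = new ArrayList<>();
        for (String event : events) {
            String kind = event.substring(0, 2); // "+I", "+U", "-U" or "-D"
            String row = event.substring(2);     // "[id, name, address, phone]"
            boolean adds = kind.charAt(0) == '+';
            // add() returns false if the row already exists,
            // remove() returns false if the row was never emitted
            boolean consistent = adds ? liveRows.add(row) : liveRows.remove(row);
            if (!consistent) {
                suspicious.add(event);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        List<String> events =
                Arrays.asList(
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "-U[2000, user_21, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        "-D[1019, user_20, Shanghai, 123567891234]");
        for (String event : inconsistentEvents(events)) {
            System.out.println(event);
        }
    }
}
```

On this input the check flags the second insert of 15213, both halves of the update on 2000 (the retracted Shanghai row was never emitted, and the Pittsburgh row already exists) and the delete of 1019, which correspond to the three DML statements issued in the hook.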

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

bug (Something isn't working)
