[Bug] [postgresql] split's messages will be lost when debezium.slot.drop.on.stop = true is set and a failure occurs in the streaming period #2431

@loserwang1024

Description

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.17

Flink CDC version

2.5

Database and its version

debezium/postgres:9.6

Minimal reproduce step

As shown in c97e925, add a test in PostgresSourceITCase:

    @Test
    public void testDebeziumSlotDropOnStop() throws Exception {
        String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE;
        customDatabase.createAndInitialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        env.setParallelism(2);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        String sourceDDL =
                format(
                        "CREATE TABLE customers ("
                                + " id BIGINT NOT NULL,"
                                + " name STRING,"
                                + " address STRING,"
                                + " phone_number STRING,"
                                + " primary key (id) not enforced"
                                + ") WITH ("
                                + " 'connector' = 'postgres-cdc',"
                                + " 'scan.incremental.snapshot.enabled' = 'true',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.startup.mode' = '%s',"
                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
                                + " 'slot.name' = '%s', "
                                + " 'debezium.slot.drop.on.stop' = 'true'"
                                + ")",
                        customDatabase.getHost(),
                        customDatabase.getDatabasePort(),
                        customDatabase.getUsername(),
                        customDatabase.getPassword(),
                        customDatabase.getDatabaseName(),
                        SCHEMA_NAME,
                        "customers",
                        // todo: cherry-pick new latest-offset test later
                        scanStartupMode,
                        getSlotName());
        tEnv.executeSql(sourceDDL);
        TableResult tableResult = tEnv.executeSql("select * from customers");

        // first step: check the snapshot data
        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
            checkSnapshotData(
                    tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
        }

        // second step: check the stream data
        checkStreamDataWithDDLDuringFailover(
                tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});

        tableResult.getJobClient().get().cancel().get();
    }

    private void checkStreamDataWithDDLDuringFailover(
            TableResult tableResult,
            FailoverType failoverType,
            FailoverPhase failoverPhase,
            String[] captureCustomerTables)
            throws Exception {
        waitUntilJobRunning(tableResult);
        CloseableIterator<Row> iterator = tableResult.collect();
        JobID jobId = tableResult.getJobClient().get().getJobID();

        for (String tableId : captureCustomerTables) {
            makeFirstPartStreamEvents(
                    getConnection(),
                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
        }

        // wait for the stream reading
        Thread.sleep(2000L);

        if (failoverPhase == FailoverPhase.STREAM) {
            triggerFailover(
                    failoverType,
                    jobId,
                    miniClusterResource.getMiniCluster(),
                    () -> {
                        for (String tableId : captureCustomerTables) {
                            try {
                                makeSecondPartStreamEvents(
                                        getConnection(),
                                        customDatabase.getDatabaseName()
                                                + '.'
                                                + SCHEMA_NAME
                                                + '.'
                                                + tableId);
                            } catch (SQLException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        sleepMs(200);
                    });
            waitUntilJobRunning(tableResult);
        }

        List<String> expectedStreamData = new ArrayList<>();
        for (int i = 0; i < captureCustomerTables.length; i++) {
            expectedStreamData.addAll(firstPartStreamEvents);
            expectedStreamData.addAll(secondPartStreamEvents);
        }
        // wait for the stream reading
        Thread.sleep(2000L);

        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
        assertFalse(hasNextData(iterator));
    }

The test times out because the WAL log has been removed by Debezium.

What did you expect to see?

The test succeeds.

What did you see instead?

The test times out.

Anything else?

Reason

What's slot.drop.on.stop?

slot.drop.on.stop is a Debezium parameter. When slot.drop.on.stop = true, PostgresReplicationConnection drops its replication slot when the connection is closed.
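
A minimal sketch of that close-time behavior (this is not the actual Debezium source; the class, field, and helper names are illustrative):

// Illustrative sketch: the names are hypothetical, but the semantics match
// what slot.drop.on.stop = true means for the replication connection.
public class ReplicationConnectionSketch implements AutoCloseable {
    private final String slotName;
    private final boolean dropSlotOnClose; // derived from slot.drop.on.stop

    public ReplicationConnectionSketch(String slotName, boolean dropSlotOnClose) {
        this.slotName = slotName;
        this.dropSlotOnClose = dropSlotOnClose;
    }

    @Override
    public void close() {
        // ... stop streaming and release the underlying connection ...
        if (dropSlotOnClose) {
            // Equivalent to: SELECT pg_drop_replication_slot('<slotName>');
            // once the slot is gone, PostgreSQL stops retaining WAL for it.
            dropReplicationSlot(slotName);
        }
    }

    private void dropReplicationSlot(String slotName) {
        // hypothetical stand-in for the real drop logic
    }
}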

The reason the Debezium Postgres connector discourages setting debezium.slot.drop.on.stop = true is that if the connector stops while this option is enabled, the slot is removed, so the connector has to restart from the snapshot phase.

slot.drop.on.stop in Flink PG CDC

As I answered in #2378:

In the snapshot period, we read backfill streaming data for each snapshot split. Many replication slots are created in this period, so after a snapshot split is done, we remove its replication slot. That is why slot.drop.on.stop is forced to true for snapshot splits (see PostgresSourceFetchTaskContext#configure).

The slot.drop.on.stop of the streaming split defaults to false. You can verify this by stopping the Flink job during the streaming period and checking that the slot is still there. When the job is created, the streaming split's slot (also called the global slot) is created in PostgresSourceEnumerator, so the WAL log will never be removed.
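
One way to run that check (a sketch; the connection settings and slot name are placeholders for your own setup) is to query pg_replication_slots after stopping the job:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SlotCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings: point this at the database the job reads.
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres",
                                "postgres",
                                "postgres");
                PreparedStatement stmt =
                        conn.prepareStatement(
                                "SELECT slot_name, active FROM pg_replication_slots"
                                        + " WHERE slot_name = ?")) {
            stmt.setString(1, "flink"); // the slot.name configured in the source DDL
            try (ResultSet rs = stmt.executeQuery()) {
                // With the job stopped, the global slot should still be listed
                // (active = false), showing that it was not dropped on stop.
                while (rs.next()) {
                    System.out.println(
                            rs.getString("slot_name") + ", active=" + rs.getBoolean("active"));
                }
            }
        }
    }
}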

So it's better not to use the debezium.slot.drop.on.stop parameter at all; Postgres CDC will manage it.

Why the problem occurs

The slot.drop.on.stop of the streaming split defaults to false. However, if the user sets it to true, data may be lost.
The streaming period reuses the global replication slot to avoid retaining the WAL log for too long. When a failure occurs in this period, the PostgresReplicationConnection is closed, and then the slot is dropped. During the job restart, the WAL log may be recycled, so the events it held are lost.

Solution

Always set slot.drop.on.stop of the streaming split to false, rather than basing it on the user's setting.
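
A simplified sketch of that fix (schematic only: the real change belongs in PostgresSourceFetchTaskContext#configure, and the boolean split-type flag below stands in for the actual split classes):

import java.util.Properties;

public class SlotDropOnStopFix {
    // Per-split Debezium configuration: the snapshot split's backfill slot is
    // throwaway and should be dropped, while the streaming split's global slot
    // must always survive, regardless of the user's debezium.slot.drop.on.stop.
    static Properties configure(boolean isSnapshotSplit, Properties userDebeziumProps) {
        Properties props = new Properties();
        props.putAll(userDebeziumProps);
        if (isSnapshotSplit) {
            // One slot per snapshot split; drop it once the backfill is read.
            props.setProperty("slot.drop.on.stop", "true");
        } else {
            // Force false: basing this on the user's setting is what dropped
            // the global slot on failover and let PostgreSQL recycle WAL that
            // the restarted job still needed.
            props.setProperty("slot.drop.on.stop", "false");
        }
        return props;
    }
}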

Are you willing to submit a PR?

  • I'm willing to submit a PR!
