Skip to content

[Bug][mongodb] Mongo backfill may read data after high_watermark #2892

@loserwang1024

Description

@loserwang1024

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.18

Flink CDC version

3.0

Database and its version

Any (the problem is not specific to a MongoDB version)

Minimal reproduce step

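If a new operation happens after the high watermark of a snapshot split, but before the backfill runs (or after the backfill, but within the heartbeat interval), the backfill can read that change even though it lies beyond the high watermark, and the record is emitted twice. The following test reproduces this: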

public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
    // Selects which snapshot-phase hook testBackfillWhenWritingEvents installs.
    private static final int USE_POST_LOWWATERMARK_HOOK = 1;
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

    /**
     * Reproduces issue #2892: with backfill enabled, DML executed after a snapshot split's high
     * watermark is read by the backfill even though it lies beyond the high watermark.
     *
     * <p>The expected list below encodes the currently observed (buggy) output, in which the
     * insert of cid 15213 appears twice; the correct behavior is exactly one occurrence.
     */
    @Test
    public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
        // 25 == number of entries in expectedRecords below.
        List<String> records =
                testBackfillWhenWritingEvents(false, 25, USE_POST_HIGHWATERMARK_HOOK, false);
        List<String> expectedRecords =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[1019, user_20, Shanghai, 123567891234]",
                        "+I[2000, user_21, Shanghai, 123567891234]",
                        // Bug #2892: +I[15213] is currently emitted twice; it should appear
                        // exactly once.
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        // The delete event only carries _id, so the other fields are empty here;
                        // a SQL job would show the values because of changelog normalization.
                        "-D[0, null, null, null]");
        assertEqualsInAnyOrder(expectedRecords, records);
    }

    /**
     * Runs a MongoDB source over the "customers" collection while a snapshot-phase hook inserts
     * cid 15213, updates the address of cid 2000 and deletes cid 1019.
     *
     * @param skipBackFill whether the source skips the snapshot backfill
     * @param fetchSize number of records to collect before stopping
     * @param hookType one of USE_POST_LOWWATERMARK_HOOK, USE_PRE_HIGHWATERMARK_HOOK or
     *     USE_POST_HIGHWATERMARK_HOOK; any other value leaves the hooks empty
     * @param enableFullDocPrePostImage whether to enable scanning the full changelog
     * @return the collected records, stringified by the test table
     */
    private List<String> testBackfillWhenWritingEvents(
            boolean skipBackFill, int fetchSize, int hookType, boolean enableFullDocPrePostImage)
            throws Exception {
        String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setParallelism(1);

        ResolvedSchema customersSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                physical("cid", BIGINT().notNull()),
                                physical("name", STRING()),
                                physical("address", STRING()),
                                physical("phone_number", STRING())),
                        new ArrayList<>(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
        MongoDBSource source =
                new MongoDBSourceBuilder()
                        .hosts(CONTAINER.getHostAndPort())
                        .databaseList(customerDatabase)
                        .username(FLINK_USER)
                        .password(FLINK_USER_PASSWORD)
                        .startupOptions(StartupOptions.initial())
                        .scanFullChangelog(enableFullDocPrePostImage)
                        .collectionList(
                                getCollectionNameRegex(
                                        customerDatabase, new String[] {"customers"}))
                        .deserializer(customerTable.getDeserializer(enableFullDocPrePostImage))
                        .skipSnapshotBackfill(skipBackFill)
                        .build();

        // Do some database operations during the hook in the snapshot phase.
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        SnapshotPhaseHook snapshotPhaseHook =
                (sourceConfig, split) -> {
                    MongoDBSourceConfig mongoDBSourceConfig = (MongoDBSourceConfig) sourceConfig;
                    MongoClient mongoClient = MongoUtils.clientFor(mongoDBSourceConfig);
                    MongoDatabase database =
                            mongoClient.getDatabase(mongoDBSourceConfig.getDatabaseList().get(0));
                    MongoCollection<Document> mongoCollection = database.getCollection("customers");
                    Document document = new Document();
                    document.put("cid", 15213L);
                    document.put("name", "user_15213");
                    document.put("address", "Shanghai");
                    document.put("phone_number", "123567891234");
                    mongoCollection.insertOne(document);
                    mongoCollection.updateOne(
                            Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
                    mongoCollection.deleteOne(Filters.eq("cid", 1019L));
                    try {
                        // NOTE(review): presumably gives the change stream time to observe the
                        // writes before the hook returns — confirm.
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the caller can still observe it.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                };

        switch (hookType) {
            case USE_POST_LOWWATERMARK_HOOK:
                hooks.setPostLowWatermarkAction(snapshotPhaseHook);
                break;
            case USE_PRE_HIGHWATERMARK_HOOK:
                hooks.setPreHighWatermarkAction(snapshotPhaseHook);
                break;
            case USE_POST_HIGHWATERMARK_HOOK:
                hooks.setPostHighWatermarkAction(snapshotPhaseHook);
                break;
        }
        source.setSnapshotHooks(hooks);

        try (CloseableIterator<RowData> iterator =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                        .executeAndCollect()) {
            List<String> records = fetchRowData(iterator, fetchSize, customerTable::stringify);
            env.close();
            return records;
        }
    }
}

What did you expect to see?

"+I[15213, user_15213, Shanghai, 123567891234]" is read exactly once.

What did you see instead?

"+I[15213, user_15213, Shanghai, 123567891234]" is read twice.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions