Labels: bug
Description
Search before asking
- I searched in the issues and found nothing similar.
Flink version
1.18
Flink CDC version
3.0
Database and its version
Any (the issue is not specific to a particular MongoDB version)
Minimal reproduce step
If a new operation happens after the high watermark is captured but before the backfill finishes (or after the backfill but still within the heartbeat interval), the record is read twice. See the timeline sketch below; the IT case after it reproduces the duplicate.
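For context, here is my rough understanding of the snapshot-split timeline (the step names are illustrative, not the actual connector API):

// 1. emit LOW_WATERMARK (capture a change stream position)
// 2. read the snapshot records of the split
// 3. emit HIGH_WATERMARK (capture the end position)
// 4. backfill: replay change stream events between LOW_WATERMARK and HIGH_WATERMARK
//    so they are merged with the snapshot records exactly once
// 5. later, the stream phase resumes from HIGH_WATERMARK
// An insert that happens right after step 3 should belong only to step 5; if the
// backfill read in step 4 does not stop exactly at HIGH_WATERMARK (e.g. it keeps
// reading until the next heartbeat), the event is emitted in step 4 and again in
// step 5, which is the duplicate shown below.

The following IT case reproduces the duplicate: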
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {

    private static final int USE_POST_LOWWATERMARK_HOOK = 1;
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

    @Test
    public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
        List<String> records =
                testBackfillWhenWritingEvents(false, 25, USE_POST_HIGHWATERMARK_HOOK, false);

        List<String> expectedRecords =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[1019, user_20, Shanghai, 123567891234]",
                        "+I[2000, user_21, Shanghai, 123567891234]",
                        // +I[15213, ...] will be consumed twice
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        // the delete message only contains _id; the SQL job sees full
                        // values because of changelog normalization
                        "-D[0, null, null, null]");
        assertEqualsInAnyOrder(expectedRecords, records);
    }

    private List<String> testBackfillWhenWritingEvents(
            boolean skipBackFill, int fetchSize, int hookType, boolean enableFullDocPrePostImage)
            throws Exception {
        String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setParallelism(1);

        ResolvedSchema customersSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                physical("cid", BIGINT().notNull()),
                                physical("name", STRING()),
                                physical("address", STRING()),
                                physical("phone_number", STRING())),
                        new ArrayList<>(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);

        MongoDBSource<RowData> source =
                new MongoDBSourceBuilder<RowData>()
                        .hosts(CONTAINER.getHostAndPort())
                        .databaseList(customerDatabase)
                        .username(FLINK_USER)
                        .password(FLINK_USER_PASSWORD)
                        .startupOptions(StartupOptions.initial())
                        .scanFullChangelog(enableFullDocPrePostImage)
                        .collectionList(
                                getCollectionNameRegex(
                                        customerDatabase, new String[] {"customers"}))
                        .deserializer(customerTable.getDeserializer(enableFullDocPrePostImage))
                        .skipSnapshotBackfill(skipBackFill)
                        .build();

        // Do some database operations during the hook in the snapshot phase.
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        SnapshotPhaseHook snapshotPhaseHook =
                (sourceConfig, split) -> {
                    MongoDBSourceConfig mongoDBSourceConfig = (MongoDBSourceConfig) sourceConfig;
                    MongoClient mongoClient = MongoUtils.clientFor(mongoDBSourceConfig);
                    MongoDatabase database =
                            mongoClient.getDatabase(mongoDBSourceConfig.getDatabaseList().get(0));
                    MongoCollection<Document> mongoCollection = database.getCollection("customers");
                    // Insert a new document, update an existing one, and delete one
                    // while the snapshot split is being processed.
                    Document document = new Document();
                    document.put("cid", 15213L);
                    document.put("name", "user_15213");
                    document.put("address", "Shanghai");
                    document.put("phone_number", "123567891234");
                    mongoCollection.insertOne(document);
                    mongoCollection.updateOne(
                            Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
                    mongoCollection.deleteOne(Filters.eq("cid", 1019L));
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                };

        switch (hookType) {
            case USE_POST_LOWWATERMARK_HOOK:
                hooks.setPostLowWatermarkAction(snapshotPhaseHook);
                break;
            case USE_PRE_HIGHWATERMARK_HOOK:
                hooks.setPreHighWatermarkAction(snapshotPhaseHook);
                break;
            case USE_POST_HIGHWATERMARK_HOOK:
                hooks.setPostHighWatermarkAction(snapshotPhaseHook);
                break;
        }
        source.setSnapshotHooks(hooks);

        List<String> records = new ArrayList<>();
        try (CloseableIterator<RowData> iterator =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                        .executeAndCollect()) {
            records = fetchRowData(iterator, fetchSize, customerTable::stringify);
            env.close();
        }
        return records;
    }
}
What did you expect to see?
"+I[15213, user_15213, Shanghai, 123567891234]" read exactly-once.
What did you see instead?
"+I[15213, user_15213, Shanghai, 123567891234]" read twice
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!