Flink sink writes duplicate data in upsert mode #10431

@zhongqishang

Description

Apache Iceberg version

1.2.1

Query engine

Flink 1.14.4

Please describe the bug 🐞

I have a Flink upsert job with a checkpoint interval of 5 minutes and parallelism = 1. An external service periodically (every 30 minutes) triggers a savepoint.

Five files were generated in one checkpoint cycle: two data files, two equality delete files, and one position delete file.
The two data files and the two equality delete files contained the same data. When I queried the table, duplicate rows appeared.
I think this is because the later equality delete file is not applied to the first data file.
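The hypothesis above depends on Iceberg's sequence-number rule. Per the Iceberg table spec, an equality delete applies to a data file only when the data file's data sequence number is strictly less than the delete's, and all files added in one snapshot share that snapshot's sequence number. The Java sketch below is a simplified, hypothetical model of that rule. The class and method names are made up for illustration, and position deletes and partitioning are ignored. It shows why two upsert flushes of the same key would produce a duplicate if both were committed in a single snapshot, and why committing each flush separately would not. Whether the files in this issue were actually committed that way is an assumption; the logs here do not confirm it.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the Iceberg spec rule for equality deletes:
 * an equality delete removes a row only if the row's data sequence number is
 * strictly less than the delete's sequence number. Not Iceberg's implementation.
 */
public class EqDeleteSequenceModel {

    /** A data row identified by its equality key, added by a commit with sequence number seq. */
    static final class DataRow {
        final int key;
        final long seq;
        DataRow(int key, long seq) { this.key = key; this.seq = seq; }
    }

    /** An equality delete on a key, added by a commit with sequence number seq. */
    static final class EqDelete {
        final int key;
        final long seq;
        EqDelete(int key, long seq) { this.key = key; this.seq = seq; }
    }

    /** Spec rule: same key and row sequence number strictly less than the delete's. */
    static boolean applies(DataRow row, EqDelete delete) {
        return row.key == delete.key && row.seq < delete.seq;
    }

    /** Counts the rows that no equality delete removes. */
    static int countLiveRows(List<DataRow> rows, List<EqDelete> deletes) {
        int live = 0;
        for (DataRow row : rows) {
            boolean deleted = deletes.stream().anyMatch(d -> applies(row, d));
            if (!deleted) {
                live++;
            }
        }
        return live;
    }

    /**
     * Two upserts of key 1, each flushed as an (equality delete, data row) pair.
     * seqA and seqB are the sequence numbers of the commits carrying each flush.
     */
    static int liveRowsForTwoUpserts(long seqA, long seqB) {
        List<DataRow> rows = List.of(new DataRow(1, seqA), new DataRow(1, seqB));
        List<EqDelete> deletes = List.of(new EqDelete(1, seqA), new EqDelete(1, seqB));
        return countLiveRows(rows, deletes);
    }

    public static void main(String[] args) {
        // Each flush in its own snapshot: the second delete removes the first row.
        System.out.println("separate commits: " + liveRowsForTwoUpserts(1, 2) + " live row(s)");
        // Both flushes in one snapshot: no delete applies, so key 1 appears twice.
        System.out.println("single commit: " + liveRowsForTwoUpserts(1, 1) + " live row(s)");
    }
}
```

In this model, separate commits leave one live row and a single shared commit leaves two, which matches the kind of duplicate described above.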

Flink TaskManager (TM) log:

2024-05-31 16:10:57.457 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:10:57.459 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:10:57.462 org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Checkpoint 5765 has been notified as aborted, would not trigger any checkpoint.
2024-05-31 16:13:58.455 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:13:58.505 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:13:58.507 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]

JobManager (JM) log:

2024-05-31 16:08:12.840 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5764 (type=CHECKPOINT) @ 1717142891998 for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:08:16.239 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering savepoint for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:08:16.242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5765 (type=SAVEPOINT) @ 1717142896239 for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:09:41.531 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 5764 for job fc721024df3d70e3a1f3a46a63e9635a (7170 bytes, checkpointDuration=89495 ms, finalizationTime=38 ms).
2024-05-31 16:09:41.532 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 5764 as completed for source Source: TableSourceScan(table=[[default_catalog, default_database, cdc_xxx]], fields=[id, data_status, ...]).
2024-05-31 16:10:46.242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 5765 of job fc721024df3d70e3a1f3a46a63e9635a expired before completing.
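The JM log lines above can be cross-checked with some timestamp arithmetic. The Java sketch below uses hypothetical class and field names. Its times are copied from the log prefixes, and the UTC+8 offset is an assumption inferred by matching the `@ <epoch millis>` trigger timestamps to the logged wall-clock times. It checks that savepoint 5765 was triggered while checkpoint 5764 was still in progress, and it computes how long after being triggered 5765 expired (150 s between the two log lines). The issue does not state the configured checkpoint timeout, so that figure is only consistent with such a timeout, not proof of it.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneOffset;

/** Recomputes the checkpoint 5764 / savepoint 5765 timeline from the log prefixes. */
public class CheckpointTimeline {

    // Wall-clock times copied from the JM and TM log lines.
    static final LocalTime SP_5765_TRIGGERED = LocalTime.parse("16:08:16.242");
    static final LocalTime CP_5764_COMPLETED = LocalTime.parse("16:09:41.531");
    static final LocalTime SP_5765_EXPIRED = LocalTime.parse("16:10:46.242");
    static final LocalTime TM_5765_ABORTED = LocalTime.parse("16:10:57.462");

    /** Converts an "@ <epoch millis>" trigger timestamp to wall-clock time at an assumed UTC+8 offset. */
    static LocalTime toLogTime(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(8)).toLocalTime();
    }

    public static void main(String[] args) {
        // Savepoint 5765 was triggered while checkpoint 5764 was still in flight.
        System.out.println("5765 triggered before 5764 completed: "
                + SP_5765_TRIGGERED.isBefore(CP_5764_COMPLETED));
        // Time from triggering savepoint 5765 until it expired.
        System.out.println("5765 trigger -> expiry: "
                + Duration.between(SP_5765_TRIGGERED, SP_5765_EXPIRED));
        // Delay until the TM logged that 5765 had been aborted.
        System.out.println("5765 expiry -> TM abort notice: "
                + Duration.between(SP_5765_EXPIRED, TM_5765_ABORTED));
        // Trigger timestamp of 5764 plus its reported checkpointDuration, as wall-clock time.
        System.out.println("5764 trigger + checkpointDuration: "
                + toLogTime(1717142891998L + 89495L));
    }
}
```

The last line lands at 16:09:41.493, which is 38 ms before the logged completion of 5764, in line with the reported `finalizationTime=38 ms`. That agreement supports the UTC+8 assumption.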

Downloaded files and sizes:

-rw-r--r--@ 1 q  staff   30528 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet
-rw-r--r--@ 1 q  staff     701 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet
-rw-r--r--@ 1 q  staff  741706 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet
-rw-r--r--@ 1 q  staff   17592 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet
-rw-r--r--@ 1 q  staff    1978 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01930.parquet

Labels

bug (Something isn't working), flink
