Closed
Labels
bug
Description
Search before asking
- I searched in the issues and found nothing similar.
Flink version
1.17.0
Flink CDC version
2.4.1
Database and its version
mongodb 4.2.11
Minimal reproduce step
- Prepare a large collection with more than 10 billion records.
- Start a Flink job that reads the collection and writes to a DiscardingSink with parallelism 125.
- The pull speed is high for the first hour or two, then slowly decreases to 0 over the following few hours.
- In the TaskManager logs, all TaskManagers hang at "c.v.c.c.mongodb.source.reader.fetch.MongoDBScanFetchTask - Snapshot step 2 - Snapshotting data".
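One possible reading of this symptom, offered as a hypothesis rather than a diagnosis: the reader side keeps polling and getting empty batches while whatever produces snapshot records has stopped, so nothing fails and throughput simply drops to 0. The stdlib-only Java sketch below reproduces that pattern with a `BlockingQueue`. It is not Debezium's `ChangeEventQueue` or Flink CDC code; `SnapshotStallDemo`, `pollBatch` and `emptyPollsAfterStall` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SnapshotStallDemo {
    // Consumer side: wait up to timeoutMs for the first record (the thread is
    // TIMED_WAITING meanwhile), then take whatever else is already queued.
    static List<String> pollBatch(BlockingQueue<String> queue, int maxBatch, long timeoutMs)
            throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            queue.drainTo(batch, maxBatch - 1);
        }
        return batch; // empty on timeout; the caller just polls again
    }

    // Hypothetical producer: emits `produced` records, then goes silent without
    // signalling completion, standing in for a snapshot reader that stops making
    // progress. Returns how many of the next `extraPolls` polls came back empty.
    static int emptyPollsAfterStall(int produced, int extraPolls) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < produced; i++) {
                    queue.put("record-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "snapshot-producer");
        producer.start();

        // Drain everything the producer emits before it goes silent.
        int received = 0;
        while (received < produced) {
            received += pollBatch(queue, 8, 50).size();
        }
        producer.join();

        // From here on every poll times out: no error, just no data.
        int empty = 0;
        for (int i = 0; i < extraPolls; i++) {
            if (pollBatch(queue, 8, 10).isEmpty()) {
                empty++;
            }
        }
        return empty;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(emptyPollsAfterStall(20, 3)); // prints 3
    }
}
```

Running `main` prints 3: all 20 records are received, and each of the next three polls times out with an empty batch. A timed poll never raises an error by itself, so a stalled producer shows up only as falling throughput and consumer threads parked in `TIMED_WAITING`.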
What did you expect to see?
Snapshot all records in the collection, then read records from the change stream.
What did you see instead?
Tasks hang in the snapshot phase.
Anything else?
Here are thread dumps from two of the TaskManagers:
snapshot_hang.threaddump.txt
snapshot_hang_2.threaddump.txt
Suspect threads:
"cluster-rtt-ClusterId{value='6509694684a9a629144c6209', description='null'}-x.x.x.x:27017" Id=94 TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.mongodb.internal.connection.DefaultServerMonitor.waitForNext(DefaultServerMonitor.java:445)
at com.mongodb.internal.connection.DefaultServerMonitor.access$1500(DefaultServerMonitor.java:60)
at com.mongodb.internal.connection.DefaultServerMonitor$RoundTripTimeRunnable.run(DefaultServerMonitor.java:417)
at java.lang.Thread.run(Thread.java:748)
"cluster-ClusterId{value='6509694684a9a629144c6209', description='null'}-x.x.x.x:27017" Id=93 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1259379d
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1259379d
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.waitForSignalOrTimeout(DefaultServerMonitor.java:302)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.waitForNext(DefaultServerMonitor.java:283)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:177)
at java.lang.Thread.run(Thread.java:748)
"System Time Trigger for Source: 100205_mongocdc_t_lint_defect_v2_prod -> Map -> Sink: discard (42/125)#0" Id=92 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44ac244
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44ac244
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"Source Data Fetcher for Source: 100205_mongocdc_t_lint_defect_v2_prod -> Map -> Sink: discard (42/125)#0" Id=91 TIMED_WAITING on io.debezium.connector.base.ChangeEventQueue@4b418b24
at java.lang.Object.wait(Native Method)
- waiting on io.debezium.connector.base.ChangeEventQueue@4b418b24
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:249)
at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:130)
at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:73)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker@2dc71397
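In the excerpt above, the Source Data Fetcher thread is `TIMED_WAITING` inside `ChangeEventQueue.poll`, i.e. waiting for records, and no thread running `MongoDBScanFetchTask` appears among the suspect threads. A small helper for searching the attached dumps for such a thread is sketched below. It assumes the dump format shown above (a quoted thread-name header line followed by `at ...` frame lines); `ThreadDumpFilter` and `threadsWithFrame` are hypothetical names, not part of Flink or the JDK.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ThreadDumpFilter {
    // Groups "at ..." frames under the preceding quoted thread header and returns
    // the headers of threads with at least one frame containing frameFragment.
    // Other lines ("- waiting on ...", lock summaries) are ignored.
    static List<String> threadsWithFrame(String dump, String frameFragment) {
        Map<String, List<String>> stacks = new LinkedHashMap<>();
        String current = null;
        for (String raw : dump.split("\\R")) {
            String line = raw.trim();
            if (line.startsWith("\"")) {
                current = line;
                stacks.put(current, new ArrayList<>());
            } else if (current != null && line.startsWith("at ")) {
                stacks.get(current).add(line.substring(3));
            }
        }
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : stacks.entrySet()) {
            for (String frame : e.getValue()) {
                if (frame.contains(frameFragment)) {
                    matches.add(e.getKey());
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: ThreadDumpFilter <dump-file> [frame-fragment]");
            return;
        }
        String dump = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        String fragment = args.length > 1 ? args[1] : "MongoDBScanFetchTask";
        for (String header : threadsWithFrame(dump, fragment)) {
            System.out.println(header);
        }
    }
}
```

For example, running it on snapshot_hang.threaddump.txt with `MongoDBScanFetchTask` (the default fragment) would list any thread currently executing the snapshot task together with its state, which helps tell a stuck snapshot read apart from a task that has already exited.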
Are you willing to submit a PR?
- I'm willing to submit a PR!