[Bug] [mongodb] task managers hang forever one by one #2510

@vanliu-tx

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.17.0

Flink CDC version

2.4.1

Database and its version

MongoDB 4.2.11

Minimal reproduce step

  1. Prepare a large collection with more than 10 billion records.
  2. Start a Flink application that reads the collection and sinks to a DiscardingSink with parallelism 125.
  3. The pull speed is high for the first hour or two, then slowly decreases to 0 over the next few hours.
  4. In the TaskManager logs, every TaskManager is stuck at "c.v.c.c.mongodb.source.reader.fetch.MongoDBScanFetchTask - Snapshot step 2 - Snapshotting data".

What did you expect to see?

The job snapshots all records in the collection and then continues copying records from the change stream.

What did you see instead?

The tasks hang in the snapshot phase.

Anything else?

Here are thread dumps from two of the TaskManagers:
snapshot_hang.threaddump.txt
snapshot_hang_2.threaddump.txt

Suspect threads:

"cluster-rtt-ClusterId{value='6509694684a9a629144c6209', description='null'}-x.x.x.x:27017" Id=94 TIMED_WAITING
	at java.lang.Thread.sleep(Native Method)
	at com.mongodb.internal.connection.DefaultServerMonitor.waitForNext(DefaultServerMonitor.java:445)
	at com.mongodb.internal.connection.DefaultServerMonitor.access$1500(DefaultServerMonitor.java:60)
	at com.mongodb.internal.connection.DefaultServerMonitor$RoundTripTimeRunnable.run(DefaultServerMonitor.java:417)
	at java.lang.Thread.run(Thread.java:748)

"cluster-ClusterId{value='6509694684a9a629144c6209', description='null'}-x.x.x.x:27017" Id=93 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1259379d
	at sun.misc.Unsafe.park(Native Method)
	-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1259379d
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.waitForSignalOrTimeout(DefaultServerMonitor.java:302)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.waitForNext(DefaultServerMonitor.java:283)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:177)
	at java.lang.Thread.run(Thread.java:748)

"System Time Trigger for Source: 100205_mongocdc_t_lint_defect_v2_prod -> Map -> Sink: discard (42/125)#0" Id=92 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44ac244
	at sun.misc.Unsafe.park(Native Method)
	-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44ac244
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"Source Data Fetcher for Source: 100205_mongocdc_t_lint_defect_v2_prod -> Map -> Sink: discard (42/125)#0" Id=91 TIMED_WAITING on io.debezium.connector.base.ChangeEventQueue@4b418b24
	at java.lang.Object.wait(Native Method)
	-  waiting on io.debezium.connector.base.ChangeEventQueue@4b418b24
	at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:249)
	at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:130)
	at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:73)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

	Number of locked synchronizers = 1
	- java.util.concurrent.ThreadPoolExecutor$Worker@2dc71397
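
For anyone triaging the attached dumps, here is a minimal sketch of a helper that groups threads by state and by their first non-JDK stack frame, so that many fetcher threads parked in the same place show up as one line. It assumes the dump uses the header format shown above (`"name" Id=N STATE ...` followed by tab-indented `at ...` frames). The class name `ThreadDumpSummary` and the grouping rule are my own choices for illustration; nothing here comes from Flink CDC.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: groups threads of a textual dump by state and a representative frame. */
class ThreadDumpSummary {

    // Matches header lines such as: "thread name" Id=91 TIMED_WAITING on some.Lock@1a2b
    private static final Pattern HEADER =
            Pattern.compile("^\"(.*)\" Id=(\\d+) ([A-Z_]+).*$");

    /** Returns a map from "STATE @ frame" to the names of the threads that match it. */
    static Map<String, List<String>> summarize(List<String> lines) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        String name = null;
        String state = null;
        String frame = null;
        boolean appFrameFound = false;
        for (String raw : lines) {
            String line = raw.trim();
            Matcher m = HEADER.matcher(line);
            if (m.matches()) {
                flush(groups, name, state, frame); // close the previous thread, if any
                name = m.group(1);
                state = m.group(3);
                frame = null;
                appFrameFound = false;
            } else if (name != null && !appFrameFound && line.startsWith("at ")) {
                String candidate = line.substring(3);
                boolean jdk = candidate.startsWith("java.") || candidate.startsWith("sun.");
                // Keep the top frame as a fallback, but prefer the first non-JDK frame.
                if (frame == null || !jdk) {
                    frame = candidate;
                }
                appFrameFound = !jdk;
            }
        }
        flush(groups, name, state, frame); // close the last thread in the dump
        return groups;
    }

    private static void flush(Map<String, List<String>> groups,
                              String name, String state, String frame) {
        if (name != null && frame != null) {
            groups.computeIfAbsent(state + " @ " + frame, k -> new ArrayList<>()).add(name);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.err.println("usage: java ThreadDumpSummary.java <threaddump.txt>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        // Print one line per group: thread count, then state and frame.
        summarize(lines).forEach((key, names) -> System.out.println(names.size() + "  " + key));
    }
}
```

Run against one of the attached dump files, this prints one line per distinct state and frame with the number of threads in that group. It only summarizes where threads are waiting; it does not identify why the snapshot stopped producing records.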

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

bug (Something isn't working)
