Skip to content

Commit 7172695

Browse files
authored
[cdc-base] Fix TM hangs caused by uncaught exception (#2511)
1 parent 07db169 commit 7172695

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId
7575
ThreadFactory threadFactory =
7676
new ThreadFactoryBuilder()
7777
.setNameFormat("debezium-snapshot-reader-" + subtaskId)
78+
.setUncaughtExceptionHandler(
79+
(thread, throwable) -> setReadException(throwable))
7880
.build();
7981
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
8082
this.hasNextElement = new AtomicBoolean(false);
@@ -89,17 +91,13 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
8991
this.queue = taskContext.getQueue();
9092
this.hasNextElement.set(true);
9193
this.reachEnd.set(false);
92-
executorService.submit(
94+
95+
executorService.execute(
9396
() -> {
9497
try {
9598
snapshotSplitReadTask.execute(taskContext);
9699
} catch (Exception e) {
97-
LOG.error(
98-
String.format(
99-
"Execute snapshot read task for snapshot split %s fail",
100-
currentSnapshotSplit),
101-
e);
102-
readException = e;
100+
setReadException(e);
103101
}
104102
});
105103
}
@@ -186,6 +184,19 @@ private void checkReadException() {
186184
}
187185
}
188186

187+
private void setReadException(Throwable throwable) {
188+
LOG.error(
189+
String.format(
190+
"Execute snapshot read task for snapshot split %s fail",
191+
currentSnapshotSplit),
192+
throwable);
193+
if (readException == null) {
194+
readException = throwable;
195+
} else {
196+
readException.addSuppressed(throwable);
197+
}
198+
}
199+
189200
@Override
190201
public void close() {
191202
try {

0 commit comments

Comments
 (0)