Skip to content

Commit 4276681

Browse files
committed
[Fix][Connector-V2][ClickHouse] Fix ClickHouse Bug (#7897)
1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](#7897) 2、Added E2E test cases for this issue [(#7897)](#7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration.The configuration in config is invalid ``` seatunnel engine/seatunnel-engineer-common/src/main/resources/seatunnely.yaml ```
1 parent 9088a0d commit 4276681

File tree

3 files changed

+14
-2
lines changed

3 files changed

+14
-2
lines changed

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
1919

20+
import lombok.extern.slf4j.Slf4j;
21+
import org.apache.seatunnel.api.source.Boundedness;
2022
import org.apache.seatunnel.api.source.Collector;
2123
import org.apache.seatunnel.api.source.SourceReader;
2224
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -34,7 +36,7 @@
3436
import java.util.Collections;
3537
import java.util.List;
3638
import java.util.Random;
37-
39+
@Slf4j
3840
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {
3941

4042
private final List<ClickHouseNode> servers;
@@ -43,6 +45,7 @@ public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, Clickh
4345
private final SourceReader.Context readerContext;
4446
private ClickHouseRequest<?> request;
4547
private final String sql;
48+
private volatile boolean noMoreSplit;
4649

4750
private final List<ClickhouseSourceSplit> splits;
4851

@@ -97,6 +100,12 @@ record -> {
97100
}
98101
this.readerContext.signalNoMoreElement();
99102
this.splits.clear();
103+
} else if (noMoreSplit
104+
&& splits.isEmpty()
105+
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
106+
log.info("Closed the bounded ClickHouse source");
107+
this.readerContext.signalNoMoreElement();
108+
this.splits.clear();
100109
}
101110
}
102111

@@ -111,7 +120,7 @@ public void addSplits(List<ClickhouseSourceSplit> splits) {
111120
}
112121

113122
@Override
114-
public void handleNoMoreSplits() {}
123+
public void handleNoMoreSplits() {noMoreSplit = true;}
115124

116125
@Override
117126
public void notifyCheckpointComplete(long checkpointId) throws Exception {}

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public void registerReader(int subtaskId) {
7777
if (assigned < 0) {
7878
assigned = subtaskId;
7979
context.assignSplit(subtaskId, new ClickhouseSourceSplit());
80+
} else {
81+
context.signalNoMoreSplits(subtaskId);
8082
}
8183
}
8284

seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ seatunnel:
2020
backup-count: 1
2121
queue-type: blockingqueue
2222
print-execution-info-interval: 60
23+
print-job-metrics-info-interval: 60
2324
slot-service:
2425
dynamic-slot: true
2526
checkpoint:

0 commit comments

Comments
 (0)