Skip to content

Commit 1b80667

Browse files
committed
[Fix][Connector-V2][ClickHouse] Fix ClickHouse Bug (#7897)
1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](#7897) 2、Added E2E test cases for this issue [(#7897)](#7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration.The configuration in config is invalid ``` seatunnel engine/seatunnel-engineer-common/src/main/resources/seatunnely.yaml ```
1 parent 4276681 commit 1b80667

File tree

3 files changed

+57
-2
lines changed

3 files changed

+57
-2
lines changed

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
1919

20-
import lombok.extern.slf4j.Slf4j;
2120
import org.apache.seatunnel.api.source.Boundedness;
2221
import org.apache.seatunnel.api.source.Collector;
2322
import org.apache.seatunnel.api.source.SourceReader;
@@ -30,12 +29,14 @@
3029
import com.clickhouse.client.ClickHouseNode;
3130
import com.clickhouse.client.ClickHouseRequest;
3231
import com.clickhouse.client.ClickHouseResponse;
32+
import lombok.extern.slf4j.Slf4j;
3333

3434
import java.io.IOException;
3535
import java.util.ArrayList;
3636
import java.util.Collections;
3737
import java.util.List;
3838
import java.util.Random;
39+
3940
@Slf4j
4041
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {
4142

@@ -120,7 +121,9 @@ public void addSplits(List<ClickhouseSourceSplit> splits) {
120121
}
121122

122123
@Override
123-
public void handleNoMoreSplits() {noMoreSplit = true;}
124+
public void handleNoMoreSplits() {
125+
noMoreSplit = true;
126+
}
124127

125128
@Override
126129
public void notifyCheckpointComplete(long checkpointId) throws Exception {}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ public void testClickhouse(TestContainer container) throws Exception {
101101
clearSinkTable();
102102
}
103103

104+
@TestTemplate
105+
public void testSourceParallelism(TestContainer container) throws Exception {
106+
System.out.println("=========多并行度测试===========");
107+
Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf");
108+
System.out.println(execResult.getExitCode());
109+
}
110+
104111
@BeforeAll
105112
@Override
106113
public void startUp() throws Exception {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
parallelism = 2
23+
job.mode = "BATCH"
24+
}
25+
26+
source {
27+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
28+
Clickhouse {
29+
host = "clickhouse:8123"
30+
database = "default"
31+
sql = "select * from source_table"
32+
username = "default"
33+
password = ""
34+
result_table_name = "source_table"
35+
}
36+
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
37+
# please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
38+
}
39+
40+
sink {
41+
console {
42+
}
43+
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
44+
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
45+
}

0 commit comments

Comments
 (0)