Skip to content

Commit e4fb5b3

Browse files
xuhui1231xuhuizhongjiajie
authored
datasource test and sql task Remove connection pool issues is #14179 (#14193)
* datasource test and sql task Remove connection pool issues is #14179 * datasource test and sql task Remove connection pool issues is #14179 uniform style * datasource test and sql task Remove connection pool issues is #14179 uniform style by 20230720 * datasource test and sql task Remove connection pool issues is #14179 uniform style by 20230720 --------- Co-authored-by: xuhui <[email protected]> Co-authored-by: Jay Chung <[email protected]>
1 parent d92e2b3 commit e4fb5b3

File tree

6 files changed

+26
-162
lines changed

6 files changed

+26
-162
lines changed

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,20 @@
1717

1818
package org.apache.dolphinscheduler.plugin.datasource.api.client;
1919

20-
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2120
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2221
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
2322
import org.apache.dolphinscheduler.spi.enums.DbType;
2423

2524
import org.apache.commons.lang3.StringUtils;
2625

2726
import java.sql.Connection;
27+
import java.sql.DriverManager;
2828
import java.sql.SQLException;
2929
import java.util.concurrent.TimeUnit;
3030

3131
import lombok.extern.slf4j.Slf4j;
3232

33-
import org.springframework.jdbc.core.JdbcTemplate;
34-
3533
import com.google.common.base.Stopwatch;
36-
import com.zaxxer.hikari.HikariDataSource;
3734

3835
@Slf4j
3936
public class CommonDataSourceClient implements DataSourceClient {
@@ -42,8 +39,7 @@ public class CommonDataSourceClient implements DataSourceClient {
4239
public static final String COMMON_VALIDATION_QUERY = "select 1";
4340

4441
protected final BaseConnectionParam baseConnectionParam;
45-
protected HikariDataSource dataSource;
46-
protected JdbcTemplate jdbcTemplate;
42+
protected Connection connection;
4743

4844
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
4945
this.baseConnectionParam = baseConnectionParam;
@@ -63,8 +59,7 @@ protected void checkEnv(BaseConnectionParam baseConnectionParam) {
6359
}
6460

6561
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
66-
this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType);
67-
this.jdbcTemplate = new JdbcTemplate(dataSource);
62+
this.connection = buildConn(baseConnectionParam);
6863
}
6964

7065
protected void checkUser(BaseConnectionParam baseConnectionParam) {
@@ -73,6 +68,20 @@ protected void checkUser(BaseConnectionParam baseConnectionParam) {
7368
}
7469
}
7570

71+
private Connection buildConn(BaseConnectionParam baseConnectionParam) {
72+
Connection conn = null;
73+
try {
74+
Class.forName(baseConnectionParam.getDriverClassName());
75+
conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(),
76+
baseConnectionParam.getPassword());
77+
} catch (ClassNotFoundException e) {
78+
throw new RuntimeException("Driver load fail", e);
79+
} catch (SQLException e) {
80+
throw new RuntimeException("JDBC connect failed", e);
81+
}
82+
return conn;
83+
}
84+
7685
protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) {
7786
baseConnectionParam.setUser(COMMON_USER);
7887
}
@@ -92,7 +101,7 @@ public void checkClient() {
92101
// Checking data source client
93102
Stopwatch stopwatch = Stopwatch.createStarted();
94103
try {
95-
this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());
104+
this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery();
96105
} catch (Exception e) {
97106
throw new RuntimeException("JDBC connect failed", e);
98107
} finally {
@@ -104,20 +113,21 @@ public void checkClient() {
104113
@Override
105114
public Connection getConnection() {
106115
try {
107-
return this.dataSource.getConnection();
116+
return connection.isClosed() ? buildConn(baseConnectionParam) : connection;
108117
} catch (SQLException e) {
109-
log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e);
110-
return null;
118+
throw new RuntimeException("get conn is fail", e);
111119
}
112120
}
113121

114122
@Override
115123
public void close() {
116-
log.info("do close dataSource {}.", baseConnectionParam.getDatabase());
117-
try (HikariDataSource closedDatasource = dataSource) {
118-
// only close the resource
124+
log.info("do close connection {}.", baseConnectionParam.getDatabase());
125+
try {
126+
connection.close();
127+
} catch (SQLException e) {
128+
log.info("colse connection fail");
129+
throw new RuntimeException(e);
119130
}
120-
this.jdbcTemplate = null;
121131
}
122132

123133
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,9 @@
2525
import org.apache.dolphinscheduler.spi.enums.DbType;
2626

2727
import java.sql.Connection;
28-
import java.sql.SQLException;
29-
import java.sql.Statement;
30-
import java.util.concurrent.TimeUnit;
3128

3229
import lombok.extern.slf4j.Slf4j;
3330

34-
import com.google.common.base.Stopwatch;
35-
3631
@Slf4j
3732
public class AzureSQLDataSourceClient extends CommonDataSourceClient {
3833

@@ -49,33 +44,4 @@ public Connection getConnection() {
4944
return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam);
5045
}
5146

52-
@Override
53-
public void checkClient() {
54-
55-
AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam;
56-
Stopwatch stopwatch = Stopwatch.createStarted();
57-
String validationQuery = this.baseConnectionParam.getValidationQuery();
58-
if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
59-
// Checking data source client
60-
try {
61-
this.jdbcTemplate.execute(validationQuery);
62-
} catch (Exception e) {
63-
throw new RuntimeException("JDBC connect failed", e);
64-
} finally {
65-
log.info("Time to execute check jdbc client with sql {} for {} ms ",
66-
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
67-
}
68-
} else {
69-
try (Statement statement = getConnection().createStatement()) {
70-
if (!statement.execute(validationQuery)) {
71-
throw new SQLException("execute check azure sql token client failed : " + validationQuery);
72-
}
73-
} catch (SQLException e) {
74-
throw new RuntimeException(e);
75-
} finally {
76-
log.info("Time to execute check azure sql token client with sql {} for {} ms ",
77-
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
78-
}
79-
}
80-
}
8147
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
2525
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
26-
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2726
import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
2827
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2928
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -33,13 +32,9 @@
3332
import org.apache.commons.lang3.StringUtils;
3433

3534
import java.lang.reflect.Field;
36-
import java.sql.Connection;
37-
import java.sql.SQLException;
3835

3936
import lombok.extern.slf4j.Slf4j;
4037

41-
import org.springframework.jdbc.core.JdbcTemplate;
42-
4338
@Slf4j
4439
public class HiveDataSourceClient extends CommonDataSourceClient {
4540

@@ -52,17 +47,6 @@ protected void preInit() {
5247
log.info("PreInit in {}", getClass().getName());
5348
}
5449

55-
@Override
56-
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
57-
log.info("Create UserGroupInformation.");
58-
UserGroupInformationFactory.login(baseConnectionParam.getUser());
59-
log.info("Create ugi success.");
60-
61-
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
62-
this.jdbcTemplate = new JdbcTemplate(dataSource);
63-
log.info("Init {} success.", getClass().getName());
64-
}
65-
6650
@Override
6751
protected void checkEnv(BaseConnectionParam baseConnectionParam) {
6852
super.checkEnv(baseConnectionParam);
@@ -86,20 +70,6 @@ private void checkKerberosEnv() {
8670
}
8771
}
8872

89-
@Override
90-
public Connection getConnection() {
91-
Connection connection = null;
92-
while (connection == null) {
93-
try {
94-
connection = dataSource.getConnection();
95-
} catch (SQLException e) {
96-
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
97-
UserGroupInformationFactory.login(baseConnectionParam.getUser());
98-
}
99-
}
100-
return connection;
101-
}
102-
10373
@Override
10474
public void close() {
10575
try {

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,11 @@
1818
package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
1919

2020
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
21-
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2221
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2322
import org.apache.dolphinscheduler.spi.enums.DbType;
2423

25-
import java.sql.Connection;
26-
import java.sql.SQLException;
27-
2824
import lombok.extern.slf4j.Slf4j;
2925

30-
import org.springframework.jdbc.core.JdbcTemplate;
31-
3226
@Slf4j
3327
public class KyuubiDataSourceClient extends CommonDataSourceClient {
3428

@@ -41,32 +35,11 @@ protected void preInit() {
4135
log.info("PreInit in {}", getClass().getName());
4236
}
4337

44-
@Override
45-
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
46-
47-
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
48-
this.jdbcTemplate = new JdbcTemplate(dataSource);
49-
log.info("Init {} success.", getClass().getName());
50-
}
51-
5238
@Override
5339
protected void checkEnv(BaseConnectionParam baseConnectionParam) {
5440
super.checkEnv(baseConnectionParam);
5541
}
5642

57-
@Override
58-
public Connection getConnection() {
59-
Connection connection = null;
60-
while (connection == null) {
61-
try {
62-
connection = dataSource.getConnection();
63-
} catch (SQLException e) {
64-
log.error("Failed to get Kyuubi Connection.", e);
65-
}
66-
}
67-
return connection;
68-
}
69-
7043
@Override
7144
public void close() {
7245
super.close();

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
1919

2020
import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam;
21-
import org.apache.dolphinscheduler.spi.enums.DbType;
2221

2322
import java.sql.Connection;
2423

@@ -49,13 +48,6 @@ public void testCheckEnv() {
4948
Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam);
5049
}
5150

52-
@Test
53-
public void testInitClient() {
54-
KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam();
55-
kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI);
56-
Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI);
57-
}
58-
5951
@Test
6052
public void testCheckClient() {
6153
kyuubiDataSourceClient.checkClient();

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,63 +18,16 @@
1818
package org.apache.dolphinscheduler.plugin.datasource.redshift;
1919

2020
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
21-
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode;
22-
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam;
23-
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor;
2421
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2522
import org.apache.dolphinscheduler.spi.enums.DbType;
2623

27-
import java.sql.Connection;
28-
import java.sql.SQLException;
29-
import java.sql.Statement;
30-
import java.util.concurrent.TimeUnit;
31-
3224
import lombok.extern.slf4j.Slf4j;
3325

34-
import com.google.common.base.Stopwatch;
35-
3626
@Slf4j
3727
public class RedshiftDataSourceClient extends CommonDataSourceClient {
3828

3929
public RedshiftDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
4030
super(baseConnectionParam, dbType);
4131
}
4232

43-
@Override
44-
public Connection getConnection() {
45-
RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
46-
if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
47-
return super.getConnection();
48-
}
49-
return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam);
50-
}
51-
52-
@Override
53-
public void checkClient() {
54-
RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
55-
Stopwatch stopwatch = Stopwatch.createStarted();
56-
String validationQuery = this.baseConnectionParam.getValidationQuery();
57-
if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
58-
// Checking data source client
59-
try {
60-
this.jdbcTemplate.execute(validationQuery);
61-
} catch (Exception e) {
62-
throw new RuntimeException("JDBC connect failed", e);
63-
} finally {
64-
log.info("Time to execute check jdbc client with sql {} for {} ms ",
65-
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
66-
}
67-
} else {
68-
try (Statement statement = getConnection().createStatement()) {
69-
if (!statement.execute(validationQuery)) {
70-
throw new SQLException("execute check redshift access key failed : " + validationQuery);
71-
}
72-
} catch (SQLException e) {
73-
throw new RuntimeException(e);
74-
} finally {
75-
log.info("Time to execute check redshift access key with sql {} for {} ms ",
76-
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
77-
}
78-
}
79-
}
8033
}

0 commit comments

Comments
 (0)