Skip to content

Commit 6617e3f

Browse files
authored
Revert "datasource test and sql task Remove connection pool issues is #14179 (#14193)" (#14626)
This reverts commit e4fb5b3.
1 parent 930d2f0 commit 6617e3f

File tree

6 files changed

+162
-26
lines changed

6 files changed

+162
-26
lines changed

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,23 @@
1717

1818
package org.apache.dolphinscheduler.plugin.datasource.api.client;
1919

20+
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2021
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2122
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
2223
import org.apache.dolphinscheduler.spi.enums.DbType;
2324

2425
import org.apache.commons.lang3.StringUtils;
2526

2627
import java.sql.Connection;
27-
import java.sql.DriverManager;
2828
import java.sql.SQLException;
2929
import java.util.concurrent.TimeUnit;
3030

3131
import lombok.extern.slf4j.Slf4j;
3232

33+
import org.springframework.jdbc.core.JdbcTemplate;
34+
3335
import com.google.common.base.Stopwatch;
36+
import com.zaxxer.hikari.HikariDataSource;
3437

3538
@Slf4j
3639
public class CommonDataSourceClient implements DataSourceClient {
@@ -39,7 +42,8 @@ public class CommonDataSourceClient implements DataSourceClient {
3942
public static final String COMMON_VALIDATION_QUERY = "select 1";
4043

4144
protected final BaseConnectionParam baseConnectionParam;
42-
protected Connection connection;
45+
protected HikariDataSource dataSource;
46+
protected JdbcTemplate jdbcTemplate;
4347

4448
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
4549
this.baseConnectionParam = baseConnectionParam;
@@ -59,7 +63,8 @@ protected void checkEnv(BaseConnectionParam baseConnectionParam) {
5963
}
6064

6165
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
62-
this.connection = buildConn(baseConnectionParam);
66+
this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType);
67+
this.jdbcTemplate = new JdbcTemplate(dataSource);
6368
}
6469

6570
protected void checkUser(BaseConnectionParam baseConnectionParam) {
@@ -68,20 +73,6 @@ protected void checkUser(BaseConnectionParam baseConnectionParam) {
6873
}
6974
}
7075

71-
private Connection buildConn(BaseConnectionParam baseConnectionParam) {
72-
Connection conn = null;
73-
try {
74-
Class.forName(baseConnectionParam.getDriverClassName());
75-
conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(),
76-
baseConnectionParam.getPassword());
77-
} catch (ClassNotFoundException e) {
78-
throw new RuntimeException("Driver load fail", e);
79-
} catch (SQLException e) {
80-
throw new RuntimeException("JDBC connect failed", e);
81-
}
82-
return conn;
83-
}
84-
8576
protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) {
8677
baseConnectionParam.setUser(COMMON_USER);
8778
}
@@ -101,7 +92,7 @@ public void checkClient() {
10192
// Checking data source client
10293
Stopwatch stopwatch = Stopwatch.createStarted();
10394
try {
104-
this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery();
95+
this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());
10596
} catch (Exception e) {
10697
throw new RuntimeException("JDBC connect failed", e);
10798
} finally {
@@ -113,21 +104,20 @@ public void checkClient() {
113104
@Override
114105
public Connection getConnection() {
115106
try {
116-
return connection.isClosed() ? buildConn(baseConnectionParam) : connection;
107+
return this.dataSource.getConnection();
117108
} catch (SQLException e) {
118-
throw new RuntimeException("get conn is fail", e);
109+
log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e);
110+
return null;
119111
}
120112
}
121113

122114
@Override
123115
public void close() {
124-
log.info("do close connection {}.", baseConnectionParam.getDatabase());
125-
try {
126-
connection.close();
127-
} catch (SQLException e) {
128-
log.info("colse connection fail");
129-
throw new RuntimeException(e);
116+
log.info("do close dataSource {}.", baseConnectionParam.getDatabase());
117+
try (HikariDataSource closedDatasource = dataSource) {
118+
// only close the resource
130119
}
120+
this.jdbcTemplate = null;
131121
}
132122

133123
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,14 @@
2525
import org.apache.dolphinscheduler.spi.enums.DbType;
2626

2727
import java.sql.Connection;
28+
import java.sql.SQLException;
29+
import java.sql.Statement;
30+
import java.util.concurrent.TimeUnit;
2831

2932
import lombok.extern.slf4j.Slf4j;
3033

34+
import com.google.common.base.Stopwatch;
35+
3136
@Slf4j
3237
public class AzureSQLDataSourceClient extends CommonDataSourceClient {
3338

@@ -44,4 +49,33 @@ public Connection getConnection() {
4449
return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam);
4550
}
4651

52+
@Override
53+
public void checkClient() {
54+
55+
AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam;
56+
Stopwatch stopwatch = Stopwatch.createStarted();
57+
String validationQuery = this.baseConnectionParam.getValidationQuery();
58+
if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
59+
// Checking data source client
60+
try {
61+
this.jdbcTemplate.execute(validationQuery);
62+
} catch (Exception e) {
63+
throw new RuntimeException("JDBC connect failed", e);
64+
} finally {
65+
log.info("Time to execute check jdbc client with sql {} for {} ms ",
66+
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
67+
}
68+
} else {
69+
try (Statement statement = getConnection().createStatement()) {
70+
if (!statement.execute(validationQuery)) {
71+
throw new SQLException("execute check azure sql token client failed : " + validationQuery);
72+
}
73+
} catch (SQLException e) {
74+
throw new RuntimeException(e);
75+
} finally {
76+
log.info("Time to execute check azure sql token client with sql {} for {} ms ",
77+
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
78+
}
79+
}
80+
}
4781
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
2525
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
26+
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2627
import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
2728
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2829
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -32,9 +33,13 @@
3233
import org.apache.commons.lang3.StringUtils;
3334

3435
import java.lang.reflect.Field;
36+
import java.sql.Connection;
37+
import java.sql.SQLException;
3538

3639
import lombok.extern.slf4j.Slf4j;
3740

41+
import org.springframework.jdbc.core.JdbcTemplate;
42+
3843
@Slf4j
3944
public class HiveDataSourceClient extends CommonDataSourceClient {
4045

@@ -47,6 +52,17 @@ protected void preInit() {
4752
log.info("PreInit in {}", getClass().getName());
4853
}
4954

55+
@Override
56+
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
57+
log.info("Create UserGroupInformation.");
58+
UserGroupInformationFactory.login(baseConnectionParam.getUser());
59+
log.info("Create ugi success.");
60+
61+
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
62+
this.jdbcTemplate = new JdbcTemplate(dataSource);
63+
log.info("Init {} success.", getClass().getName());
64+
}
65+
5066
@Override
5167
protected void checkEnv(BaseConnectionParam baseConnectionParam) {
5268
super.checkEnv(baseConnectionParam);
@@ -70,6 +86,20 @@ private void checkKerberosEnv() {
7086
}
7187
}
7288

89+
@Override
90+
public Connection getConnection() {
91+
Connection connection = null;
92+
while (connection == null) {
93+
try {
94+
connection = dataSource.getConnection();
95+
} catch (SQLException e) {
96+
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
97+
UserGroupInformationFactory.login(baseConnectionParam.getUser());
98+
}
99+
}
100+
return connection;
101+
}
102+
73103
@Override
74104
public void close() {
75105
try {

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818
package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
1919

2020
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
21+
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2122
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2223
import org.apache.dolphinscheduler.spi.enums.DbType;
2324

25+
import java.sql.Connection;
26+
import java.sql.SQLException;
27+
2428
import lombok.extern.slf4j.Slf4j;
2529

30+
import org.springframework.jdbc.core.JdbcTemplate;
31+
2632
@Slf4j
2733
public class KyuubiDataSourceClient extends CommonDataSourceClient {
2834

@@ -35,11 +41,32 @@ protected void preInit() {
3541
log.info("PreInit in {}", getClass().getName());
3642
}
3743

44+
@Override
45+
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
46+
47+
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
48+
this.jdbcTemplate = new JdbcTemplate(dataSource);
49+
log.info("Init {} success.", getClass().getName());
50+
}
51+
3852
@Override
3953
protected void checkEnv(BaseConnectionParam baseConnectionParam) {
4054
super.checkEnv(baseConnectionParam);
4155
}
4256

57+
@Override
58+
public Connection getConnection() {
59+
Connection connection = null;
60+
while (connection == null) {
61+
try {
62+
connection = dataSource.getConnection();
63+
} catch (SQLException e) {
64+
log.error("Failed to get Kyuubi Connection.", e);
65+
}
66+
}
67+
return connection;
68+
}
69+
4370
@Override
4471
public void close() {
4572
super.close();

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
1919

2020
import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam;
21+
import org.apache.dolphinscheduler.spi.enums.DbType;
2122

2223
import java.sql.Connection;
2324

@@ -48,6 +49,13 @@ public void testCheckEnv() {
4849
Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam);
4950
}
5051

52+
@Test
53+
public void testInitClient() {
54+
KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam();
55+
kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI);
56+
Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI);
57+
}
58+
5159
@Test
5260
public void testCheckClient() {
5361
kyuubiDataSourceClient.checkClient();

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,63 @@
1818
package org.apache.dolphinscheduler.plugin.datasource.redshift;
1919

2020
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
21+
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode;
22+
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam;
23+
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor;
2124
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2225
import org.apache.dolphinscheduler.spi.enums.DbType;
2326

27+
import java.sql.Connection;
28+
import java.sql.SQLException;
29+
import java.sql.Statement;
30+
import java.util.concurrent.TimeUnit;
31+
2432
import lombok.extern.slf4j.Slf4j;
2533

34+
import com.google.common.base.Stopwatch;
35+
2636
@Slf4j
2737
public class RedshiftDataSourceClient extends CommonDataSourceClient {
2838

2939
public RedshiftDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
3040
super(baseConnectionParam, dbType);
3141
}
3242

43+
@Override
44+
public Connection getConnection() {
45+
RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
46+
if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
47+
return super.getConnection();
48+
}
49+
return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam);
50+
}
51+
52+
@Override
53+
public void checkClient() {
54+
RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
55+
Stopwatch stopwatch = Stopwatch.createStarted();
56+
String validationQuery = this.baseConnectionParam.getValidationQuery();
57+
if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
58+
// Checking data source client
59+
try {
60+
this.jdbcTemplate.execute(validationQuery);
61+
} catch (Exception e) {
62+
throw new RuntimeException("JDBC connect failed", e);
63+
} finally {
64+
log.info("Time to execute check jdbc client with sql {} for {} ms ",
65+
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
66+
}
67+
} else {
68+
try (Statement statement = getConnection().createStatement()) {
69+
if (!statement.execute(validationQuery)) {
70+
throw new SQLException("execute check redshift access key failed : " + validationQuery);
71+
}
72+
} catch (SQLException e) {
73+
throw new RuntimeException(e);
74+
} finally {
75+
log.info("Time to execute check redshift access key with sql {} for {} ms ",
76+
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
77+
}
78+
}
79+
}
3380
}

0 commit comments

Comments
 (0)