Skip to content

Commit 9cc70fe

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents 45af87a + 6ef9b23 commit 9cc70fe

File tree

4 files changed

+338
-30
lines changed

4 files changed

+338
-30
lines changed

jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,16 @@ public class JDBCInterpreter extends KerberosInterpreter {
141141
private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
142142
private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows";
143143

144-
private final HashMap<String, Properties> basePropretiesMap;
144+
private static final Set<String> PRESTO_PROPERTIES = new HashSet<>(Arrays.asList(
145+
"user", "password",
146+
"socksProxy", "httpProxy", "clientTags", "applicationNamePrefix", "accessToken",
147+
"SSL", "SSLKeyStorePath", "SSLKeyStorePassword", "SSLTrustStorePath",
148+
"SSLTrustStorePassword", "KerberosRemoteServiceName", "KerberosPrincipal",
149+
"KerberosUseCanonicalHostname", "KerberosServicePrincipalPattern",
150+
"KerberosConfigPath", "KerberosKeytabPath", "KerberosCredentialCachePath",
151+
"extraCredentials", "roles", "sessionProperties"));
152+
153+
private final HashMap<String, Properties> basePropertiesMap;
145154
private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
146155
private final HashMap<String, SqlCompleter> sqlCompletersMap;
147156

@@ -153,7 +162,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
153162
public JDBCInterpreter(Properties property) {
154163
super(property);
155164
jdbcUserConfigurationsMap = new HashMap<>();
156-
basePropretiesMap = new HashMap<>();
165+
basePropertiesMap = new HashMap<>();
157166
sqlCompletersMap = new HashMap<>();
158167
maxLineResults = MAX_LINE_DEFAULT;
159168
}
@@ -180,7 +189,7 @@ protected boolean runKerberosLogin() {
180189
}
181190

182191
public HashMap<String, Properties> getPropertiesMap() {
183-
return basePropretiesMap;
192+
return basePropertiesMap;
184193
}
185194

186195
@Override
@@ -193,20 +202,20 @@ public void open() {
193202
logger.debug("key: {}, value: {}", keyValue[0], keyValue[1]);
194203

195204
Properties prefixProperties;
196-
if (basePropretiesMap.containsKey(keyValue[0])) {
197-
prefixProperties = basePropretiesMap.get(keyValue[0]);
205+
if (basePropertiesMap.containsKey(keyValue[0])) {
206+
prefixProperties = basePropertiesMap.get(keyValue[0]);
198207
} else {
199208
prefixProperties = new Properties();
200-
basePropretiesMap.put(keyValue[0].trim(), prefixProperties);
209+
basePropertiesMap.put(keyValue[0].trim(), prefixProperties);
201210
}
202211
prefixProperties.put(keyValue[1].trim(), getProperty(propertyKey));
203212
}
204213
}
205214

206215
Set<String> removeKeySet = new HashSet<>();
207-
for (String key : basePropretiesMap.keySet()) {
216+
for (String key : basePropertiesMap.keySet()) {
208217
if (!COMMON_KEY.equals(key)) {
209-
Properties properties = basePropretiesMap.get(key);
218+
Properties properties = basePropertiesMap.get(key);
210219
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
211220
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
212221
key, DRIVER_KEY, key, key, URL_KEY);
@@ -216,9 +225,9 @@ public void open() {
216225
}
217226

218227
for (String key : removeKeySet) {
219-
basePropretiesMap.remove(key);
228+
basePropertiesMap.remove(key);
220229
}
221-
logger.debug("JDBC PropretiesMap: {}", basePropretiesMap);
230+
logger.debug("JDBC PropretiesMap: {}", basePropertiesMap);
222231

223232
setMaxLineResults();
224233
setMaxRows();
@@ -238,9 +247,9 @@ protected boolean isKerboseEnabled() {
238247
}
239248

240249
private void setMaxLineResults() {
241-
if (basePropretiesMap.containsKey(COMMON_KEY) &&
242-
basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
243-
maxLineResults = Integer.valueOf(basePropretiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY));
250+
if (basePropertiesMap.containsKey(COMMON_KEY) &&
251+
basePropertiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
252+
maxLineResults = Integer.valueOf(basePropertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY));
244253
}
245254
}
246255

@@ -346,9 +355,9 @@ private String getJDBCDriverName(String user, String propertyKey) {
346355
}
347356

348357
private boolean existAccountInBaseProperty(String propertyKey) {
349-
return basePropretiesMap.get(propertyKey).containsKey(USER_KEY) &&
350-
!isEmpty((String) basePropretiesMap.get(propertyKey).get(USER_KEY)) &&
351-
basePropretiesMap.get(propertyKey).containsKey(PASSWORD_KEY);
358+
return basePropertiesMap.get(propertyKey).containsKey(USER_KEY) &&
359+
!isEmpty((String) basePropertiesMap.get(propertyKey).get(USER_KEY)) &&
360+
basePropertiesMap.get(propertyKey).containsKey(PASSWORD_KEY);
352361
}
353362

354363
private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
@@ -384,14 +393,14 @@ private void setUserProperty(String propertyKey, InterpreterContext interpreterC
384393
String user = interpreterContext.getAuthenticationInfo().getUser();
385394

386395
JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
387-
if (basePropretiesMap.get(propertyKey).containsKey(USER_KEY) &&
388-
!basePropretiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) {
389-
String password = getPassword(basePropretiesMap.get(propertyKey));
396+
if (basePropertiesMap.get(propertyKey).containsKey(USER_KEY) &&
397+
!basePropertiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) {
398+
String password = getPassword(basePropertiesMap.get(propertyKey));
390399
if (!isEmpty(password)) {
391-
basePropretiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password);
400+
basePropertiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password);
392401
}
393402
}
394-
jdbcUserConfigurations.setPropertyMap(propertyKey, basePropretiesMap.get(propertyKey));
403+
jdbcUserConfigurations.setPropertyMap(propertyKey, basePropertiesMap.get(propertyKey));
395404
if (existAccountInBaseProperty(propertyKey)) {
396405
return;
397406
}
@@ -407,7 +416,19 @@ private void setUserProperty(String propertyKey, InterpreterContext interpreterC
407416
}
408417

409418
private void createConnectionPool(String url, String user, String propertyKey,
410-
Properties properties) throws SQLException, ClassNotFoundException {
419+
Properties properties) throws SQLException, ClassNotFoundException, IOException {
420+
421+
String driverClass = properties.getProperty(DRIVER_KEY);
422+
if (driverClass != null && (driverClass.equals("com.facebook.presto.jdbc.PrestoDriver")
423+
|| driverClass.equals("io.prestosql.jdbc.PrestoDriver"))) {
424+
// Only add valid properties otherwise presto won't work.
425+
for (Object key : properties.keySet()) {
426+
if (!PRESTO_PROPERTIES.contains(key.toString())) {
427+
properties.remove(key);
428+
}
429+
}
430+
}
431+
411432
ConnectionFactory connectionFactory =
412433
new DriverManagerConnectionFactory(url, properties);
413434

@@ -420,14 +441,14 @@ private void createConnectionPool(String url, String user, String propertyKey,
420441
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
421442

422443
poolableConnectionFactory.setPool(connectionPool);
423-
Class.forName(properties.getProperty(DRIVER_KEY));
444+
Class.forName(driverClass);
424445
PoolingDriver driver = new PoolingDriver();
425446
driver.registerPool(propertyKey + user, connectionPool);
426447
getJDBCConfiguration(user).saveDBDriverPool(propertyKey, driver);
427448
}
428449

429450
private Connection getConnectionFromPool(String url, String user, String propertyKey,
430-
Properties properties) throws SQLException, ClassNotFoundException {
451+
Properties properties) throws SQLException, ClassNotFoundException, IOException {
431452
String jdbcDriver = getJDBCDriverName(user, propertyKey);
432453

433454
if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
@@ -440,7 +461,7 @@ public Connection getConnection(String propertyKey, InterpreterContext interpret
440461
throws ClassNotFoundException, SQLException, InterpreterException, IOException {
441462
final String user = interpreterContext.getAuthenticationInfo().getUser();
442463
Connection connection;
443-
if (propertyKey == null || basePropretiesMap.get(propertyKey) == null) {
464+
if (propertyKey == null || basePropertiesMap.get(propertyKey) == null) {
444465
return null;
445466
}
446467

@@ -465,7 +486,7 @@ public Connection getConnection(String propertyKey, InterpreterContext interpret
465486
getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
466487
connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
467488
} else {
468-
if (basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) {
489+
if (basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) {
469490
connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
470491
} else {
471492
UserGroupInformation ugi = null;
@@ -505,17 +526,17 @@ private String appendProxyUserToURL(String url, String user, String propertyKey)
505526
StringBuilder connectionUrl = new StringBuilder(url);
506527

507528
if (user != null && !user.equals("anonymous") &&
508-
basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) {
529+
basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) {
509530

510531
Integer lastIndexOfUrl = connectionUrl.indexOf("?");
511532
if (lastIndexOfUrl == -1) {
512533
lastIndexOfUrl = connectionUrl.length();
513534
}
514535
logger.info("Using proxy user as :" + user);
515536
logger.info("Using proxy property for user as :" +
516-
basePropretiesMap.get(propertyKey).getProperty("proxy.user.property"));
537+
basePropertiesMap.get(propertyKey).getProperty("proxy.user.property"));
517538
connectionUrl.insert(lastIndexOfUrl, ";" +
518-
basePropretiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";");
539+
basePropertiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";");
519540
} else if (user != null && !user.equals("anonymous") && url.contains("hive")) {
520541
logger.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" +
521542
".org/docs/latest/interpreter/jdbc.html#apache-hive");
@@ -608,7 +629,7 @@ private boolean isDDLCommand(int updatedCount, int columnCount) throws SQLExcept
608629

609630
public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
610631
InterpreterResult interpreterResult = null;
611-
for (String propertyKey : basePropretiesMap.keySet()) {
632+
for (String propertyKey : basePropertiesMap.keySet()) {
612633
String precode = getProperty(String.format("%s.precode", propertyKey));
613634
if (StringUtils.isNotBlank(precode)) {
614635
interpreterResult = executeSql(propertyKey, precode, interpreterContext);

zeppelin-zengine/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
<org.reflections.version>0.9.8</org.reflections.version>
4545
<xml.apis.version>1.4.01</xml.apis.version>
4646
<frontend.maven.plugin.version>1.3</frontend.maven.plugin.version>
47+
<oss.version>3.8.0</oss.version>
4748
<aws.sdk.s3.version>1.11.736</aws.sdk.s3.version>
4849
<commons.vfs2.version>2.2</commons.vfs2.version>
4950
<eclipse.jgit.version>4.5.4.201711221230-r</eclipse.jgit.version>
@@ -327,6 +328,12 @@
327328
<artifactId>aws-java-sdk-s3</artifactId>
328329
<version>${aws.sdk.s3.version}</version>
329330
</dependency>
331+
332+
<dependency>
333+
<groupId>com.aliyun.oss</groupId>
334+
<artifactId>aliyun-sdk-oss</artifactId>
335+
<version>${oss.version}</version>
336+
</dependency>
330337

331338
<dependency>
332339
<groupId>org.apache.hadoop</groupId>
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.storage;
19+
20+
import com.aliyun.oss.OSS;
21+
import com.aliyun.oss.OSSClientBuilder;
22+
import com.aliyun.oss.model.OSSObject;
23+
import com.aliyun.oss.model.PutObjectRequest;
24+
import com.amazonaws.AmazonClientException;
25+
import com.google.common.annotations.VisibleForTesting;
26+
import org.apache.commons.io.IOUtils;
27+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
28+
import org.apache.zeppelin.interpreter.InterpreterInfoSaving;
29+
import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.io.*;
34+
35+
/**
36+
* Storing config in Aliyun OSS file system
37+
*/
38+
public class OSSConfigStorage extends ConfigStorage {
39+
40+
41+
private static Logger LOGGER = LoggerFactory.getLogger(OSSConfigStorage.class);
42+
43+
44+
45+
private OSS ossClient;
46+
private String bucketName;
47+
private String interpreterSettingPath;
48+
private String authorizationPath;
49+
50+
51+
52+
public OSSConfigStorage(ZeppelinConfiguration zConf) {
53+
super(zConf);
54+
String endpoint = zConf.getOSSEndpoint();
55+
bucketName = zConf.getOSSBucketName();
56+
String rootFolder = zConf.getNotebookDir();
57+
if (rootFolder.startsWith("/")) {
58+
rootFolder = rootFolder.substring(1);
59+
}
60+
this.interpreterSettingPath = rootFolder + "/interpreter.json";
61+
this.authorizationPath = rootFolder + "/notebook-authorization.json";
62+
String accessKeyId = zConf.getOSSAccessKeyId();
63+
String accessKeySecret = zConf.getOSSAccessKeySecret();
64+
this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
65+
}
66+
67+
@Override
68+
public void save(InterpreterInfoSaving settingInfos) throws IOException {
69+
LOGGER.info("Save Interpreter Setting to oss://{}/{}", this.bucketName, this.interpreterSettingPath);
70+
saveToOSS(settingInfos.toJson(), interpreterSettingPath);
71+
}
72+
73+
@Override
74+
public InterpreterInfoSaving loadInterpreterSettings() throws IOException {
75+
LOGGER.info("Load Interpreter Setting from oss Path: " + interpreterSettingPath);
76+
String json = readFromOSS(interpreterSettingPath);
77+
return buildInterpreterInfoSaving(json);
78+
}
79+
80+
@Override
81+
public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException {
82+
LOGGER.info("Save notebook authorization to oss://{}/{} ",this.bucketName,this.authorizationPath);
83+
saveToOSS(authorizationInfoSaving.toJson(), authorizationPath);
84+
}
85+
86+
@Override
87+
public NotebookAuthorizationInfoSaving loadNotebookAuthorization() throws IOException {
88+
LOGGER.info("Load notebook authorization from oss Path: " + interpreterSettingPath);
89+
String json = readFromOSS(interpreterSettingPath);
90+
return NotebookAuthorizationInfoSaving.fromJson(json);
91+
}
92+
93+
@Override
94+
public String loadCredentials() throws IOException {
95+
return null;
96+
}
97+
98+
@Override
99+
public void saveCredentials(String credentials) throws IOException {
100+
101+
}
102+
103+
@VisibleForTesting
104+
void saveToOSS(String content, String ossPath) throws IOException {
105+
try {
106+
PutObjectRequest putObjectRequest = new com.aliyun.oss.model.PutObjectRequest(bucketName,
107+
ossPath, new ByteArrayInputStream(content.getBytes()));
108+
ossClient.putObject(putObjectRequest);
109+
}
110+
catch (AmazonClientException ace) {
111+
throw new IOException("Fail to store " + ossPath + " in OSS", ace);
112+
}
113+
114+
}
115+
116+
@VisibleForTesting
117+
String readFromOSS( String filePath) throws IOException {
118+
119+
OSSObject ossObject;
120+
try {
121+
ossObject = ossClient.getObject(bucketName, filePath);
122+
}
123+
catch (Exception e){
124+
throw new IOException("Fail to get file: " + filePath + " from OSS", e);
125+
}
126+
127+
try (InputStream in = ossObject.getObjectContent()){
128+
return IOUtils.toString(in,zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING));
129+
}
130+
}
131+
132+
}

0 commit comments

Comments
 (0)