Skip to content

Commit 6a2cf04

Browse files
committed
2 parents 076668a + 582a10d commit 6a2cf04

File tree

228 files changed

+6471
-2049
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

228 files changed

+6471
-2049
lines changed

.travis.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,23 @@ jobs:
9595
dist: xenial
9696
env: PYTHON="3" R="true" SCALA_VER="2.10" TENSORFLOW="1.13.1" PROFILE="-Pscala-2.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS=""
9797

98+
# Test flink 1.10
99+
- jdk: "openjdk8"
100+
dist: xenial
101+
env: PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*"
102+
103+
# Test flink 1.11 & flink integration test
104+
- jdk: "openjdk8"
105+
dist: xenial
106+
env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.0" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest"
107+
98108
# Run Spark integration test and unit test
99109

110+
# Run spark integration of in one zeppelin instance: Spark 3.0
111+
- jdk: "openjdk8"
112+
dist: xenial
113+
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown -am" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest30,SparkIntegrationTest30 -DfailIfNoTests=false"
114+
100115
# Run spark integration of in one zeppelin instance (2.4, 2.3, 2.2)
101116
- jdk: "openjdk8"
102117
dist: xenial
@@ -162,6 +177,7 @@ before_install:
162177
- clearcache=$(echo $gitlog | grep -c -E "clear bower|bower clear" || true)
163178
- if [ "$hasbowerchanged" -gt 0 ] || [ "$clearcache" -gt 0 ]; then echo "Clearing bower_components cache"; rm -r zeppelin-web/bower_components; npm cache verify; else echo "Using cached bower_components."; fi
164179
- echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
180+
- if [[ -n $R ]]; then ./testing/install_R.sh; fi
165181
- bash -x ./testing/install_external_dependencies.sh
166182
- ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
167183
- ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"

cassandra/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<description>Zeppelin cassandra support</description>
3434

3535
<properties>
36-
<cassandra.driver.version>4.6.1</cassandra.driver.version>
36+
<cassandra.driver.version>4.7.2</cassandra.driver.version>
3737
<snappy.version>1.1.7.3</snappy.version>
3838
<lz4.version>1.6.0</lz4.version>
3939
<scalate.version>1.7.1</scalate.version>

cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java

Lines changed: 198 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
2121
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2222
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
23+
import com.datastax.oss.driver.api.core.config.DriverOption;
2324
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
24-
import com.datastax.oss.driver.internal.core.loadbalancing.DcInferringLoadBalancingPolicy;
2525
import com.datastax.oss.driver.shaded.guava.common.net.InetAddresses;
2626
import org.apache.commons.lang3.StringUtils;
2727
import org.apache.zeppelin.interpreter.Interpreter;
@@ -42,7 +42,9 @@
4242
import java.security.KeyStore;
4343
import java.util.ArrayList;
4444
import java.util.Collection;
45+
import java.util.HashMap;
4546
import java.util.List;
47+
import java.util.Map;
4648
import java.util.Properties;
4749

4850
import static java.lang.Integer.parseInt;
@@ -112,15 +114,33 @@ public class CassandraInterpreter extends Interpreter {
112114
public static final String CASSANDRA_TRUSTSTORE_PASSWORD =
113115
"cassandra.ssl.truststore.password";
114116

115-
117+
public static final String CASSANDRA_FORMAT_FLOAT_PRECISION =
118+
"cassandra.format.float_precision";
119+
public static final String CASSANDRA_FORMAT_DOUBLE_PRECISION =
120+
"cassandra.format.double_precision";
121+
public static final String CASSANDRA_FORMAT_TIMESTAMP =
122+
"cassandra.format.timestamp";
123+
public static final String CASSANDRA_FORMAT_TIME =
124+
"cassandra.format.time";
125+
public static final String CASSANDRA_FORMAT_DATE =
126+
"cassandra.format.date";
127+
public static final String CASSANDRA_FORMAT_TYPE =
128+
"cassandra.format.output";
129+
public static final String CASSANDRA_FORMAT_TIMEZONE =
130+
"cassandra.format.timezone";
131+
public static final String CASSANDRA_FORMAT_LOCALE =
132+
"cassandra.format.locale";
133+
134+
public static final String NONE_VALUE = "none";
135+
public static final String DEFAULT_VALUE = "DEFAULT";
116136
public static final String DEFAULT_HOST = "127.0.0.1";
117137
public static final String DEFAULT_PORT = "9042";
118138
public static final String DEFAULT_KEYSPACE = "system";
119139
public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT";
120-
public static final String DEFAULT_COMPRESSION = "none";
140+
public static final String DEFAULT_COMPRESSION = NONE_VALUE;
121141
public static final String DEFAULT_CONNECTIONS_PER_HOST = "1";
122142
public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024";
123-
public static final String DEFAULT_POLICY = "DEFAULT";
143+
public static final String DEFAULT_POLICY = DEFAULT_VALUE;
124144
public static final String DEFAULT_PARALLELISM = "10";
125145
public static final String DEFAULT_POOL_TIMEOUT = "5000";
126146
public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
@@ -133,75 +153,72 @@ public class CassandraInterpreter extends Interpreter {
133153
public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12";
134154

135155
static final List NO_COMPLETION = new ArrayList<>();
156+
public static final String DATASTAX_JAVA_DRIVER_PREFIX = "datastax-java-driver.";
157+
public static final String MILLISECONDS_STR = " milliseconds";
158+
public static final String SECONDS_STR = " seconds";
136159

137160
InterpreterLogic helper;
138161
CqlSession session;
139-
private JavaDriverConfig driverConfig = new JavaDriverConfig();
162+
private static final Map<String, DriverOption> optionMap = new HashMap<>();
163+
164+
static {
165+
for (DefaultDriverOption opt: DefaultDriverOption.values()) {
166+
optionMap.put(opt.getPath(), opt);
167+
}
168+
}
140169

141170
public CassandraInterpreter(Properties properties) {
142171
super(properties);
143172
}
144173

145174
@Override
146175
public void open() {
147-
148-
final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST).split(",");
176+
final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST)
177+
.trim().split(",");
149178
final int port = parseInt(getProperty(CASSANDRA_PORT, DEFAULT_PORT));
150179
Collection<InetSocketAddress> hosts = new ArrayList<>();
151180
for (String address : addresses) {
152-
if (InetAddresses.isInetAddress(address)) {
153-
hosts.add(new InetSocketAddress(address, port));
154-
} else {
155-
// TODO(alex): maybe it won't be necessary in 4.4
156-
hosts.add(InetSocketAddress.createUnresolved(address, port));
181+
if (!StringUtils.isBlank(address)) {
182+
logger.debug("Adding contact point: {}", address);
183+
if (InetAddresses.isInetAddress(address)) {
184+
hosts.add(new InetSocketAddress(address, port));
185+
} else {
186+
hosts.add(InetSocketAddress.createUnresolved(address, port));
187+
}
157188
}
158189
}
159190

160-
LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " +
161-
getProperty(CASSANDRA_HOSTS) + "on port " + port);
162-
163-
// start generation of the config
164-
ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();
165-
166-
driverConfig.setCompressionProtocol(this, configBuilder);
167-
driverConfig.setPoolingOptions(this, configBuilder);
168-
driverConfig.setProtocolVersion(this, configBuilder);
169-
driverConfig.setQueryOptions(this, configBuilder);
170-
driverConfig.setReconnectionPolicy(this, configBuilder);
171-
driverConfig.setRetryPolicy(this, configBuilder);
172-
driverConfig.setSocketOptions(this, configBuilder);
173-
driverConfig.setSpeculativeExecutionPolicy(this, configBuilder);
191+
LOGGER.info("Bootstrapping Cassandra Java Driver to connect to {} on port {}",
192+
getProperty(CASSANDRA_HOSTS), port);
174193

175-
//
176-
configBuilder.withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
177-
DcInferringLoadBalancingPolicy.class);
178-
configBuilder.withBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, false);
179-
180-
configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
181-
parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
182-
DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)));
183-
184-
DriverConfigLoader loader = configBuilder.endProfile().build();
185-
// TODO(alex): think how to dump built configuration...
186-
logger.debug(loader.toString());
187-
// end generation of config
194+
DriverConfigLoader loader = createLoader();
188195

196+
LOGGER.debug("Creating cluster builder");
189197
CqlSessionBuilder clusterBuilder = CqlSession.builder()
190-
.addContactPoints(hosts)
191-
.withAuthCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
192-
getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
193-
.withApplicationName("")
198+
.withApplicationName("Zeppelin")
194199
.withApplicationVersion("");
200+
if (!hosts.isEmpty()) {
201+
LOGGER.debug("Adding contact points");
202+
clusterBuilder.addContactPoints(hosts);
203+
}
204+
205+
String username = getProperty(CASSANDRA_CREDENTIALS_USERNAME, NONE_VALUE).trim();
206+
String password = getProperty(CASSANDRA_CREDENTIALS_PASSWORD, NONE_VALUE).trim();
207+
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password) &&
208+
!NONE_VALUE.equalsIgnoreCase(username) && !NONE_VALUE.equalsIgnoreCase(password)) {
209+
LOGGER.debug("Adding credentials. Username = {}", username);
210+
clusterBuilder.withAuthCredentials(username, password);
211+
}
195212

196213
String keyspace = getProperty(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE);
197214
if (StringUtils.isNotBlank(keyspace) && !DEFAULT_KEYSPACE.equalsIgnoreCase(keyspace)) {
215+
LOGGER.debug("Set default keyspace");
198216
clusterBuilder.withKeyspace(keyspace);
199217
}
200218

201-
final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
202-
if (runWithSSL != null && runWithSSL.equals("true")) {
203-
LOGGER.debug("Cassandra Interpreter: Using SSL");
204-
219+
final String runWithSSL = getProperty(CASSANDRA_WITH_SSL, "false");
220+
if ("true".equalsIgnoreCase(runWithSSL)) {
221+
LOGGER.debug("Using SSL");
205222
try {
206223
final SSLContext sslContext;
207224
{
@@ -219,19 +236,149 @@ public void open() {
219236
}
220237
clusterBuilder = clusterBuilder.withSslContext(sslContext);
221238
} catch (Exception e) {
222-
LOGGER.error(e.toString());
239+
LOGGER.error("Exception initializing SSL {}", e.toString());
223240
}
224241
} else {
225-
LOGGER.debug("Cassandra Interpreter: Not using SSL");
242+
LOGGER.debug("Not using SSL");
226243
}
227244

245+
LOGGER.debug("Creating CqlSession");
228246
session = clusterBuilder.withConfigLoader(loader).build();
229-
helper = new InterpreterLogic(session);
247+
LOGGER.debug("Session configuration");
248+
for (Map.Entry<String, Object> entry:
249+
session.getContext().getConfig().getDefaultProfile().entrySet()) {
250+
logger.debug("{} = {}", entry.getKey(), entry.getValue().toString());
251+
}
252+
LOGGER.debug("Creating helper");
253+
helper = new InterpreterLogic(session, properties);
254+
}
255+
256+
private DriverConfigLoader createLoader() {
257+
logger.debug("Creating programmatic config loader");
258+
// start generation of the config
259+
ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();
260+
261+
Map<DriverOption, String> allOptions = new HashMap<>();
262+
263+
// set options from main configuration
264+
String ts = getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
265+
CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT) + MILLISECONDS_STR;
266+
allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
267+
allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, ts);
268+
allOptions.put(DefaultDriverOption.REQUEST_TIMEOUT,
269+
getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
270+
CassandraInterpreter.DEFAULT_READ_TIMEOUT) + MILLISECONDS_STR);
271+
addIfNotBlank(allOptions,
272+
getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY),
273+
DefaultDriverOption.SOCKET_TCP_NODELAY);
274+
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_KEEP_ALIVE),
275+
DefaultDriverOption.SOCKET_KEEP_ALIVE);
276+
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES),
277+
DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE);
278+
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES),
279+
DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE);
280+
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS),
281+
DefaultDriverOption.SOCKET_REUSE_ADDRESS);
282+
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SO_LINGER),
283+
DefaultDriverOption.SOCKET_LINGER_INTERVAL);
284+
addIfNotBlank(allOptions,
285+
getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE),
286+
DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE);
287+
allOptions.put(DefaultDriverOption.REQUEST_CONSISTENCY,
288+
getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
289+
CassandraInterpreter.DEFAULT_CONSISTENCY));
290+
allOptions.put(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY,
291+
getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
292+
CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY));
293+
allOptions.put(DefaultDriverOption.REQUEST_PAGE_SIZE,
294+
getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
295+
CassandraInterpreter.DEFAULT_FETCH_SIZE));
296+
ts = getProperty(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION);
297+
if (!DEFAULT_VALUE.equalsIgnoreCase(ts)) {
298+
// for compatibility with previous configurations
299+
if (ts.equals("4") || ts.equals("3")) {
300+
ts = "V" + ts;
301+
}
302+
allOptions.put(DefaultDriverOption.PROTOCOL_VERSION, ts);
303+
}
304+
addIfNotBlank(allOptions, getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
305+
CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase(),
306+
DefaultDriverOption.PROTOCOL_COMPRESSION);
307+
addIfNotBlankOrDefault(allOptions, getProperty(CASSANDRA_RETRY_POLICY, DEFAULT_POLICY),
308+
DefaultDriverOption.RETRY_POLICY_CLASS);
309+
addIfNotBlankOrDefault(allOptions,
310+
getProperty(CASSANDRA_RECONNECTION_POLICY, DEFAULT_POLICY),
311+
DefaultDriverOption.RECONNECTION_POLICY_CLASS);
312+
addIfNotBlankOrDefault(allOptions,
313+
getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, DEFAULT_POLICY),
314+
DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS);
315+
allOptions.put(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE,
316+
getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
317+
DEFAULT_CONNECTIONS_PER_HOST));
318+
allOptions.put(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE,
319+
getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
320+
DEFAULT_CONNECTIONS_PER_HOST));
321+
allOptions.put(DefaultDriverOption.CONNECTION_MAX_REQUESTS,
322+
getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
323+
DEFAULT_MAX_REQUEST_PER_CONNECTION));
324+
allOptions.put(DefaultDriverOption.HEARTBEAT_INTERVAL,
325+
getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
326+
DEFAULT_HEARTBEAT_INTERVAL) + SECONDS_STR);
327+
ts = getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
328+
DEFAULT_POOL_TIMEOUT) + MILLISECONDS_STR;
329+
allOptions.put(DefaultDriverOption.HEARTBEAT_TIMEOUT, ts);
330+
allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
331+
allOptions.put(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
332+
"DcInferringLoadBalancingPolicy");
333+
allOptions.put(DefaultDriverOption.RESOLVE_CONTACT_POINTS, "false");
334+
allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
335+
getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
336+
DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS) + SECONDS_STR);
337+
338+
// extract additional options that may override values set by main configuration
339+
for (String pname: properties.stringPropertyNames()) {
340+
if (pname.startsWith(DATASTAX_JAVA_DRIVER_PREFIX)) {
341+
String pvalue = properties.getProperty(pname);
342+
logger.info("Custom config values: {} = {}", pname, pvalue);
343+
String shortName = pname.substring(DATASTAX_JAVA_DRIVER_PREFIX.length());
344+
if (optionMap.containsKey(shortName)) {
345+
allOptions.put(optionMap.get(shortName), pvalue);
346+
} else {
347+
logger.warn("Incorrect option name: {}", pname);
348+
}
349+
}
350+
}
351+
352+
for (Map.Entry<DriverOption, String> entry: allOptions.entrySet()) {
353+
configBuilder.withString(entry.getKey(), entry.getValue());
354+
}
355+
356+
DriverConfigLoader loader = configBuilder.endProfile().build();
357+
logger.debug("Config loader is created");
358+
359+
return loader;
360+
}
361+
362+
private static void addIfNotBlank(Map<DriverOption, String> allOptions,
363+
String value,
364+
DefaultDriverOption option) {
365+
if (!StringUtils.isBlank(value)) {
366+
allOptions.put(option, value);
367+
}
368+
}
369+
370+
private static void addIfNotBlankOrDefault(Map<DriverOption, String> allOptions,
371+
String value,
372+
DefaultDriverOption option) {
373+
if (!StringUtils.isBlank(value) && !DEFAULT_VALUE.equalsIgnoreCase(value)) {
374+
allOptions.put(option, value);
375+
}
230376
}
231377

232378
@Override
233379
public void close() {
234-
session.close();
380+
if (session != null)
381+
session.close();
235382
}
236383

237384
@Override

0 commit comments

Comments
 (0)