2020import com .datastax .oss .driver .api .core .CqlSessionBuilder ;
2121import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
2222import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
23+ import com .datastax .oss .driver .api .core .config .DriverOption ;
2324import com .datastax .oss .driver .api .core .config .ProgrammaticDriverConfigLoaderBuilder ;
24- import com .datastax .oss .driver .internal .core .loadbalancing .DcInferringLoadBalancingPolicy ;
2525import com .datastax .oss .driver .shaded .guava .common .net .InetAddresses ;
2626import org .apache .commons .lang3 .StringUtils ;
2727import org .apache .zeppelin .interpreter .Interpreter ;
4242import java .security .KeyStore ;
4343import java .util .ArrayList ;
4444import java .util .Collection ;
45+ import java .util .HashMap ;
4546import java .util .List ;
47+ import java .util .Map ;
4648import java .util .Properties ;
4749
4850import static java .lang .Integer .parseInt ;
@@ -112,15 +114,33 @@ public class CassandraInterpreter extends Interpreter {
112114 public static final String CASSANDRA_TRUSTSTORE_PASSWORD =
113115 "cassandra.ssl.truststore.password" ;
114116
115-
117+ public static final String CASSANDRA_FORMAT_FLOAT_PRECISION =
118+ "cassandra.format.float_precision" ;
119+ public static final String CASSANDRA_FORMAT_DOUBLE_PRECISION =
120+ "cassandra.format.double_precision" ;
121+ public static final String CASSANDRA_FORMAT_TIMESTAMP =
122+ "cassandra.format.timestamp" ;
123+ public static final String CASSANDRA_FORMAT_TIME =
124+ "cassandra.format.time" ;
125+ public static final String CASSANDRA_FORMAT_DATE =
126+ "cassandra.format.date" ;
127+ public static final String CASSANDRA_FORMAT_TYPE =
128+ "cassandra.format.output" ;
129+ public static final String CASSANDRA_FORMAT_TIMEZONE =
130+ "cassandra.format.timezone" ;
131+ public static final String CASSANDRA_FORMAT_LOCALE =
132+ "cassandra.format.locale" ;
133+
134+ public static final String NONE_VALUE = "none" ;
135+ public static final String DEFAULT_VALUE = "DEFAULT" ;
116136 public static final String DEFAULT_HOST = "127.0.0.1" ;
117137 public static final String DEFAULT_PORT = "9042" ;
118138 public static final String DEFAULT_KEYSPACE = "system" ;
119139 public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT" ;
120- public static final String DEFAULT_COMPRESSION = "none" ;
140+ public static final String DEFAULT_COMPRESSION = NONE_VALUE ;
121141 public static final String DEFAULT_CONNECTIONS_PER_HOST = "1" ;
122142 public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024" ;
123- public static final String DEFAULT_POLICY = "DEFAULT" ;
143+ public static final String DEFAULT_POLICY = DEFAULT_VALUE ;
124144 public static final String DEFAULT_PARALLELISM = "10" ;
125145 public static final String DEFAULT_POOL_TIMEOUT = "5000" ;
126146 public static final String DEFAULT_HEARTBEAT_INTERVAL = "30" ;
@@ -133,75 +153,72 @@ public class CassandraInterpreter extends Interpreter {
133153 public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12" ;
134154
135155 static final List NO_COMPLETION = new ArrayList <>();
156+ public static final String DATASTAX_JAVA_DRIVER_PREFIX = "datastax-java-driver." ;
157+ public static final String MILLISECONDS_STR = " milliseconds" ;
158+ public static final String SECONDS_STR = " seconds" ;
136159
137160 InterpreterLogic helper ;
138161 CqlSession session ;
139- private JavaDriverConfig driverConfig = new JavaDriverConfig ();
162+ private static final Map <String , DriverOption > optionMap = new HashMap <>();
163+
164+ static {
165+ for (DefaultDriverOption opt : DefaultDriverOption .values ()) {
166+ optionMap .put (opt .getPath (), opt );
167+ }
168+ }
140169
141170 public CassandraInterpreter (Properties properties ) {
142171 super (properties );
143172 }
144173
145174 @ Override
146175 public void open () {
147-
148- final String [] addresses = getProperty ( CASSANDRA_HOSTS , DEFAULT_HOST ).split ("," );
176+ final String [] addresses = getProperty ( CASSANDRA_HOSTS , DEFAULT_HOST )
177+ . trim ( ).split ("," );
149178 final int port = parseInt (getProperty (CASSANDRA_PORT , DEFAULT_PORT ));
150179 Collection <InetSocketAddress > hosts = new ArrayList <>();
151180 for (String address : addresses ) {
152- if (InetAddresses .isInetAddress (address )) {
153- hosts .add (new InetSocketAddress (address , port ));
154- } else {
155- // TODO(alex): maybe it won't be necessary in 4.4
156- hosts .add (InetSocketAddress .createUnresolved (address , port ));
181+ if (!StringUtils .isBlank (address )) {
182+ logger .debug ("Adding contact point: {}" , address );
183+ if (InetAddresses .isInetAddress (address )) {
184+ hosts .add (new InetSocketAddress (address , port ));
185+ } else {
186+ hosts .add (InetSocketAddress .createUnresolved (address , port ));
187+ }
157188 }
158189 }
159190
160- LOGGER .info ("Bootstrapping Cassandra Java Driver to connect to " +
161- getProperty (CASSANDRA_HOSTS ) + "on port " + port );
162-
163- // start generation of the config
164- ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader .programmaticBuilder ();
165-
166- driverConfig .setCompressionProtocol (this , configBuilder );
167- driverConfig .setPoolingOptions (this , configBuilder );
168- driverConfig .setProtocolVersion (this , configBuilder );
169- driverConfig .setQueryOptions (this , configBuilder );
170- driverConfig .setReconnectionPolicy (this , configBuilder );
171- driverConfig .setRetryPolicy (this , configBuilder );
172- driverConfig .setSocketOptions (this , configBuilder );
173- driverConfig .setSpeculativeExecutionPolicy (this , configBuilder );
191+ LOGGER .info ("Bootstrapping Cassandra Java Driver to connect to {} on port {}" ,
192+ getProperty (CASSANDRA_HOSTS ), port );
174193
175- //
176- configBuilder .withClass (DefaultDriverOption .LOAD_BALANCING_POLICY_CLASS ,
177- DcInferringLoadBalancingPolicy .class );
178- configBuilder .withBoolean (DefaultDriverOption .RESOLVE_CONTACT_POINTS , false );
179-
180- configBuilder .withInt (DefaultDriverOption .CONTROL_CONNECTION_AGREEMENT_TIMEOUT ,
181- parseInt (getProperty (CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS ,
182- DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS )));
183-
184- DriverConfigLoader loader = configBuilder .endProfile ().build ();
185- // TODO(alex): think how to dump built configuration...
186- logger .debug (loader .toString ());
187- // end generation of config
194+ DriverConfigLoader loader = createLoader ();
188195
196+ LOGGER .debug ("Creating cluster builder" );
189197 CqlSessionBuilder clusterBuilder = CqlSession .builder ()
190- .addContactPoints (hosts )
191- .withAuthCredentials (getProperty (CASSANDRA_CREDENTIALS_USERNAME ),
192- getProperty (CASSANDRA_CREDENTIALS_PASSWORD ))
193- .withApplicationName ("" )
198+ .withApplicationName ("Zeppelin" )
194199 .withApplicationVersion ("" );
200+ if (!hosts .isEmpty ()) {
201+ LOGGER .debug ("Adding contact points" );
202+ clusterBuilder .addContactPoints (hosts );
203+ }
204+
205+ String username = getProperty (CASSANDRA_CREDENTIALS_USERNAME , NONE_VALUE ).trim ();
206+ String password = getProperty (CASSANDRA_CREDENTIALS_PASSWORD , NONE_VALUE ).trim ();
207+ if (StringUtils .isNotBlank (username ) && StringUtils .isNotBlank (password ) &&
208+ !NONE_VALUE .equalsIgnoreCase (username ) && !NONE_VALUE .equalsIgnoreCase (password )) {
209+ LOGGER .debug ("Adding credentials. Username = {}" , username );
210+ clusterBuilder .withAuthCredentials (username , password );
211+ }
195212
196213 String keyspace = getProperty (CASSANDRA_KEYSPACE_NAME , DEFAULT_KEYSPACE );
197214 if (StringUtils .isNotBlank (keyspace ) && !DEFAULT_KEYSPACE .equalsIgnoreCase (keyspace )) {
215+ LOGGER .debug ("Set default keyspace" );
198216 clusterBuilder .withKeyspace (keyspace );
199217 }
200218
201- final String runWithSSL = getProperty (CASSANDRA_WITH_SSL );
202- if (runWithSSL != null && runWithSSL .equals ("true" )) {
203- LOGGER .debug ("Cassandra Interpreter: Using SSL" );
204-
219+ final String runWithSSL = getProperty (CASSANDRA_WITH_SSL , "false" );
220+ if ("true" .equalsIgnoreCase (runWithSSL )) {
221+ LOGGER .debug ("Using SSL" );
205222 try {
206223 final SSLContext sslContext ;
207224 {
@@ -219,19 +236,149 @@ public void open() {
219236 }
220237 clusterBuilder = clusterBuilder .withSslContext (sslContext );
221238 } catch (Exception e ) {
222- LOGGER .error (e .toString ());
239+ LOGGER .error ("Exception initializing SSL {}" , e .toString ());
223240 }
224241 } else {
225- LOGGER .debug ("Cassandra Interpreter: Not using SSL" );
242+ LOGGER .debug ("Not using SSL" );
226243 }
227244
245+ LOGGER .debug ("Creating CqlSession" );
228246 session = clusterBuilder .withConfigLoader (loader ).build ();
229- helper = new InterpreterLogic (session );
247+ LOGGER .debug ("Session configuration" );
248+ for (Map .Entry <String , Object > entry :
249+ session .getContext ().getConfig ().getDefaultProfile ().entrySet ()) {
250+ logger .debug ("{} = {}" , entry .getKey (), entry .getValue ().toString ());
251+ }
252+ LOGGER .debug ("Creating helper" );
253+ helper = new InterpreterLogic (session , properties );
254+ }
255+
256+ private DriverConfigLoader createLoader () {
257+ logger .debug ("Creating programmatic config loader" );
258+ // start generation of the config
259+ ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader .programmaticBuilder ();
260+
261+ Map <DriverOption , String > allOptions = new HashMap <>();
262+
263+ // set options from main configuration
264+ String ts = getProperty (CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS ,
265+ CassandraInterpreter .DEFAULT_CONNECTION_TIMEOUT ) + MILLISECONDS_STR ;
266+ allOptions .put (DefaultDriverOption .CONNECTION_INIT_QUERY_TIMEOUT , ts );
267+ allOptions .put (DefaultDriverOption .CONTROL_CONNECTION_TIMEOUT , ts );
268+ allOptions .put (DefaultDriverOption .REQUEST_TIMEOUT ,
269+ getProperty (CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS ,
270+ CassandraInterpreter .DEFAULT_READ_TIMEOUT ) + MILLISECONDS_STR );
271+ addIfNotBlank (allOptions ,
272+ getProperty (CASSANDRA_SOCKET_TCP_NO_DELAY , CassandraInterpreter .DEFAULT_TCP_NO_DELAY ),
273+ DefaultDriverOption .SOCKET_TCP_NODELAY );
274+ addIfNotBlank (allOptions , getProperty (CASSANDRA_SOCKET_KEEP_ALIVE ),
275+ DefaultDriverOption .SOCKET_KEEP_ALIVE );
276+ addIfNotBlank (allOptions , getProperty (CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES ),
277+ DefaultDriverOption .SOCKET_RECEIVE_BUFFER_SIZE );
278+ addIfNotBlank (allOptions , getProperty (CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES ),
279+ DefaultDriverOption .SOCKET_SEND_BUFFER_SIZE );
280+ addIfNotBlank (allOptions , getProperty (CASSANDRA_SOCKET_REUSE_ADDRESS ),
281+ DefaultDriverOption .SOCKET_REUSE_ADDRESS );
282+ addIfNotBlank (allOptions , getProperty (CASSANDRA_SOCKET_SO_LINGER ),
283+ DefaultDriverOption .SOCKET_LINGER_INTERVAL );
284+ addIfNotBlank (allOptions ,
285+ getProperty (CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE ),
286+ DefaultDriverOption .REQUEST_DEFAULT_IDEMPOTENCE );
287+ allOptions .put (DefaultDriverOption .REQUEST_CONSISTENCY ,
288+ getProperty (CASSANDRA_QUERY_DEFAULT_CONSISTENCY ,
289+ CassandraInterpreter .DEFAULT_CONSISTENCY ));
290+ allOptions .put (DefaultDriverOption .REQUEST_SERIAL_CONSISTENCY ,
291+ getProperty (CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY ,
292+ CassandraInterpreter .DEFAULT_SERIAL_CONSISTENCY ));
293+ allOptions .put (DefaultDriverOption .REQUEST_PAGE_SIZE ,
294+ getProperty (CASSANDRA_QUERY_DEFAULT_FETCH_SIZE ,
295+ CassandraInterpreter .DEFAULT_FETCH_SIZE ));
296+ ts = getProperty (CASSANDRA_PROTOCOL_VERSION , DEFAULT_PROTOCOL_VERSION );
297+ if (!DEFAULT_VALUE .equalsIgnoreCase (ts )) {
298+ // for compatibility with previous configurations
299+ if (ts .equals ("4" ) || ts .equals ("3" )) {
300+ ts = "V" + ts ;
301+ }
302+ allOptions .put (DefaultDriverOption .PROTOCOL_VERSION , ts );
303+ }
304+ addIfNotBlank (allOptions , getProperty (CASSANDRA_COMPRESSION_PROTOCOL ,
305+ CassandraInterpreter .DEFAULT_COMPRESSION ).toLowerCase (),
306+ DefaultDriverOption .PROTOCOL_COMPRESSION );
307+ addIfNotBlankOrDefault (allOptions , getProperty (CASSANDRA_RETRY_POLICY , DEFAULT_POLICY ),
308+ DefaultDriverOption .RETRY_POLICY_CLASS );
309+ addIfNotBlankOrDefault (allOptions ,
310+ getProperty (CASSANDRA_RECONNECTION_POLICY , DEFAULT_POLICY ),
311+ DefaultDriverOption .RECONNECTION_POLICY_CLASS );
312+ addIfNotBlankOrDefault (allOptions ,
313+ getProperty (CASSANDRA_SPECULATIVE_EXECUTION_POLICY , DEFAULT_POLICY ),
314+ DefaultDriverOption .SPECULATIVE_EXECUTION_POLICY_CLASS );
315+ allOptions .put (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE ,
316+ getProperty (CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL ,
317+ DEFAULT_CONNECTIONS_PER_HOST ));
318+ allOptions .put (DefaultDriverOption .CONNECTION_POOL_REMOTE_SIZE ,
319+ getProperty (CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE ,
320+ DEFAULT_CONNECTIONS_PER_HOST ));
321+ allOptions .put (DefaultDriverOption .CONNECTION_MAX_REQUESTS ,
322+ getProperty (CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION ,
323+ DEFAULT_MAX_REQUEST_PER_CONNECTION ));
324+ allOptions .put (DefaultDriverOption .HEARTBEAT_INTERVAL ,
325+ getProperty (CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS ,
326+ DEFAULT_HEARTBEAT_INTERVAL ) + SECONDS_STR );
327+ ts = getProperty (CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS ,
328+ DEFAULT_POOL_TIMEOUT ) + MILLISECONDS_STR ;
329+ allOptions .put (DefaultDriverOption .HEARTBEAT_TIMEOUT , ts );
330+ allOptions .put (DefaultDriverOption .CONNECTION_INIT_QUERY_TIMEOUT , ts );
331+ allOptions .put (DefaultDriverOption .LOAD_BALANCING_POLICY_CLASS ,
332+ "DcInferringLoadBalancingPolicy" );
333+ allOptions .put (DefaultDriverOption .RESOLVE_CONTACT_POINTS , "false" );
334+ allOptions .put (DefaultDriverOption .CONTROL_CONNECTION_AGREEMENT_TIMEOUT ,
335+ getProperty (CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS ,
336+ DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS ) + SECONDS_STR );
337+
338+ // extract additional options that may override values set by main configuration
339+ for (String pname : properties .stringPropertyNames ()) {
340+ if (pname .startsWith (DATASTAX_JAVA_DRIVER_PREFIX )) {
341+ String pvalue = properties .getProperty (pname );
342+ logger .info ("Custom config values: {} = {}" , pname , pvalue );
343+ String shortName = pname .substring (DATASTAX_JAVA_DRIVER_PREFIX .length ());
344+ if (optionMap .containsKey (shortName )) {
345+ allOptions .put (optionMap .get (shortName ), pvalue );
346+ } else {
347+ logger .warn ("Incorrect option name: {}" , pname );
348+ }
349+ }
350+ }
351+
352+ for (Map .Entry <DriverOption , String > entry : allOptions .entrySet ()) {
353+ configBuilder .withString (entry .getKey (), entry .getValue ());
354+ }
355+
356+ DriverConfigLoader loader = configBuilder .endProfile ().build ();
357+ logger .debug ("Config loader is created" );
358+
359+ return loader ;
360+ }
361+
362+ private static void addIfNotBlank (Map <DriverOption , String > allOptions ,
363+ String value ,
364+ DefaultDriverOption option ) {
365+ if (!StringUtils .isBlank (value )) {
366+ allOptions .put (option , value );
367+ }
368+ }
369+
370+ private static void addIfNotBlankOrDefault (Map <DriverOption , String > allOptions ,
371+ String value ,
372+ DefaultDriverOption option ) {
373+ if (!StringUtils .isBlank (value ) && !DEFAULT_VALUE .equalsIgnoreCase (value )) {
374+ allOptions .put (option , value );
375+ }
230376 }
231377
232378 @ Override
233379 public void close () {
234- session .close ();
380+ if (session != null )
381+ session .close ();
235382 }
236383
237384 @ Override
0 commit comments