Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,6 @@ public static String getDatabase() {

public static boolean runQuery(String sql) {
LOGGER.info("runQuery: (\"" + sql + "\")");

try {
throw new Exception("test");
} catch (Exception e) {
e.printStackTrace();
}

if (clickhouseContainer != null) {
try {
Container.ExecResult res = clickhouseContainer.execInContainer("clickhouse-client",
Expand Down
106 changes: 49 additions & 57 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class Client implements AutoCloseable {
private HttpAPIClientHelper httpClientHelper = null;

private final List<Endpoint> endpoints;
private final Map<String, String> configuration;
private final Map<String, Object> configuration;

private final Map<String, String> readOnlyConfig;

Expand Down Expand Up @@ -145,15 +145,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
private Client(Set<String> endpoints, Map<String,String> configuration,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metricsRegistry) {
// Simple initialization
this.configuration = configuration;
this.readOnlyConfig = Collections.unmodifiableMap(this.configuration);
this.configuration = ClientConfigProperties.parseConfigMap(configuration);
this.readOnlyConfig = Collections.unmodifiableMap(configuration);
this.metricsRegistry = metricsRegistry;

// Serialization
this.pojoSerDe = new POJOSerDe(columnToMethodMatchingStrategy);

// Operation Execution
boolean isAsyncEnabled = MapUtils.getFlag(this.configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey(), false);
boolean isAsyncEnabled = ClientConfigProperties.ASYNC_OPERATIONS.getOrDefault(this.configuration);

if (isAsyncEnabled && sharedOperationExecutor == null) {
this.isSharedOpExecutorOwned = true;
this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation"));
Expand All @@ -179,7 +180,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
}

this.endpoints = tmpEndpoints.build();
this.httpClientHelper = new HttpAPIClientHelper(configuration, metricsRegistry, initSslContext);
this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext);

String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
this.retries = retry == null ? 0 : Integer.parseInt(retry);
Expand Down Expand Up @@ -217,7 +218,7 @@ public void loadServerInfo() {
* @return String - actual default database name.
*/
public String getDefaultDatabase() {
return this.configuration.get("database");
return (String) this.configuration.get(ClientConfigProperties.DATABASE.getKey());
}


Expand Down Expand Up @@ -845,7 +846,7 @@ public Builder setMaxRetries(int maxRetries) {
* @return
*/
public Builder allowBinaryReaderToReuseBuffers(boolean reuse) {
this.configuration.put("client_allow_binary_reader_to_reuse_buffers", String.valueOf(reuse));
this.configuration.put(ClientConfigProperties.BINARY_READER_USE_PREALLOCATED_BUFFERS.getKey(), String.valueOf(reuse));
return this;
}

Expand Down Expand Up @@ -1009,20 +1010,21 @@ public Client build() {
throw new IllegalArgumentException("At least one endpoint is required");
}
// check if username and password are empty. so can not initiate client?
if (!this.configuration.containsKey("access_token") &&
(!this.configuration.containsKey("user") || !this.configuration.containsKey("password")) &&
!MapUtils.getFlag(this.configuration, "ssl_authentication", false) &&
!this.configuration.containsKey(ClientConfigProperties.httpHeader(HttpHeaders.AUTHORIZATION))) {
boolean useSslAuth = MapUtils.getFlag(this.configuration, ClientConfigProperties.SSL_AUTH.getKey());
boolean hasAccessToken = this.configuration.containsKey(ClientConfigProperties.ACCESS_TOKEN.getKey());
boolean hasUser = this.configuration.containsKey(ClientConfigProperties.USER.getKey());
boolean hasPassword = this.configuration.containsKey(ClientConfigProperties.PASSWORD.getKey());
boolean customHttpHeaders = this.configuration.containsKey(ClientConfigProperties.httpHeader(HttpHeaders.AUTHORIZATION));

if (!(useSslAuth || hasAccessToken || hasUser || hasPassword || customHttpHeaders)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to mention what exactly is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is when nothing is specified. That we say in the exception message

throw new IllegalArgumentException("Username and password (or access token or SSL authentication or pre-define Authorization header) are required");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to wrap it as a client exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine - it is not even a client yet.

}

if (this.configuration.containsKey("ssl_authentication") &&
(this.configuration.containsKey("password") || this.configuration.containsKey("access_token"))) {
if (useSslAuth && (hasAccessToken || hasPassword)) {
throw new IllegalArgumentException("Only one of password, access token or SSL authentication can be used per client.");
}

if (this.configuration.containsKey("ssl_authentication") &&
!this.configuration.containsKey(ClientConfigProperties.SSL_CERTIFICATE.getKey())) {
if (useSslAuth && !this.configuration.containsKey(ClientConfigProperties.SSL_CERTIFICATE.getKey())) {
throw new IllegalArgumentException("SSL authentication requires a client certificate");
}

Expand Down Expand Up @@ -1159,17 +1161,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
if (data == null || data.isEmpty()) {
throw new IllegalArgumentException("Data cannot be empty");
}

//Add format to the settings
if (settings == null) {
settings = new InsertSettings();
}

String operationId = registerOperationMetrics();
settings.setOperationId(operationId);
globalClientStats.get(operationId).start(ClientMetrics.OP_DURATION);
globalClientStats.get(operationId).start(ClientMetrics.OP_SERIALIZATION);

//Add format to the settings
if (settings == null) {
settings = new InsertSettings();
}

boolean hasDefaults = this.tableSchemaHasDefaults.get(tableName);
ClickHouseFormat format = hasDefaults? ClickHouseFormat.RowBinaryWithDefaults : ClickHouseFormat.RowBinary;
Expand All @@ -1193,11 +1194,11 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
}


String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
final int maxRetries = retry == null ? 0 : Integer.parseInt(retry);
Integer retry = (Integer) configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
final int maxRetries = retry == null ? 0 : retry;

settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name());
final InsertSettings finalSettings = settings;
final InsertSettings finalSettings = new InsertSettings(buildRequestSettings(settings.getAllSettings()));
Supplier<InsertResponse> supplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Expand Down Expand Up @@ -1319,8 +1320,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
InsertSettings settings) {

final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ?
Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(),
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getDefaultValue())) :
(int) configuration.get(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey()) :
settings.getInputStreamCopyBufferSize();

if (writeBufferSize <= 0) {
Expand Down Expand Up @@ -1392,17 +1392,16 @@ public CompletableFuture<InsertResponse> insert(String tableName,

Supplier<InsertResponse> responseSupplier;


final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ?
Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "8192")) :
(int) configuration.get(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey()) :
settings.getInputStreamCopyBufferSize();

if (writeBufferSize <= 0) {
throw new IllegalArgumentException("Buffer size must be greater than 0");
}

settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name());
final InsertSettings finalSettings = settings;
final InsertSettings finalSettings = new InsertSettings(buildRequestSettings(settings.getAllSettings()));

StringBuilder sqlStmt = new StringBuilder("INSERT INTO ").append(tableName);
if (columnNames != null && !columnNames.isEmpty()) {
Expand Down Expand Up @@ -1532,14 +1531,13 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
}
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
clientStats.start(ClientMetrics.OP_DURATION);
applyDefaults(settings);

Supplier<QueryResponse> responseSupplier;

if (queryParams != null) {
settings.setOption("statement_params", queryParams);
}
final QuerySettings finalSettings = settings;
final QuerySettings finalSettings = new QuerySettings(buildRequestSettings(settings.getAllSettings()));
responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Expand Down Expand Up @@ -1917,7 +1915,7 @@ public CompletableFuture<CommandResponse> execute(String sql) {
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
ClickHouseBinaryFormatReader reader = null;
// Using caching buffer allocator is risky so this parameter is not exposed to the user
boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers");
boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers", false);
BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
new BinaryStreamReader.CachingByteBufferAllocator() :
new BinaryStreamReader.DefaultByteBufferAllocator();
Expand Down Expand Up @@ -1955,25 +1953,6 @@ private String registerOperationMetrics() {
return operationId;
}

private void applyDefaults(QuerySettings settings) {
Map<String, Object> settingsMap = settings.getAllSettings();

String key = ClientConfigProperties.USE_SERVER_TIMEZONE.getKey();
if (!settingsMap.containsKey(key) && configuration.containsKey(key)) {
settings.setOption(key, MapUtils.getFlag(configuration, key));
}

key = ClientConfigProperties.USE_TIMEZONE.getKey();
if ( !settings.getUseServerTimeZone() && !settingsMap.containsKey(key) && configuration.containsKey(key)) {
settings.setOption(key, TimeZone.getTimeZone(configuration.get(key)));
}

key = ClientConfigProperties.SERVER_TIMEZONE.getKey();
if (!settingsMap.containsKey(key) && configuration.containsKey(key)) {
settings.setOption(key, TimeZone.getTimeZone(configuration.get(key)));
}
}

private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey());
if (isAsync) {
Expand Down Expand Up @@ -2001,7 +1980,7 @@ public Map<String, String> getConfiguration() {

/** Returns operation timeout in seconds */
protected int getOperationTimeout() {
return Integer.parseInt(configuration.get(ClientConfigProperties.MAX_EXECUTION_TIME.getKey()));
return ClientConfigProperties.MAX_EXECUTION_TIME.getOrDefault(configuration);
}

/**
Expand All @@ -2014,15 +1993,16 @@ public Set<String> getEndpoints() {
}

public String getUser() {
return this.configuration.get(ClientConfigProperties.USER.getKey());
return (String) this.configuration.get(ClientConfigProperties.USER.getKey());
}

public String getServerVersion() {
return this.serverVersion;
}

public String getServerTimeZone() {
return this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey());
TimeZone tz = (TimeZone) this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey());
return tz == null ? null : tz.getID();
}

public String getClientVersion() {
Expand All @@ -2035,10 +2015,9 @@ public String getClientVersion() {
* @param dbRoles
*/
public void setDBRoles(Collection<String> dbRoles) {
this.configuration.put(ClientConfigProperties.SESSION_DB_ROLES.getKey(), ClientConfigProperties.commaSeparated(dbRoles));
this.unmodifiableDbRolesView =
Collections.unmodifiableCollection(ClientConfigProperties.valuesFromCommaSeparated(
this.configuration.get(ClientConfigProperties.SESSION_DB_ROLES.getKey())));
List<String> tmp = new ArrayList<>(dbRoles);
this.configuration.put(ClientConfigProperties.SESSION_DB_ROLES.getKey(), tmp);
this.unmodifiableDbRolesView = ImmutableList.copyOf(tmp);
}

public void updateClientName(String name) {
Expand Down Expand Up @@ -2069,4 +2048,17 @@ private Endpoint getNextAliveNode() {
}

public static final String VALUES_LIST_DELIMITER = ",";

/**
* Produces a merge of operation and client settings.
* Operation settings override client settings
* @param opSettings - operation settings
* @return request settings - merged client and operation settings
*/
private Map<String, Object> buildRequestSettings(Map<String, Object> opSettings) {
Map<String, Object> requestSettings = new HashMap<>();
requestSettings.putAll(configuration);
requestSettings.putAll(opSettings);
return requestSettings;
}
}
Loading
Loading