-
Notifications
You must be signed in to change notification settings - Fork 614
[jdbc-v2, client-v2] Fix configuration handling by normalizing it. #2470
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
813df63
e554769
122fc96
0e59b6e
ce2657d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -115,7 +115,7 @@ public class Client implements AutoCloseable { | |
| private HttpAPIClientHelper httpClientHelper = null; | ||
|
|
||
| private final List<Endpoint> endpoints; | ||
| private final Map<String, String> configuration; | ||
| private final Map<String, Object> configuration; | ||
|
|
||
| private final Map<String, String> readOnlyConfig; | ||
|
|
||
|
|
@@ -145,15 +145,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration, | |
| private Client(Set<String> endpoints, Map<String,String> configuration, | ||
| ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metricsRegistry) { | ||
| // Simple initialization | ||
| this.configuration = configuration; | ||
| this.readOnlyConfig = Collections.unmodifiableMap(this.configuration); | ||
| this.configuration = ClientConfigProperties.parseConfigMap(configuration); | ||
| this.readOnlyConfig = Collections.unmodifiableMap(configuration); | ||
| this.metricsRegistry = metricsRegistry; | ||
|
|
||
| // Serialization | ||
| this.pojoSerDe = new POJOSerDe(columnToMethodMatchingStrategy); | ||
|
|
||
| // Operation Execution | ||
| boolean isAsyncEnabled = MapUtils.getFlag(this.configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey(), false); | ||
| boolean isAsyncEnabled = ClientConfigProperties.ASYNC_OPERATIONS.getOrDefault(this.configuration); | ||
|
|
||
| if (isAsyncEnabled && sharedOperationExecutor == null) { | ||
| this.isSharedOpExecutorOwned = true; | ||
| this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation")); | ||
|
|
@@ -179,7 +180,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, | |
| } | ||
|
|
||
| this.endpoints = tmpEndpoints.build(); | ||
| this.httpClientHelper = new HttpAPIClientHelper(configuration, metricsRegistry, initSslContext); | ||
| this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext); | ||
|
|
||
| String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); | ||
| this.retries = retry == null ? 0 : Integer.parseInt(retry); | ||
|
|
@@ -217,7 +218,7 @@ public void loadServerInfo() { | |
| * @return String - actual default database name. | ||
| */ | ||
| public String getDefaultDatabase() { | ||
| return this.configuration.get("database"); | ||
| return (String) this.configuration.get(ClientConfigProperties.DATABASE.getKey()); | ||
| } | ||
|
|
||
|
|
||
|
|
@@ -845,7 +846,7 @@ public Builder setMaxRetries(int maxRetries) { | |
| * @return | ||
| */ | ||
| public Builder allowBinaryReaderToReuseBuffers(boolean reuse) { | ||
| this.configuration.put("client_allow_binary_reader_to_reuse_buffers", String.valueOf(reuse)); | ||
| this.configuration.put(ClientConfigProperties.BINARY_READER_USE_PREALLOCATED_BUFFERS.getKey(), String.valueOf(reuse)); | ||
| return this; | ||
| } | ||
|
|
||
|
|
@@ -1009,20 +1010,21 @@ public Client build() { | |
| throw new IllegalArgumentException("At least one endpoint is required"); | ||
| } | ||
| // check if username and password are empty. so can not initiate client? | ||
| if (!this.configuration.containsKey("access_token") && | ||
| (!this.configuration.containsKey("user") || !this.configuration.containsKey("password")) && | ||
| !MapUtils.getFlag(this.configuration, "ssl_authentication", false) && | ||
| !this.configuration.containsKey(ClientConfigProperties.httpHeader(HttpHeaders.AUTHORIZATION))) { | ||
| boolean useSslAuth = MapUtils.getFlag(this.configuration, ClientConfigProperties.SSL_AUTH.getKey()); | ||
| boolean hasAccessToken = this.configuration.containsKey(ClientConfigProperties.ACCESS_TOKEN.getKey()); | ||
| boolean hasUser = this.configuration.containsKey(ClientConfigProperties.USER.getKey()); | ||
| boolean hasPassword = this.configuration.containsKey(ClientConfigProperties.PASSWORD.getKey()); | ||
| boolean customHttpHeaders = this.configuration.containsKey(ClientConfigProperties.httpHeader(HttpHeaders.AUTHORIZATION)); | ||
|
|
||
| if (!(useSslAuth || hasAccessToken || hasUser || hasPassword || customHttpHeaders)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to mention what exactly is missing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is when nothing is specified. That we say in the exception message |
||
| throw new IllegalArgumentException("Username and password (or access token or SSL authentication or pre-define Authorization header) are required"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to wrap it as a client exception?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is fine - it is not even a client yet. |
||
| } | ||
|
|
||
| if (this.configuration.containsKey("ssl_authentication") && | ||
| (this.configuration.containsKey("password") || this.configuration.containsKey("access_token"))) { | ||
| if (useSslAuth && (hasAccessToken || hasPassword)) { | ||
| throw new IllegalArgumentException("Only one of password, access token or SSL authentication can be used per client."); | ||
| } | ||
|
|
||
| if (this.configuration.containsKey("ssl_authentication") && | ||
| !this.configuration.containsKey(ClientConfigProperties.SSL_CERTIFICATE.getKey())) { | ||
| if (useSslAuth && !this.configuration.containsKey(ClientConfigProperties.SSL_CERTIFICATE.getKey())) { | ||
| throw new IllegalArgumentException("SSL authentication requires a client certificate"); | ||
| } | ||
|
|
||
|
|
@@ -1159,17 +1161,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data, | |
| if (data == null || data.isEmpty()) { | ||
| throw new IllegalArgumentException("Data cannot be empty"); | ||
| } | ||
|
|
||
| //Add format to the settings | ||
| if (settings == null) { | ||
| settings = new InsertSettings(); | ||
| } | ||
|
|
||
| String operationId = registerOperationMetrics(); | ||
| settings.setOperationId(operationId); | ||
| globalClientStats.get(operationId).start(ClientMetrics.OP_DURATION); | ||
| globalClientStats.get(operationId).start(ClientMetrics.OP_SERIALIZATION); | ||
|
|
||
| //Add format to the settings | ||
| if (settings == null) { | ||
| settings = new InsertSettings(); | ||
| } | ||
|
|
||
| boolean hasDefaults = this.tableSchemaHasDefaults.get(tableName); | ||
| ClickHouseFormat format = hasDefaults? ClickHouseFormat.RowBinaryWithDefaults : ClickHouseFormat.RowBinary; | ||
|
|
@@ -1193,11 +1194,11 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data, | |
| } | ||
|
|
||
|
|
||
| String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); | ||
| final int maxRetries = retry == null ? 0 : Integer.parseInt(retry); | ||
| Integer retry = (Integer) configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); | ||
| final int maxRetries = retry == null ? 0 : retry; | ||
|
|
||
| settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name()); | ||
| final InsertSettings finalSettings = settings; | ||
| final InsertSettings finalSettings = new InsertSettings(buildRequestSettings(settings.getAllSettings())); | ||
| Supplier<InsertResponse> supplier = () -> { | ||
| long startTime = System.nanoTime(); | ||
| // Selecting some node | ||
|
|
@@ -1319,8 +1320,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, | |
| InsertSettings settings) { | ||
|
|
||
| final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ? | ||
| Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), | ||
| ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getDefaultValue())) : | ||
| (int) configuration.get(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey()) : | ||
| settings.getInputStreamCopyBufferSize(); | ||
|
|
||
| if (writeBufferSize <= 0) { | ||
|
|
@@ -1392,17 +1392,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, | |
|
|
||
| Supplier<InsertResponse> responseSupplier; | ||
|
|
||
|
|
||
| final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ? | ||
| Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "8192")) : | ||
| (int) configuration.get(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey()) : | ||
| settings.getInputStreamCopyBufferSize(); | ||
|
|
||
| if (writeBufferSize <= 0) { | ||
| throw new IllegalArgumentException("Buffer size must be greater than 0"); | ||
| } | ||
|
|
||
| settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name()); | ||
| final InsertSettings finalSettings = settings; | ||
| final InsertSettings finalSettings = new InsertSettings(buildRequestSettings(settings.getAllSettings())); | ||
|
|
||
| StringBuilder sqlStmt = new StringBuilder("INSERT INTO ").append(tableName); | ||
| if (columnNames != null && !columnNames.isEmpty()) { | ||
|
|
@@ -1532,14 +1531,13 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec | |
| } | ||
| ClientStatisticsHolder clientStats = new ClientStatisticsHolder(); | ||
| clientStats.start(ClientMetrics.OP_DURATION); | ||
| applyDefaults(settings); | ||
|
|
||
| Supplier<QueryResponse> responseSupplier; | ||
|
|
||
| if (queryParams != null) { | ||
| settings.setOption("statement_params", queryParams); | ||
| } | ||
| final QuerySettings finalSettings = settings; | ||
| final QuerySettings finalSettings = new QuerySettings(buildRequestSettings(settings.getAllSettings())); | ||
| responseSupplier = () -> { | ||
| long startTime = System.nanoTime(); | ||
| // Selecting some node | ||
|
|
@@ -1917,7 +1915,7 @@ public CompletableFuture<CommandResponse> execute(String sql) { | |
| public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) { | ||
| ClickHouseBinaryFormatReader reader = null; | ||
| // Using caching buffer allocator is risky so this parameter is not exposed to the user | ||
| boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers"); | ||
| boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers", false); | ||
| BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ? | ||
| new BinaryStreamReader.CachingByteBufferAllocator() : | ||
| new BinaryStreamReader.DefaultByteBufferAllocator(); | ||
|
|
@@ -1955,25 +1953,6 @@ private String registerOperationMetrics() { | |
| return operationId; | ||
| } | ||
|
|
||
| private void applyDefaults(QuerySettings settings) { | ||
| Map<String, Object> settingsMap = settings.getAllSettings(); | ||
|
|
||
| String key = ClientConfigProperties.USE_SERVER_TIMEZONE.getKey(); | ||
| if (!settingsMap.containsKey(key) && configuration.containsKey(key)) { | ||
| settings.setOption(key, MapUtils.getFlag(configuration, key)); | ||
| } | ||
|
|
||
| key = ClientConfigProperties.USE_TIMEZONE.getKey(); | ||
| if ( !settings.getUseServerTimeZone() && !settingsMap.containsKey(key) && configuration.containsKey(key)) { | ||
| settings.setOption(key, TimeZone.getTimeZone(configuration.get(key))); | ||
| } | ||
|
|
||
| key = ClientConfigProperties.SERVER_TIMEZONE.getKey(); | ||
| if (!settingsMap.containsKey(key) && configuration.containsKey(key)) { | ||
| settings.setOption(key, TimeZone.getTimeZone(configuration.get(key))); | ||
| } | ||
| } | ||
|
|
||
| private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) { | ||
| boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey()); | ||
| if (isAsync) { | ||
|
|
@@ -2001,7 +1980,7 @@ public Map<String, String> getConfiguration() { | |
|
|
||
| /** Returns operation timeout in seconds */ | ||
| protected int getOperationTimeout() { | ||
| return Integer.parseInt(configuration.get(ClientConfigProperties.MAX_EXECUTION_TIME.getKey())); | ||
| return ClientConfigProperties.MAX_EXECUTION_TIME.getOrDefault(configuration); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -2014,15 +1993,16 @@ public Set<String> getEndpoints() { | |
| } | ||
|
|
||
| public String getUser() { | ||
| return this.configuration.get(ClientConfigProperties.USER.getKey()); | ||
| return (String) this.configuration.get(ClientConfigProperties.USER.getKey()); | ||
| } | ||
|
|
||
| public String getServerVersion() { | ||
| return this.serverVersion; | ||
| } | ||
|
|
||
| public String getServerTimeZone() { | ||
| return this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey()); | ||
| TimeZone tz = (TimeZone) this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey()); | ||
| return tz == null ? null : tz.getID(); | ||
| } | ||
|
|
||
| public String getClientVersion() { | ||
|
|
@@ -2035,10 +2015,9 @@ public String getClientVersion() { | |
| * @param dbRoles | ||
| */ | ||
| public void setDBRoles(Collection<String> dbRoles) { | ||
| this.configuration.put(ClientConfigProperties.SESSION_DB_ROLES.getKey(), ClientConfigProperties.commaSeparated(dbRoles)); | ||
| this.unmodifiableDbRolesView = | ||
| Collections.unmodifiableCollection(ClientConfigProperties.valuesFromCommaSeparated( | ||
| this.configuration.get(ClientConfigProperties.SESSION_DB_ROLES.getKey()))); | ||
| List<String> tmp = new ArrayList<>(dbRoles); | ||
| this.configuration.put(ClientConfigProperties.SESSION_DB_ROLES.getKey(), tmp); | ||
| this.unmodifiableDbRolesView = ImmutableList.copyOf(tmp); | ||
| } | ||
|
|
||
| public void updateClientName(String name) { | ||
|
|
@@ -2069,4 +2048,17 @@ private Endpoint getNextAliveNode() { | |
| } | ||
|
|
||
| public static final String VALUES_LIST_DELIMITER = ","; | ||
|
|
||
| /** | ||
| * Produces a merge of operation and client settings. | ||
| * Operation settings override client settings | ||
| * @param opSettings - operation settings | ||
| * @return request settings - merged client and operation settings | ||
| */ | ||
| private Map<String, Object> buildRequestSettings(Map<String, Object> opSettings) { | ||
| Map<String, Object> requestSettings = new HashMap<>(); | ||
| requestSettings.putAll(configuration); | ||
| requestSettings.putAll(opSettings); | ||
| return requestSettings; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.