Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
Expand Down Expand Up @@ -36,7 +37,9 @@
import com.clickhouse.client.api.transport.Endpoint;
import com.clickhouse.client.api.transport.HttpEndpoint;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import com.google.common.collect.ImmutableList;
import net.jpountz.lz4.LZ4Factory;
Expand Down Expand Up @@ -131,6 +134,8 @@ public class Client implements AutoCloseable {

private final Map<String, Boolean> tableSchemaHasDefaults = new ConcurrentHashMap<>();

private final Map<ClickHouseDataType, Class<?>> typeHintMapping;

// Server context
private String serverVersion;
private Object metricsRegistry;
Expand Down Expand Up @@ -192,6 +197,8 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
}

this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown");

this.typeHintMapping = (Map<ClickHouseDataType, Class<?>>) this.configuration.get(ClientConfigProperties.TYPE_HINT_MAPPING.getKey());
}

/**
Expand Down Expand Up @@ -1004,6 +1011,20 @@ public Builder setServerVersion(String serverVersion) {
return this;
}

/**
* Defines mapping between ClickHouse data type and target Java type
* Used by binary readers to convert values into desired Java type.
* @param typeHintMapping - map between ClickHouse data type and Java class
* @return this builder instance
*/
public Builder typeHintMapping(Map<ClickHouseDataType, Class<?>> typeHintMapping) {
this.configuration.put(ClientConfigProperties.TYPE_HINT_MAPPING.getKey(),
ClientConfigProperties.mapToString(typeHintMapping, (v) -> {
return ((Class<?>) v).getName();
}));
return this;
}

public Client build() {
// check if endpoint are empty. so can not initiate client
if (this.endpoints.isEmpty()) {
Expand Down Expand Up @@ -1919,23 +1940,20 @@ public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response
BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
new BinaryStreamReader.CachingByteBufferAllocator() :
new BinaryStreamReader.DefaultByteBufferAllocator();

switch (response.getFormat()) {
case Native:
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool);
byteBufferPool, typeHintMapping);
break;
case RowBinaryWithNamesAndTypes:
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool);
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(), byteBufferPool, typeHintMapping);
break;
case RowBinaryWithNames:
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool);
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema, byteBufferPool, typeHintMapping);
break;
case RowBinary:
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool);
byteBufferPool, typeHintMapping);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typeHintMapping field is initialized in the constructor but there's no null check when passing it to the binary format readers. If a client is created without specifying type hints, this could potentially cause NullPointerExceptions in the binary readers that receive this parameter.

break;
default:
throw new IllegalArgumentException("Binary readers doesn't support format: " + response.getFormat());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.clickhouse.client.api;

import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -9,11 +11,14 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -168,6 +173,11 @@ public Object parseValue(String value) {

BINARY_READER_USE_PREALLOCATED_BUFFERS("client_allow_binary_reader_to_reuse_buffers", Boolean.class, "false"),

/**
* Defines mapping between ClickHouse data type and target Java type
* Used by binary readers to convert values into desired Java type.
*/
TYPE_HINT_MAPPING("type_hint_mapping", Map.class),
;

private static final Logger LOG = LoggerFactory.getLogger(ClientConfigProperties.class);
Expand Down Expand Up @@ -278,6 +288,10 @@ public Object parseValue(String value) {
return TimeZone.getTimeZone(value);
}

if (valueType.equals(Map.class)) {
return toKeyValuePairs(value);
}

return null;
}

Expand All @@ -301,7 +315,15 @@ public static Map<String, Object> parseConfigMap(Map<String, String> configMap)
for (ClientConfigProperties config : ClientConfigProperties.values()) {
String value = tmpMap.remove(config.getKey());
if (value != null) {
parsedConfig.put(config.getKey(), config.parseValue(value));
Object parsedValue;
switch (config) {
case TYPE_HINT_MAPPING:
parsedValue = translateTypeHintMapping(value);
break;
default:
parsedValue = config.parseValue(value);
}
parsedConfig.put(config.getKey(), parsedValue);
}
}

Expand All @@ -317,4 +339,90 @@ public static Map<String, Object> parseConfigMap(Map<String, String> configMap)

return parsedConfig;
}


/**
* Converts given string to key value pairs.
* This is very simple implementation that do not handle edge cases like
* {@code k1=v1, ,k2=v2}
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be great if we have a simple example of the string

* @param str string
* @return non-null key value pairs
*/
public static Map<String, String> toKeyValuePairs(String str) {
if (str == null || str.isEmpty()) {
return Collections.emptyMap();
}

Map<String, String> map = new LinkedHashMap<>();
String key = null;
StringBuilder builder = new StringBuilder();
for (int i = 0, len = str.length(); i < len; i++) {
char ch = str.charAt(i);
if (ch == '\\' && i + 1 < len) {
ch = str.charAt(++i);
builder.append(ch);
continue;
}

if (Character.isWhitespace(ch)) {
if (builder.length() > 0) {
builder.append(ch);
}
} else if (ch == '=' && key == null) {
key = builder.toString().trim();
builder.setLength(0);
} else if (ch == ',' && key != null) {
String value = builder.toString().trim();
builder.setLength(0);
if (!key.isEmpty() && !value.isEmpty()) {
map.put(key, value);
}
key = null;
} else {
builder.append(ch);
}
}

if (key != null && builder.length() > 0) {
String value = builder.toString().trim();
if (!key.isEmpty() && !value.isEmpty()) {
map.put(key, value);
}
}

return Collections.unmodifiableMap(map);
}



public static String mapToString(Map<?,?> map, Function<Object, String> valueConverter) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<?, ?> entry : map.entrySet()) {
sb.append(entry.getKey()).append("=").append(valueConverter.apply(entry.getValue())).append(",");
}

if (sb.length() > 0) {
sb.setLength(sb.length() - 1);
}
return sb.toString();
}

public static Map<ClickHouseDataType, Class<?>> translateTypeHintMapping(String mappingStr) {
if (mappingStr == null || mappingStr.isEmpty()) {
return AbstractBinaryFormatReader.NO_TYPE_HINT_MAPPING;
}

Map<String, String> mapping= ClientConfigProperties.toKeyValuePairs(mappingStr);
Map<ClickHouseDataType, Class<?>> hintMapping = new HashMap<>();
try {
for (Map.Entry<String, String> entry : mapping.entrySet()) {
hintMapping.put(ClickHouseDataType.of(entry.getKey()),
Class.forName(entry.getValue()));
}
} catch (ClassNotFoundException e) {
throw new ClientMisconfigurationException("Failed to translate type-hint mapping", e);
}
return hintMapping;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -26,15 +27,21 @@ public class NativeFormatReader extends AbstractBinaryFormatReader {
private int blockRowIndex;

public NativeFormatReader(InputStream inputStream, QuerySettings settings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, settings, null, byteBufferAllocator);
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator,
Map<ClickHouseDataType, Class<?>> typeHintMapping) {
super(inputStream, settings, null, byteBufferAllocator, typeHintMapping);
try {
readBlock();
} catch (IOException e) {
throw new ClientException("Failed to read block", e);
}
}

public NativeFormatReader(InputStream inputStream, QuerySettings settings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
this(inputStream, settings, byteBufferAllocator, NO_TYPE_HINT_MAPPING);
}

@Override
public boolean readRecord(Map<String, Object> record) throws IOException {
if (blockRowIndex >= currentBlock.getnRows()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -13,12 +14,22 @@

public class RowBinaryFormatReader extends AbstractBinaryFormatReader {

public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, schema, byteBufferAllocator);
public RowBinaryFormatReader(InputStream inputStream,
QuerySettings querySettings,
TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator,
Map<ClickHouseDataType, Class<?>> typeHintMapping) {
super(inputStream, querySettings, schema, byteBufferAllocator, typeHintMapping);
readNextRecord();
}

public RowBinaryFormatReader(InputStream inputStream,
QuerySettings querySettings,
TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
this(inputStream, querySettings, schema, byteBufferAllocator, NO_TYPE_HINT_MAPPING);
}

@Override
public boolean readRecord(Map<String, Object> record) throws IOException {
boolean firstColumn = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -17,12 +18,21 @@

public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {

public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, null, byteBufferAllocator);
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream,
QuerySettings querySettings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator,
Map<ClickHouseDataType, Class<?>> typeHintMapping) {
super(inputStream, querySettings, null, byteBufferAllocator, typeHintMapping);
readSchema();
}

public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream,
QuerySettings querySettings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
this(inputStream, querySettings, byteBufferAllocator, NO_TYPE_HINT_MAPPING);
}


private void readSchema() {
try {
List<String> names = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseDataType;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {

private List<String> columns = null;

public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, schema, byteBufferAllocator);
public RowBinaryWithNamesFormatReader(InputStream inputStream,
QuerySettings querySettings,
TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator,
Map<ClickHouseDataType, Class<?>> typeHintMapping) {
super(inputStream, querySettings, schema, byteBufferAllocator, typeHintMapping);
int nCol = 0;
try {
nCol = BinaryStreamReader.readVarInt(input);
Expand All @@ -44,6 +49,13 @@ public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings que
readNextRecord();
}

public RowBinaryWithNamesFormatReader(InputStream inputStream,
QuerySettings querySettings,
TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
this(inputStream, querySettings, schema, byteBufferAllocator, NO_TYPE_HINT_MAPPING);
}

public List<String> getColumns() {
return columns;
}
Expand Down
Loading
Loading