Add Avro/AvroConfluent formats #8571
Add Avro file input/output formats Add AvroConfluent input format (for Kafka)
|
test? |
|
Updated PR with more information and added some usage examples. |
|
Nice 'Data Types matching:' table. Enum in input listed twice.
Yep, that's intentional - an Avro enum can be inserted into either String or Enum in ClickHouse. |
|
@alexey-milovidov could you take a look at this when you have a chance? Thanks |
|
Yes, give me a few days... |
That sounds suboptimal. Especially for Kafka - it would be stupid to refetch schema for each block of data. Did you consider storing cached schemas somewhere else? |
|
Doing an HTTP request while parsing a format is very strange and introduces vulnerabilities. I will check how we can ensure that
|
And we don't set up any timeouts for the HTTP request...
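The two review concerns above (refetching the schema for every block, and no timeout on the HTTP request) can be illustrated with a small sketch. This is Python rather than the PR's C++, and it is not the PR's implementation: `fetch_schema` and `SchemaCache` are hypothetical names, the 5-second timeout is an arbitrary choice, and the sketch assumes a schema ID always maps to the same schema, so a cached entry never needs refreshing. The only external detail it relies on is the Schema Registry endpoint `GET /schemas/ids/{id}`, which returns a JSON object with a `schema` field.

```python
import json
import threading
import urllib.request
from typing import Callable, Dict


def fetch_schema(registry_url: str, schema_id: int, timeout_s: float = 5.0) -> str:
    """Fetch one schema by ID, with an explicit timeout on the HTTP request."""
    url = f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"
    with urllib.request.urlopen(url, timeout=timeout_s) as response:
        return json.load(response)["schema"]


class SchemaCache:
    """Cache shared by all readers: a successful fetch happens once per schema ID."""

    def __init__(self, fetch: Callable[[int], str]) -> None:
        self._fetch = fetch
        self._schemas: Dict[int, str] = {}
        self._lock = threading.Lock()

    def get(self, schema_id: int) -> str:
        # The lock is held during the fetch to keep the sketch simple;
        # concurrent readers wait instead of issuing duplicate requests.
        with self._lock:
            if schema_id not in self._schemas:
                self._schemas[schema_id] = self._fetch(schema_id)
            return self._schemas[schema_id]
```

A cache like this would be created once and passed to every format instance, for example `SchemaCache(lambda i: fetch_schema(url, i))` with `url` taken from the registry URL setting, so that processing a new batch from Kafka does not trigger a new request for a schema that was already seen.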
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (up to few sentences, required except for Non-significant/Documentation categories):
Add Avro file input/output formats
Add AvroConfluent input format (for Kafka)
Detailed description (optional):
This adds support for Avro format input/output.
Schemas are dynamically matched/generated based on the table structure.
Format Avro - input/output
Reading/writing Avro data files.
Input example:
Output example:
Format AvroConfluent (experimental) - input
Supports the Confluent Platform Avro single-object framing format. It's often used with Kafka. Each Avro datum embeds a schema ID, which is resolved by looking it up in Schema Registry. This allows deserializing messages without knowing the schema in advance.
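To make the framing concrete: in the Confluent wire format each message starts with a 5-byte header, one magic byte with value 0 followed by the schema ID as a 4-byte big-endian integer, and the Avro-encoded datum follows. The sketch below splits such a message in Python. It only illustrates the layout and is not the parser added by this PR; `split_confluent_frame` is a hypothetical name.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for one Confluent-framed message."""
    if len(message) < 5:
        raise ValueError("message too short for the 5-byte Confluent header")
    # ">" selects big-endian with no padding: 1 unsigned byte + 4-byte unsigned int.
    magic, schema_id = struct.unpack(">BI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Example: schema ID 42 followed by a one-byte Avro payload.
frame = b"\x00" + (42).to_bytes(4, "big") + b"\x02"
schema_id, payload = split_confluent_frame(frame)
```

The returned `schema_id` is what gets looked up in Schema Registry; the remaining bytes are decoded with the schema that lookup returns.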
The input_format_avro_schema_registry_url setting specifies the Schema Registry URL.
Example:
Notes:
AvroConfluent only caches schemas per instance of an InputFormat. This means Schema Registry will be queried each time a batch from Kafka is processed.
This PR also depends on some changes in contrib/boost (adding iostreams), so for the purpose of getting some feedback I temporarily pointed it to my fork.
Data Types matching:
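| Avro data type (INSERT) | ClickHouse data type | Avro data type (SELECT) |
|---|---|---|
| BOOL | UInt8 | BOOL |
| INT | Int32 | INT |
| LONG | Int64 | LONG |
| FLOAT | Float32 | FLOAT |
| DOUBLE | Float64 | DOUBLE |
| STRING, BYTES, ENUM | String | STRING |
| FIXED(N) | FixedString(N) | FIXED(N) |
| ARRAY(T) | Array(T) | ARRAY(T) |
| UNION(NULL, T) | Nullable(T) | UNION(NULL, T) |
| ENUM | Enum8, Enum16 | ENUM |
| INT(DATE) | Date | INT(DATE) |
| LONG(TIMESTAMP_MILLIS) | DateTime64(3) | LONG(TIMESTAMP_MILLIS) |
| LONG(TIMESTAMP_MICROS) | DateTime64(6) | LONG(TIMESTAMP_MICROS) |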
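The matching rules above can also be read as two lookup tables, one per direction. The Python sketch below restates them as data and is only an illustration of the table, not the matching code in this PR; `INSERT_TARGETS`, `SELECT_TYPES`, and `can_insert` are hypothetical names, and parametrized types such as FIXED(N) are kept as literal strings rather than parsed.

```python
from typing import Dict, List

# Avro type read on INSERT -> ClickHouse column types it may be inserted into.
INSERT_TARGETS: Dict[str, List[str]] = {
    "BOOL": ["UInt8"],
    "INT": ["Int32"],
    "LONG": ["Int64"],
    "FLOAT": ["Float32"],
    "DOUBLE": ["Float64"],
    "STRING": ["String"],
    "BYTES": ["String"],
    "ENUM": ["String", "Enum8", "Enum16"],  # ENUM appears in two rows of the table
    "FIXED(N)": ["FixedString(N)"],
    "ARRAY(T)": ["Array(T)"],
    "UNION(NULL, T)": ["Nullable(T)"],
    "INT(DATE)": ["Date"],
    "LONG(TIMESTAMP_MILLIS)": ["DateTime64(3)"],
    "LONG(TIMESTAMP_MICROS)": ["DateTime64(6)"],
}

# ClickHouse column type -> Avro type written on SELECT.
SELECT_TYPES: Dict[str, str] = {
    "UInt8": "BOOL",
    "Int32": "INT",
    "Int64": "LONG",
    "Float32": "FLOAT",
    "Float64": "DOUBLE",
    "String": "STRING",
    "FixedString(N)": "FIXED(N)",
    "Array(T)": "ARRAY(T)",
    "Nullable(T)": "UNION(NULL, T)",
    "Enum8": "ENUM",
    "Enum16": "ENUM",
    "Date": "INT(DATE)",
    "DateTime64(3)": "LONG(TIMESTAMP_MILLIS)",
    "DateTime64(6)": "LONG(TIMESTAMP_MICROS)",
}


def can_insert(avro_type: str, clickhouse_type: str) -> bool:
    """True if the table lists clickhouse_type as a target for avro_type."""
    return clickhouse_type in INSERT_TARGETS.get(avro_type, [])
```

Written this way, the asymmetry discussed in the review is easy to see: an Avro ENUM can be inserted into String, Enum8, or Enum16, while a String column is always written back as STRING.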