
Add Avro/AvroConfluent formats #8571

Merged
alexey-milovidov merged 11 commits into ClickHouse:master from oandrew:avro
Jan 24, 2020

Conversation

@oandrew (Contributor) commented Jan 8, 2020

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • New Feature

Changelog entry (up to few sentences, required except for Non-significant/Documentation categories):
Add Avro file input/output formats
Add AvroConfluent input format (for Kafka)

Detailed description (optional):
This adds support for Avro format input/output.
Schemas are dynamically matched/generated based on the table structure.

Format Avro - input/output

Reading/writing Avro data files.
Input example:

$ cat data.avro | avro-tools getschema /dev/stdin
{
  "type" : "record",
  "name" : "test",
  "fields" : [ {
    "name" : "a",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "array",
        "items" : "string"
      }
    }
  }, {
    "name" : "b",
    "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  }, {
    "name" : "c",
    "type" : [ "null", "long" ]
  } ]
}
$ cat data.avro | avrocat
{"a": [["a1", "a2"], [""]], "b": 1578539189956, "c": null}
{"a": [], "b": 1578539189956, "c": {"long": 1}}
$ cat data.avro | clickhouse-local --input-format Avro --structure 'a Array(Array(String)), b DateTime64(3), c Nullable(Int64)' -q 'select * from table format Pretty'
┏━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┓
┃ a                  ┃                       b ┃    c ┃
┡━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━┩
│ [['a1','a2'],['']] │ 2020-01-08 21:06:29.956 │ ᴺᵁᴸᴸ │
├────────────────────┼─────────────────────────┼──────┤
│ []                 │ 2020-01-08 21:06:29.956 │    1 │
└────────────────────┴─────────────────────────┴──────┘

Output example:

$ clickhouse-local -q "select toNullable('str') as a, toInt64(123) as b, now64() as c  format Avro" | avro-tools getschema /dev/stdin
{
  "type" : "record",
  "name" : "row",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "string" ]
  }, {
    "name" : "b",
    "type" : "long"
  }, {
    "name" : "c",
    "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  } ]
}
$ clickhouse-local -q "select toNullable('str') as a, toInt64(123) as b, now64() as c  format Avro" | avrocat
{"a": {"string": "str"}, "b": 123, "c": 1578539557810}

Format AvroConfluent (experimental) - input

Supports the Confluent Platform Avro single-object framing format, which is often used with Kafka. Each Avro datum embeds a schema ID that is resolved by looking it up in the Schema Registry. This allows deserializing messages without knowing the schema in advance.
The input_format_avro_schema_registry_url setting specifies the Schema Registry URL.
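For illustration, the Confluent framing prefixes each message with a magic byte and a big-endian 4-byte schema ID before the Avro-encoded payload. A minimal Python sketch of splitting that framing (the helper name and the sample bytes are hypothetical, not part of this PR):

```python
import struct

def parse_confluent_framing(message: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload).

    The framing is: 1 magic byte (0x00), a 4-byte big-endian schema ID,
    then the Avro binary-encoded datum.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent-framed Avro message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]

# Hypothetical sample: schema ID 42 followed by an opaque Avro payload.
sample = b"\x00" + struct.pack(">I", 42) + b"\x02hi"
schema_id, payload = parse_confluent_framing(sample)
```

The schema ID extracted this way is what gets looked up in the Schema Registry before the payload can be decoded.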

Example:

CREATE TABLE data (`a` String, `b` Int64, `c` Nullable(Float64))
ENGINE = Kafka()
SETTINGS kafka_broker_list = 'kafka-broker:9092',
         kafka_topic_list = 'topic',
         kafka_group_name = 'group',
         kafka_format = 'AvroConfluent',
         kafka_num_consumers = 1,
         input_format_avro_schema_registry_url = 'http://platform-schema-registry'

Notes:

  • AvroConfluent only caches schemas per InputFormat instance. This means the Schema Registry will be queried each time a batch from Kafka is processed.
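The per-instance caching described above can be sketched as follows (a simplified Python illustration, not the actual C++ implementation; `fetch_schema` stands in for the HTTP call to the Schema Registry):

```python
class SchemaRegistryCache:
    """Cache schema-ID -> schema lookups for the lifetime of one InputFormat.

    Because a fresh InputFormat (and hence a fresh cache) is created per
    batch, the registry is re-queried once per schema ID in every batch.
    """
    def __init__(self, fetch_schema):
        self._fetch_schema = fetch_schema  # e.g. an HTTP GET to the registry
        self._cache = {}

    def get(self, schema_id):
        if schema_id not in self._cache:
            self._cache[schema_id] = self._fetch_schema(schema_id)
        return self._cache[schema_id]

# Count how many times the (stub) registry is actually hit.
calls = []
cache = SchemaRegistryCache(lambda sid: calls.append(sid) or f"schema-{sid}")
cache.get(1); cache.get(1); cache.get(2)
# Within one instance, repeated IDs hit the registry only once; a new
# instance for the next batch would start with an empty cache.
```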

This PR also depends on some changes in contrib/boost (adding iostreams), so for the purpose of getting some feedback I temporarily pointed it to my fork.

Date Types matching:

| Avro data type (INSERT) | ClickHouse data type | Avro data type (SELECT) |
| ----------------------- | -------------------- | ----------------------- |
| BOOL                    | UInt8                | BOOL                    |
| INT                     | Int32                | INT                     |
| LONG                    | Int64                | LONG                    |
| FLOAT                   | Float32              | FLOAT                   |
| DOUBLE                  | Float64              | DOUBLE                  |
| STRING, BYTES, ENUM     | String               | STRING                  |
| FIXED(N)                | FixedString(N)       | FIXED(N)                |
| ARRAY(T)                | Array(T)             | ARRAY(T)                |
| UNION(NULL, T)          | Nullable(T)          | UNION(NULL, T)          |
| ENUM                    | Enum8, Enum16        | ENUM                    |
| INT(DATE)               | Date                 | INT(DATE)               |
| LONG(TIMESTAMP_MILLIS)  | DateTime64(3)        | LONG(TIMESTAMP_MILLIS)  |
| LONG(TIMESTAMP_MICROS)  | DateTime64(6)        | LONG(TIMESTAMP_MICROS)  |
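As a rough illustration of the SELECT-side mapping above, a ClickHouse column type can be translated into an Avro schema fragment along these lines (a hypothetical helper covering only a few rows of the table, not code from this PR):

```python
def avro_schema_for(ch_type: str):
    """Map a few ClickHouse column types to Avro schema fragments,
    mirroring the SELECT column of the table above (illustrative only)."""
    simple = {
        "Int32": "int",
        "Int64": "long",
        "Float32": "float",
        "Float64": "double",
        "String": "string",
    }
    if ch_type in simple:
        return simple[ch_type]
    if ch_type.startswith("Nullable(") and ch_type.endswith(")"):
        # Nullable(T) becomes an Avro union of null and T's type.
        return ["null", avro_schema_for(ch_type[9:-1])]
    if ch_type.startswith("Array(") and ch_type.endswith(")"):
        return {"type": "array", "items": avro_schema_for(ch_type[6:-1])}
    if ch_type == "DateTime64(3)":
        return {"type": "long", "logicalType": "timestamp-millis"}
    raise NotImplementedError(ch_type)
```

For example, `Nullable(Int64)` maps to the union `["null", "long"]`, matching the output-example schema for column `a` shown earlier.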

Add Avro file input/output formats
Add AvroConfluent input format (for Kafka)
@oandrew oandrew requested a review from a team January 8, 2020 09:44
@ghost ghost requested review from millb and removed request for a team January 8, 2020 09:44
@filimonov (Contributor)

test?

@oandrew oandrew mentioned this pull request Jan 9, 2020
@oandrew (Contributor, Author) commented Jan 9, 2020

Updated PR with more information and added some usage examples.
I'll be adding tests soon.

@filimonov (Contributor)

Nice 'Date Types matching:' table. Enum in input listed twice.

@oandrew (Contributor, Author) commented Jan 10, 2020

> Nice 'Date Types matching:' table. Enum in input listed twice.

Yep, that's intentional - an Avro enum can be inserted into either String or Enum in ClickHouse.

@oandrew oandrew requested a review from filimonov January 11, 2020 07:26
@oandrew (Contributor, Author) commented Jan 11, 2020

@alexey-milovidov could you take a look at this when you have a chance? Thanks

@alexey-milovidov alexey-milovidov requested review from alexey-milovidov and removed request for millb January 11, 2020 11:09
@alexey-milovidov (Member)

Yes, give me a few days...

@filimonov (Contributor)

> AvroConfluent only caches schemas per an instance of an InputFormat. This means SchemaRegistry will be queried each time a batch from Kafka is processed.

That sounds suboptimal, especially for Kafka - it would be stupid to refetch the schema for each block of data. Did you consider storing cached schemas somewhere else?

@alexey-milovidov alexey-milovidov self-assigned this Jan 18, 2020
@alexey-milovidov (Member)

Doing an HTTP request while parsing a format is very strange and introduces vulnerabilities.
Do you remember the XXE vulnerabilities in XML parsers?
https://owasp.org/www-community/vulnerabilities/XML_External_Entity_(XXE)_Processing

I will check how we can ensure that RemoteHostFilter is available and used.

@alexey-milovidov (Member) commented Jan 18, 2020

And we don't set up any timeout for the HTTP request...
You know - any request without a timeout is an anti-pattern.

@alexey-milovidov alexey-milovidov merged commit 8346fb5 into ClickHouse:master Jan 24, 2020
@tavplubix tavplubix added the pr-feature Pull request with new product feature label Feb 7, 2020