Streaming Data Import From S3 #37012

@alexey-milovidov

Description


Implement a table engine that subscribes to a set of files in a directory in S3 or a similar object store.
This engine can be used similarly to the Kafka, RabbitMQ, or FileLog engines.

Requirements

Allow specifying a path in S3 and a wildcard pattern for files, similar to the ordinary S3 engine.

Distributed consumption. The sets of currently processing, already processed, and failed files are tracked in ZooKeeper.
Multiple table engines can independently take different files for processing.
Currently processing files are tracked with ephemeral nodes.
The path in ZooKeeper can be specified as a table engine setting, or a default path can be formed from the configuration-provided global path and the table UUID. A Replicated database will ensure a shared table UUID across replicas.
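The distributed-claim idea above can be sketched roughly as follows. This is an illustrative stand-in, not the engine's actual code: a dict simulates ZooKeeper's create-if-absent semantics, and the `processing/` node layout is hypothetical.

```python
# Illustrative sketch of distributed file claiming, with a dict standing in
# for ZooKeeper. In the real engine an ephemeral node would disappear
# automatically when the claiming server's session ends.

class FakeKeeper:
    """Minimal stand-in for ZooKeeper create-if-absent semantics."""
    def __init__(self):
        self.nodes = {}

    def create(self, path, ephemeral=False):
        if path in self.nodes:
            return False  # another consumer already holds this node
        self.nodes[path] = {"ephemeral": ephemeral}
        return True

def try_claim(keeper, filename):
    # Hypothetical node layout: /clickhouse/queues/my_queue/processing/<file>
    return keeper.create(f"/clickhouse/queues/my_queue/processing/{filename}",
                         ephemeral=True)

keeper = FakeKeeper()
assert try_claim(keeper, "pageviews-1.gz") is True   # first consumer wins
assert try_claim(keeper, "pageviews-1.gz") is False  # second consumer skips it
```

Because the claim node is ephemeral, a crashed consumer releases its files automatically when its ZooKeeper session expires, so another replica can pick them up.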

Ordered and unordered mode. In unordered mode (the default), the set of all already processed files is tracked with persistent nodes in ZooKeeper (to avoid processing the same files again). Old items are removed based on max set size and max age, controlled by table engine settings.
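The unordered-mode bookkeeping could look roughly like this sketch (the class and setting names are illustrative, not the engine's actual settings):

```python
import time
from collections import OrderedDict

# Sketch of the unordered-mode "already processed" set with eviction by
# max size and max age (names here are illustrative).

class ProcessedSet:
    def __init__(self, max_size, max_age_sec):
        self.max_size = max_size
        self.max_age_sec = max_age_sec
        self.entries = OrderedDict()  # filename -> time processed

    def add(self, filename, now=None):
        now = time.time() if now is None else now
        self.entries[filename] = now
        # Evict oldest insertions while the set is too big or too old.
        while self.entries:
            oldest_file, ts = next(iter(self.entries.items()))
            too_old = now - ts > self.max_age_sec
            too_big = len(self.entries) > self.max_size
            if too_old or too_big:
                del self.entries[oldest_file]
            else:
                break

    def seen(self, filename):
        return filename in self.entries

s = ProcessedSet(max_size=2, max_age_sec=3600)
s.add("a.gz", now=0)
s.add("b.gz", now=1)
s.add("c.gz", now=2)        # exceeds max_size, evicts "a.gz"
assert not s.seen("a.gz") and s.seen("b.gz") and s.seen("c.gz")
```

The trade-off this illustrates: once an entry is evicted, the same file would be processed again if it reappears in the listing, so the size/age limits bound ZooKeeper state at the cost of exactly-once guarantees for very old files.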

In ordered mode, only the max name of the successfully consumed file and the names of files to be retried after an unsuccessful loading attempt are stored. New files are selected for processing if their names are greater than the max name. File names are sorted lexicographically. Note: natural sort order could also be implemented.
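A minimal sketch of the ordered-mode selection rule (illustrative, not the engine's actual code):

```python
# Sketch of ordered-mode file selection: pick files whose names sort
# strictly greater than the stored max processed name, plus any files
# queued for retry that still exist in the listing.

def select_files(listing, max_processed_name, retry_names):
    new_files = {f for f in listing if f > max_processed_name}
    retries = set(retry_names) & set(listing)
    return sorted(new_files | retries)

listing = ["pageviews-0.gz", "pageviews-1.gz", "pageviews-2.gz", "pageviews-3.gz"]
picked = select_files(listing,
                      max_processed_name="pageviews-1.gz",
                      retry_names=["pageviews-0.gz"])
assert picked == ["pageviews-0.gz", "pageviews-2.gz", "pageviews-3.gz"]

# The lexicographic caveat the note above alludes to:
# "pageviews-10.gz" < "pageviews-2.gz", so it would NOT be selected here.
assert "pageviews-10.gz" < "pageviews-2.gz"
```

The final assertion shows why the note about natural sort order matters: with purely lexicographic comparison, zero-unpadded numeric suffixes can make a newer file sort below the max name and be skipped.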

Delete, keep, or move after processing. The engine should allow choosing between removing a file after successful loading and keeping it intact. An advanced feature could be using a server-side copy request before removal to rename the file by a pattern, or simply moving it to a subdirectory. By default, it should keep files intact.

Max retries. If a file was not loaded successfully, loading can be retried up to a specified number of times. Failed files are never removed from S3. An advanced feature could be using a server-side copy request to rename a failed file by a pattern, or simply moving it to a subdirectory. By default, there are no retries.
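The retry accounting could be sketched like this (illustrative names; `max_retries` here is a hypothetical setting, not necessarily the engine's):

```python
# Sketch of per-file retry accounting: a file that keeps failing is retried
# up to max_retries times and then marked permanently failed (failed files
# are never removed from S3).

def record_failure(retries, failed, filename, max_retries):
    retries[filename] = retries.get(filename, 0) + 1
    if retries[filename] > max_retries:
        failed.add(filename)          # give up; the file stays in S3
        del retries[filename]
        return False                  # not eligible for another attempt
    return True                       # eligible for another attempt

retries, failed = {}, set()
assert record_failure(retries, failed, "bad.gz", max_retries=2) is True   # failure 1
assert record_failure(retries, failed, "bad.gz", max_retries=2) is True   # failure 2
assert record_failure(retries, failed, "bad.gz", max_retries=2) is False  # limit exceeded
assert "bad.gz" in failed
```

With the default of no retries (`max_retries=0` in this sketch), the first failure immediately marks the file as failed.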

Subscription to a directory is performed simply by polling at a time interval, with a specified min timeout (say 1 second) and exponential backoff up to a max timeout (say 10 seconds). After a successful consumption, the timeout before the next directory listing is reset to the min timeout.
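The polling schedule above can be sketched in a few lines (the 1 s / 10 s figures come straight from the description; the doubling factor is an assumption):

```python
# Sketch of the polling schedule: start at a min timeout, double after each
# empty listing up to a max timeout, and reset after a successful consumption.

def next_timeout(current, consumed, min_timeout=1.0, max_timeout=10.0):
    if consumed:
        return min_timeout            # reset after successful consumption
    return min(current * 2, max_timeout)

t = 1.0
schedule = []
for consumed in [False, False, False, False, True, False]:
    t = next_timeout(t, consumed)
    schedule.append(t)
assert schedule == [2.0, 4.0, 8.0, 10.0, 1.0, 2.0]
```

This keeps latency low when files arrive steadily while avoiding constant listing requests against an idle bucket.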

The engine should allow specifying all format-related settings, including max_block_size, as well as S3-related settings.

Existing files at table creation can be used for schema inference. Table metadata should be stored in ZooKeeper and can be reused for schema inference when creating multiple tables that consume from the same place.

Out of scope

Recursive directory listing. Can be implemented with #36316.

Partitions. If ordered consumption from multiple directories is needed, multiple consuming tables should be created instead.

System tables for introspection.

ALTER TABLE.

Notes

Operation of this engine is somewhat similar to the asynchronous operation of the Distributed engine. So, the Distributed engine could be reimagined as a queue.

Examples

CREATE TABLE queue ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/wikistat/original/pageviews-*.gz', LineAsString)
SETTINGS mode = 'ordered', keeper_path = '/clickhouse/queues/my_queue'

With reasonable defaults, Replicated databases, and schema inference, this table engine can be created without any settings and operated very easily.
