RFC: Streaming Queries #42990

@alexey-milovidov

Description

Goals:

  • reliable consumption from Kafka and other persistent queues with exactly-once semantics;
  • a consistent implementation of POPULATE for MATERIALIZED VIEW;
  • unification of LIVE VIEW and WINDOW VIEW, allowing them to support distributed queries and reliable consumption;
  • distributed persistent queues inside ClickHouse, based on MergeTree tables;
  • infinite data pipelines running in the background, with support for alerting, triggers, and complex event processing;
  • aggregation and JOINing of infinite streams of data.

Continuous queries to process infinite streams of data

Continuous SELECT queries with WHERE, DISTINCT, LIMIT, and LIMIT BY will work naturally.

You should be able to write something like

SELECT title, body, actor_login FROM github_events STREAM WHERE body ILIKE '%ClickHouse%' OR title ILIKE '%ClickHouse%'

and enjoy an infinite stream of data in clickhouse-client.

Continuous INSERT SELECT queries will be able to replace MATERIALIZED VIEWs if run in the background.

INSERT INTO FUNCTION url(...) SELECT title, body, actor_login FROM github_events STREAM WHERE body ILIKE '%ClickHouse%' OR title ILIKE '%ClickHouse%'

will be able to post the updates to your Slack channel.

It should be possible to run these infinite queries in the background using some syntax like CREATE JOB.
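As a purely illustrative sketch, such a background job might be declared as follows (the CREATE JOB name and its shape are assumptions of this RFC, not a finalized design):

```sql
-- Hypothetical sketch only: CREATE JOB is an assumed syntax, not an
-- implemented feature. It wraps a continuous INSERT SELECT so that the
-- query keeps running in the background after the client disconnects.
CREATE JOB github_mentions AS
INSERT INTO FUNCTION url(...)
SELECT title, body, actor_login
FROM github_events STREAM
WHERE body ILIKE '%ClickHouse%' OR title ILIKE '%ClickHouse%';
```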

Continuous SELECT queries with GROUP BY and ORDER BY may choose to output portions of intermediate results based on a window of time, the number of records, or values found in the data.

See task #28855.

These queries can be configured to return updates to the complete result set (the full result, incorporating the newer data), partial results over non-intersecting windows of data, or only the records of the result set changed by the newer data.

With one of these modes, we will be able to replace LIVE VIEWs as well as WINDOW VIEWs.
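As a purely illustrative sketch, the choice of output mode could be expressed like this (the EMIT clause and its modifiers are assumptions for illustration, not a settled syntax):

```sql
-- Hypothetical sketch: EMIT and its modifiers are assumed syntax.

-- Updates to the complete result set, re-emitted periodically:
SELECT repo_name, count()
FROM github_events STREAM
GROUP BY repo_name
EMIT EVERY 10 SECOND;

-- Partial results over non-intersecting windows of data:
SELECT toStartOfMinute(created_at) AS minute, count()
FROM github_events STREAM
GROUP BY minute
EMIT AFTER WINDOW;
```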

Continuous queries with JOIN may introduce specialized JOIN methods, such as: JOINing with fixed, non-updating data; JOINing with newly arrived data without recalculating previously joined records; and JOINing with a window of data limited in size or time.
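Hedged sketches of two of these JOIN methods (the STREAM modifier follows the earlier examples; the window clause is an assumption for illustration):

```sql
-- 1. JOINing with fixed, non-updating data: each newly arrived event is
--    joined against a static dimension table (here a hypothetical `users`
--    table), which is read once and never recalculated.
SELECT e.actor_login, u.company
FROM github_events STREAM AS e
JOIN users AS u ON e.actor_login = u.login;

-- 2. JOINing with a window of data limited in time: only rows from the
--    last hour of each stream are retained for matching. The WITHIN
--    clause is an assumed syntax, not a proposed final one.
SELECT e.repo_name, c.comment_id
FROM github_events STREAM AS e
JOIN github_comments STREAM AS c ON e.repo_name = c.repo_name
WITHIN 1 HOUR;
```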

Subscription to tables as queues

Some tables may allow reading streaming updates without any persistent cursor.

For example, you can subscribe to the Null table to receive everything that is inserted there. But if there are no subscribers, the data inserted into a Null table is lost.

Some tables may allow the reader to provide a cursor as a map of offsets.

A cursor in a table is represented by a map stream_name -> offset with an arbitrary number of streams. The offset for every stream is a monotonically increasing UInt64 number, not necessarily sequential.

The user may request reading from a specified offset, reading from the beginning, or skipping all the existing data to consume only the newer data.

For example, a StripeLog table can represent a queue with a single stream, and the offset will be simply the block number.

A MergeTree table can represent a queue if some modifications are made: under a setting, an autogenerated offset column is created and used as the first column of the primary key; background merges can still work but with a tighter limit on the maximal size of a part; old data can be automatically deleted by time or size (see size-capped tables: #37548).
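A sketch of what such a table might look like (the setting name and the autogenerated offset column name are assumptions of this illustration, not implemented features):

```sql
-- Hypothetical sketch: `queue_mode` and the autogenerated `_offset`
-- column are assumed names. `_offset` is monotonic and serves as the
-- first (and here only) column of the primary key, so readers can
-- resume from any saved offset.
CREATE TABLE events_queue
(
    _offset UInt64,   -- autogenerated, monotonically increasing
    payload String
)
ENGINE = MergeTree
ORDER BY _offset
SETTINGS queue_mode = 1;  -- enables offsets and caps merged part sizes
```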

If there are many partitions in a MergeTree table, it can represent a queue with multiple streams - every partition represents a stream.

Distributed tables can be represented as a set of streams that is the union of the streams of all the tables on the shards; the underlying tables should allow distributed consumption. The same applies to Merge tables. The map of streams may change dynamically as new streams are introduced. Every new stream, since its offset is unspecified, will be consumed from the beginning.

Replicated tables can be represented by a set of streams, where every replica contributes its own set of streams (the data is partitioned by the replica where it originated). A stream of one replica may be consumed on another replica, but it is processed in order: if a previous block of data has not been replicated yet, the subsequent block cannot be consumed. Note: the map of offsets may grow if replicas are added and removed frequently.

Kafka, RabbitMQ, NATS, and FileLog tables will naturally represent a queue.

Reliable consumption of queues

SELECT queries, and INSERT SELECT queries specifically, can use persistent storage for offsets in ClickHouse Keeper, so that consumption can be paused and resumed, and reliably resumed with exactly-once semantics after failures.

A pair of offsets (from, to) with a status of insertion can be saved to allow repeatable reading of the same blocks of data after failures.

If the destination table supports atomic INSERT with a transaction in ClickHouse Keeper (like ReplicatedMergeTree), the SELECT part can update the offsets in the same transaction to allow atomic exactly-once consumption.
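A sketch of the intended flow, assuming the insert and the cursor update share one Keeper transaction (the surrounding syntax is an assumption; only the atomicity idea comes from this RFC):

```sql
-- Hypothetical sketch of exactly-once consumption. The block of rows
-- covered by offsets (from, to] is inserted into a ReplicatedMergeTree
-- table, and the cursor {stream -> to} stored in ClickHouse Keeper is
-- advanced in the same transaction. After a failure, the block is thus
-- either fully applied or re-read from the saved (from, to) pair.
BEGIN TRANSACTION;
INSERT INTO dst_replicated SELECT * FROM src STREAM;
COMMIT;  -- commits the inserted parts and the new offsets atomically
```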

Different cursors should allow different workers to consume the same data in the queues independently. Concurrent and distributed consumption of the same queue with a shared cursor should distribute the data evenly between the workers.

When consuming data from external queues like Kafka, ClickHouse should never rely on the offsets stored by Kafka itself, as reliable consumption is not possible with externally auto-committed offsets.

The updates happening in the background in Replacing, Collapsing, and similar MergeTree tables are performed independently of the subscriptions, so consumers can read either updated or non-updated data. The same applies to mutations: they happen independently of consumption.
