Asynchronous INSERTs mode #18282
Description
Use case
Thousands of clients concurrently sending data to clickhouse-server for insertion.
Each client sends a small batch of data, but the total traffic can be large.
This scenario is popular for logs and metrics.
Alternatives: clickhouse-bulk, kittenhouse.
Describe the solution you'd like
Provide a setting (0/1) to enable asynchronous inserts.
If the setting is enabled, a queue of buffers will be maintained for every distinct INSERT query (distinguished by the query text without data, e.g. INSERT INTO table FORMAT TSV) and user. We can also introduce a setting to split identical INSERTs across multiple queues (a kind of internal sharding).
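A minimal sketch of how such queues could be keyed, assuming a key of (query text without data, user) plus an optional shard number; the class and setting names are hypothetical, not the actual implementation:

```python
from collections import defaultdict

class AsyncInsertQueues:
    def __init__(self, shards_per_query=1):
        # shards_per_query > 1 models the proposed "internal sharding"
        # setting that spreads identical INSERTs across several queues.
        self.shards_per_query = shards_per_query
        self.queues = defaultdict(list)  # key -> list of raw data buffers
        self._next_shard = 0

    def key_for(self, query_text, user):
        # Round-robin shard assignment for identical (query, user) pairs.
        shard = self._next_shard % self.shards_per_query
        self._next_shard += 1
        return (query_text.strip(), user, shard)

    def enqueue(self, query_text, user, raw_data):
        # Data is buffered without parsing; parsing happens later,
        # in the background flush task.
        self.queues[self.key_for(query_text, user)].append(raw_data)

queues = AsyncInsertQueues(shards_per_query=2)
queues.enqueue("INSERT INTO table FORMAT TSV", "alice", b"1\tfoo\n")
queues.enqueue("INSERT INTO table FORMAT TSV", "alice", b"2\tbar\n")
# With 2 shards, two identical INSERTs land in different queues.
print(len(queues.queues))  # 2
```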
The data sent by a client will be read into a buffer without parsing and put into the queue. If the queue does not exist yet, the INSERT query will be parsed and interpreted (the syntax is checked, the table is found, data streams are prepared).
A thread pool will run tasks for every queue. A task will parse the data (see RowInputFormat::resetParser and its usage in StorageKafka for parsing multiple small buffers), form blocks for insertion when the specified thresholds are met (see IRowInputFormat), and perform the INSERT. If a queue stays empty for a specified amount of time, it will be destroyed, so we will not indefinitely "hold" tables that were dropped.
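The accumulate-then-flush behavior of one queue can be sketched as follows, assuming hypothetical max_rows / max_bytes thresholds; a real server would run such tasks in a thread pool, one per queue, and add a time-based threshold:

```python
class InsertQueue:
    def __init__(self, max_rows=4, max_bytes=1024):
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.buffers = []          # raw, unparsed chunks from clients
        self.flushed_blocks = []   # blocks formed for insertion

    def push(self, raw):
        self.buffers.append(raw)
        if self._rows() >= self.max_rows or self._bytes() >= self.max_bytes:
            self.flush()

    def _rows(self):
        # Rough row count for TSV-like data: one row per newline.
        return sum(chunk.count(b"\n") for chunk in self.buffers)

    def _bytes(self):
        return sum(len(chunk) for chunk in self.buffers)

    def flush(self):
        # Here the real implementation would parse every buffer
        # (cf. RowInputFormat::resetParser) and run the actual INSERT;
        # this sketch just concatenates the chunks into one "block".
        if self.buffers:
            self.flushed_blocks.append(b"".join(self.buffers))
            self.buffers.clear()

q = InsertQueue(max_rows=2)
q.push(b"1\tfoo\n")
q.push(b"2\tbar\n")   # reaches max_rows -> one block is flushed
print(len(q.flushed_blocks))  # 1
```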
Another setting (0/1) will control whether the client waits for the query result or not.
If set to 1, the client will set up a "future" and wait for it. When the data is inserted (possibly accumulated together with data from other connections), the client will be notified about the result (success or exception). Parsing errors should be sent back to the corresponding clients, and only the erroneous parts of the data will be skipped. Insertion errors may be caused by the data of other clients and will be reported to multiple clients. CONSTRAINT checking may be performed on per-client data, but it is not strictly necessary.
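The per-client notification could work roughly like this sketch, where each client in a shared batch holds a future that is resolved on flush; the Batch class and the "null byte means parsing error" rule are illustrative assumptions, not the proposed implementation:

```python
from concurrent.futures import Future

class Batch:
    def __init__(self):
        self.entries = []  # one (data, future) pair per client

    def add(self, data):
        fut = Future()
        self.entries.append((data, fut))
        return fut

    def finish(self):
        # On success, every waiting client is notified. A per-row
        # parsing error resolves only that client's future with an
        # exception, and the erroneous part of the data is skipped.
        for data, fut in self.entries:
            if b"\x00" in data:  # stand-in for a parsing error
                fut.set_exception(ValueError("cannot parse data"))
            else:
                fut.set_result("ok")

batch = Batch()
f1 = batch.add(b"1\tfoo\n")
f2 = batch.add(b"\x00bad\n")
batch.finish()
print(f1.result())                    # ok
print(type(f2.exception()).__name__)  # ValueError
```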
The asynchronous mode should have its own thresholds for forming an inserted block: by number of bytes, number of rows, and time.
How it's better than Buffer tables
We will only do a memcpy and a small amount of synchronization per client request.
The overhead of query parsing and interpretation is shared among multiple client requests.
Desired result
I expect up to 5 million inserted records per second and up to 20 thousand opened and closed connections per second (the latter is limited by the implementation of the HTTP server).