Skip to content

Commit 7896d30

Browse files
authored
Merge pull request #49122 from CurtizJ/add-async-insert-mt-setting
Add `MergeTree` setting `async_insert`
2 parents 17d6e2c + 36d53e0 commit 7896d30

File tree

7 files changed

+52
-2
lines changed

7 files changed

+52
-2
lines changed

src/Core/Settings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ class IColumn;
650650
M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
651651
\
652652
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
653-
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
653+
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
654654
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
655655
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
656656
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \

src/Interpreters/executeQuery.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include <Interpreters/SelectQueryOptions.h>
5858
#include <Interpreters/TransactionLog.h>
5959
#include <Interpreters/executeQuery.h>
60+
#include <Interpreters/DatabaseCatalog.h>
6061
#include <Common/ProfileEvents.h>
6162

6263
#include <IO/CompressionMethod.h>
@@ -526,6 +527,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
526527
context->initializeExternalTablesIfSet();
527528

528529
auto * insert_query = ast->as<ASTInsertQuery>();
530+
bool async_insert_enabled = settings.async_insert;
529531

530532
/// Resolve database before trying to use async insert feature - to properly hash the query.
531533
if (insert_query)
@@ -534,6 +536,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
534536
insert_query->table_id = context->resolveStorageID(insert_query->table_id);
535537
else if (auto table = insert_query->getTable(); !table.empty())
536538
insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table});
539+
540+
if (insert_query->table_id)
541+
if (auto table = DatabaseCatalog::instance().tryGetTable(insert_query->table_id, context))
542+
async_insert_enabled |= table->areAsynchronousInsertsEnabled();
537543
}
538544

539545
if (insert_query && insert_query->select)
@@ -568,7 +574,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
568574
auto * queue = context->getAsynchronousInsertQueue();
569575
auto * logger = &Poco::Logger::get("executeQuery");
570576

571-
if (insert_query && settings.async_insert)
577+
if (insert_query && async_insert_enabled)
572578
{
573579
String reason;
574580

src/Storages/IStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
178178
/// Returns true if the storage is for system, which cannot be target of SHOW CREATE TABLE.
179179
virtual bool isSystemStorage() const { return false; }
180180

181+
/// Returns true if asynchronous inserts are enabled for table.
182+
virtual bool areAsynchronousInsertsEnabled() const { return false; }
181183

182184
/// Optional size information of each physical column.
183185
/// Currently it's only used by the MergeTree family for query optimizations.

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,8 @@ class MergeTreeData : public IStorage, public WithMutableContext
431431

432432
bool supportsLightweightDelete() const override;
433433

434+
bool areAsynchronousInsertsEnabled() const override { return getSettings()->async_insert; }
435+
434436
NamesAndTypesList getVirtuals() const override;
435437

436438
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr, const StorageMetadataPtr & metadata_snapshot) const override;

src/Storages/MergeTree/MergeTreeSettings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ struct Settings;
8484
M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
8585
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
8686
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
87+
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
8788
\
8889
/* Part removal settings. */ \
8990
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2
2+
2
3+
default.t_mt_async_insert 1
4+
default.t_mt_sync_insert 0
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
# shellcheck source=../shell_config.sh
5+
. "$CURDIR"/../shell_config.sh
6+
7+
${CLICKHOUSE_CLIENT} -n --query "
8+
DROP TABLE IF EXISTS t_mt_async_insert;
9+
DROP TABLE IF EXISTS t_mt_sync_insert;
10+
11+
CREATE TABLE t_mt_async_insert (id UInt64, s String)
12+
ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 1;
13+
14+
CREATE TABLE t_mt_sync_insert (id UInt64, s String)
15+
ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 0;"
16+
17+
url="${CLICKHOUSE_URL}&async_insert=0&wait_for_async_insert=1"
18+
19+
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_mt_async_insert VALUES (1, 'aa'), (2, 'bb')"
20+
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_mt_sync_insert VALUES (1, 'aa'), (2, 'bb')"
21+
22+
${CLICKHOUSE_CLIENT} -n --query "
23+
SELECT count() FROM t_mt_async_insert;
24+
SELECT count() FROM t_mt_sync_insert;
25+
26+
SYSTEM FLUSH LOGS;
27+
SELECT tables[1], ProfileEvents['AsyncInsertQuery'] FROM system.query_log
28+
WHERE
29+
type = 'QueryFinish' AND
30+
current_database = currentDatabase() AND
31+
query ILIKE 'INSERT INTO t_mt_%sync_insert%'
32+
ORDER BY tables[1];
33+
34+
DROP TABLE IF EXISTS t_mt_async_insert;
35+
DROP TABLE IF EXISTS t_mt_sync_insert;"

0 commit comments

Comments
 (0)