Skip to content

Commit fe6f313

Browse files
Backport #69080 to 24.7: fix logical error for empty async inserts
1 parent cc72e95 commit fe6f313

File tree

4 files changed

+27
-8
lines changed

4 files changed

+27
-8
lines changed

src/Interpreters/AsynchronousInsertQueue.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,8 +1004,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
10041004
size_t num_rows = executor.execute(*buffer);
10051005

10061006
total_rows += num_rows;
1007-
chunk_info->offsets.push_back(total_rows);
1008-
chunk_info->tokens.push_back(entry->async_dedup_token);
1007+
/// for some reason, client can pass zero rows and bytes to server.
1008+
/// We don't update offsets in this case, because we assume every insert has some rows during dedup
1009+
/// but we have nothing to deduplicate for this insert.
1010+
if (num_rows > 0)
1011+
{
1012+
chunk_info->offsets.push_back(total_rows);
1013+
chunk_info->tokens.push_back(entry->async_dedup_token);
1014+
}
10091015

10101016
add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);
10111017

@@ -1056,8 +1062,14 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
10561062
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
10571063

10581064
total_rows += block->rows();
1059-
chunk_info->offsets.push_back(total_rows);
1060-
chunk_info->tokens.push_back(entry->async_dedup_token);
1065+
/// for some reason, client can pass zero rows and bytes to server.
1066+
/// We don't update offsets in this case, because we assume every insert has some rows during dedup,
1067+
/// but we have nothing to deduplicate for this insert.
1068+
if (block->rows())
1069+
{
1070+
chunk_info->offsets.push_back(total_rows);
1071+
chunk_info->tokens.push_back(entry->async_dedup_token);
1072+
}
10611073

10621074
const auto & query_for_logging = get_query_by_format(entry->format);
10631075
add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);

tests/queries/0_stateless/02481_async_insert_dedup.python

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,23 @@ def generate_data(q, total_number, use_token):
4848
partitions = ["2022-11-11 10:10:10", "2022-12-12 10:10:10"]
4949
last_number = 0
5050
while True:
51-
dup_simulate = random.randint(0, 3)
51+
# 0 to simulate duplication
52+
# 1 to simulate empty
53+
simulate_flag = random.randint(0, 4)
5254
# insert old data randomly. 25% of them are dup.
53-
if dup_simulate == 0:
55+
if simulate_flag == 0:
5456
last_idx = len(old_data) - 1
5557
if last_idx < 0:
5658
continue
5759
idx = last_idx - random.randint(0, 50)
5860
if idx < 0:
5961
idx = 0
6062
q.put(old_data[idx])
63+
if simulate_flag == 1:
64+
empty_insert_stmt = (
65+
"insert into t_async_insert_dedup values format JSONEachRow"
66+
)
67+
q.put((empty_insert_stmt, ""))
6168
else:
6269
# insert new data.
6370
chunk_size = random.randint(1, max_chunk_size)

tests/queries/0_stateless/02481_async_insert_dedup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#!/usr/bin/env bash
2-
# Tags: long, zookeeper, no-parallel, no-fasttest
2+
# Tags: long, zookeeper, no-fasttest
33

44
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
55
# shellcheck source=../shell_config.sh

tests/queries/0_stateless/02481_async_insert_dedup_token.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#!/usr/bin/env bash
2-
# Tags: long, zookeeper, no-parallel, no-fasttest
2+
# Tags: long, zookeeper, no-fasttest
33

44
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
55
# shellcheck source=../shell_config.sh

0 commit comments

Comments
 (0)