@@ -1004,8 +1004,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
10041004 size_t num_rows = executor.execute (*buffer);
10051005
10061006 total_rows += num_rows;
1007- chunk_info->offsets .push_back (total_rows);
1008- chunk_info->tokens .push_back (entry->async_dedup_token );
1007+ /// For some reason, the client can pass zero rows and bytes to the server.
1008+ /// We don't update offsets in this case, because we assume every insert has some rows during dedup,
1009+ /// but we have nothing to deduplicate for this insert.
1010+ if (num_rows > 0 )
1011+ {
1012+ chunk_info->offsets .push_back (total_rows);
1013+ chunk_info->tokens .push_back (entry->async_dedup_token );
1014+ }
10091015
10101016 add_to_async_insert_log (entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms );
10111017
@@ -1056,8 +1062,14 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
10561062 result_columns[i]->insertRangeFrom (*columns[i], 0 , columns[i]->size ());
10571063
10581064 total_rows += block->rows ();
1059- chunk_info->offsets .push_back (total_rows);
1060- chunk_info->tokens .push_back (entry->async_dedup_token );
1065+ /// For some reason, the client can pass zero rows and bytes to the server.
1066+ /// We don't update offsets in this case, because we assume every insert has some rows during dedup,
1067+ /// but we have nothing to deduplicate for this insert.
1068+ if (block->rows ())
1069+ {
1070+ chunk_info->offsets .push_back (total_rows);
1071+ chunk_info->tokens .push_back (entry->async_dedup_token );
1072+ }
10611073
10621074 const auto & query_for_logging = get_query_by_format (entry->format );
10631075 add_to_async_insert_log (entry, query_for_logging, " " , block->rows (), block->bytes (), data->timeout_ms );
0 commit comments