Skip to content

Optimize replica-initiator communication in distributed queries#78694

Merged
nickitat merged 137 commits intomasterfrom
blob
Jun 16, 2025
Merged

Optimize replica-initiator communication in distributed queries#78694
nickitat merged 137 commits intomasterfrom
blob

Conversation

@nickitat
Copy link
Copy Markdown
Member

@nickitat nickitat commented Apr 4, 2025

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Introduced an option to offload (de)compression and (de)serialization of blocks into pipeline threads instead of a single thread associated with a network connection. Controlled by the setting enable_parallel_blocks_marshalling. It should speed up distributed queries that transfer significant amounts of data between the initiator and remote nodes.


A few comments on the implementation:

  1. We insert a special plan step that will convert columns into BLOBs at the end of pipeline:
    void transform(Chunk & chunk) override
    {
    const size_t num_rows = chunk.getNumRows();
    auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
    block = callback(block);
    chunk.setColumns(block.getColumns(), num_rows);
    }
  2. "Conversion" is really the same serialization and compression process that TCPHandler does normally (but in local buffer):
    static void toBLOB(
    BLOB & blob,
    ColumnWithTypeAndName wrapped_column,
    CompressionCodecPtr codec,
    UInt64 client_revision,
    const std::optional<FormatSettings> & format_settings)
    {
    WriteBufferFromVector<BLOB> wbuf(blob);
    CompressedWriteBuffer compressed_buffer(wbuf, codec);
    auto serialization = NativeWriter::getSerialization(client_revision, wrapped_column);
    NativeWriter::writeData(
    *serialization, wrapped_column.column, compressed_buffer, format_settings, 0, wrapped_column.column->size(), client_revision);
    compressed_buffer.finalize();
    }
  3. To avoid serialization and decompression for the columns inside TCPHandler, there is a new serialization type SerializationDetached. It does nothing except copies an already marshalled BLOB in/out of socket
  4. On the initiator's side we do the reverse process just after reading a block from socket:
    void UnmarshallBlocksTransform::transform(Chunk & chunk)
    {
    const auto rows = chunk.getNumRows();
    auto columns = chunk.detachColumns();
    for (auto & column : columns)
    {
    if (const auto * col = typeid_cast<const ColumnBLOB *>(column.get()))
    column = col->convertFrom();
    }
    chunk.setColumns(std::move(columns), rows);
    }

Examples:

Before:

❯ clickhouse-benchmark --enable_parallel_blocks_marshalling 0 -q "SELECT SearchPhrase, COUNT(*) AS c FROM hits_s3 WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10 SETTINGS enable_parallel_replicas = 1, parallel_replicas_local_plan = 1, parallel_replicas_index_analysis_only_on_coordinator = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'parallel_replicas', parallel_replicas_for_non_replicated_merge_tree = 1, parallel_replicas_min_number_of_rows_per_replica = 0" --cumulative -i 50

Queries executed: 50.

localhost:9000, queries: 50, QPS: 1.448, RPS: 144183256.898, MiB/s: 2259.247, result RPS: 14.483, result MiB/s: 0.001.

0%              0.601 sec.
10%             0.604 sec.
20%             0.617 sec.
30%             0.626 sec.
40%             0.684 sec.
50%             0.696 sec.

After:

❯ clickhouse-benchmark --enable_parallel_blocks_marshalling 1 -q "SELECT SearchPhrase, COUNT(*) AS c FROM hits_s3 WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10 SETTINGS enable_parallel_replicas = 1, parallel_replicas_local_plan = 1, parallel_replicas_index_analysis_only_on_coordinator = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'parallel_replicas', parallel_replicas_for_non_replicated_merge_tree = 1, parallel_replicas_min_number_of_rows_per_replica = 0" --cumulative -i 50

Queries executed: 50.

localhost:9000, queries: 50, QPS: 2.762, RPS: 274960427.592, MiB/s: 4308.679, result RPS: 27.622, result MiB/s: 0.001.

0%              0.322 sec.
10%             0.326 sec.
20%             0.332 sec.
30%             0.336 sec.
40%             0.348 sec.
50%             0.354 sec.
Screenshot 2025-06-16 at 12 55 20

@nickitat
Copy link
Copy Markdown
Member Author

nickitat commented Jun 10, 2025

No ideas right now about the fuzzer failures. My understanding is that it's not my bug, but the added test helps fuzzer to trigger this error now. To confirm I created another pr on top of the master with the new test as the only change. Also trying to catch it locally.
UPD: confirmed, issue exists on master.

@nickitat nickitat enabled auto-merge June 15, 2025 13:08
@nickitat
Copy link
Copy Markdown
Member Author

AST fuzzer (arm_asan) - succeeded after restart
Perf tests - #81887

@nickitat nickitat disabled auto-merge June 16, 2025 10:59
@nickitat nickitat enabled auto-merge June 16, 2025 10:59
@nickitat nickitat added this pull request to the merge queue Jun 16, 2025
Merged via the queue into master with commit 1b05596 Jun 16, 2025
441 of 479 checks passed
@nickitat nickitat deleted the blob branch June 16, 2025 15:13
@robot-ch-test-poll robot-ch-test-poll added the pr-synced-to-cloud The PR is synced to the cloud repo label Jun 16, 2025
robot-ch-test-poll4 added a commit that referenced this pull request Jun 16, 2025
Cherry pick #78694 to 25.6: Optimize replica-initiator communication in distributed queries
robot-clickhouse added a commit that referenced this pull request Jun 16, 2025
@robot-ch-test-poll robot-ch-test-poll added the pr-backports-created Backport PRs are successfully created, it won't be processed by CI script anymore label Jun 16, 2025
alexey-milovidov added a commit that referenced this pull request Jun 17, 2025
Backport #78694 to 25.6: Optimize replica-initiator communication in distributed queries
@robot-clickhouse-ci-2 robot-clickhouse-ci-2 added the pr-backports-created-cloud deprecated label, NOOP label Jun 18, 2025
@robot-clickhouse robot-clickhouse added the pr-must-backport-synced The `*-must-backport` labels are synced into the cloud Sync PR label Jul 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr-backports-created Backport PRs are successfully created, it won't be processed by CI script anymore pr-backports-created-cloud deprecated label, NOOP pr-must-backport-synced The `*-must-backport` labels are synced into the cloud Sync PR pr-performance Pull request with some performance improvements pr-synced-to-cloud The PR is synced to the cloud repo v25.6-must-backport

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants