Skip to content

Commit 2999bb8

Browse files
Backport #78694 to 25.6: Optimize replica-initiator communication in distributed queries
1 parent 0a44d40 commit 2999bb8

File tree

48 files changed

+1328
-185
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1328
-185
lines changed

src/Client/Connection.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1422,7 +1422,8 @@ void Connection::initBlockInput()
14221422
if (!maybe_compressed_in)
14231423
{
14241424
if (compression == Protocol::Compression::Enable)
1425-
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
1425+
// Different codecs in this case are the default (e.g. LZ4) and codec NONE to skip compression in case of ColumnBLOB.
1426+
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /*allow_different_codec=*/true);
14261427
else
14271428
maybe_compressed_in = in;
14281429
}

src/Columns/ColumnBLOB.h

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
#pragma once
2+
3+
#include <Columns/IColumn.h>
4+
#include <Compression/CompressedReadBuffer.h>
5+
#include <Compression/CompressedWriteBuffer.h>
6+
#include <Compression/ICompressionCodec.h>
7+
#include <Core/ColumnWithTypeAndName.h>
8+
#include <Core/Field.h>
9+
#include <DataTypes/ObjectUtils.h>
10+
#include <DataTypes/Serializations/ISerialization.h>
11+
#include <Formats/NativeReader.h>
12+
#include <Formats/NativeWriter.h>
13+
#include <IO/BufferWithOwnMemory.h>
14+
#include <IO/ReadBufferFromMemory.h>
15+
#include <IO/WriteBufferFromVector.h>
16+
#include <Interpreters/castColumn.h>
17+
#include <base/defines.h>
18+
#include <Common/PODArray.h>
19+
#include <Common/WeakHash.h>
20+
21+
namespace DB
22+
{
23+
24+
namespace ErrorCodes
25+
{
26+
extern const int LOGICAL_ERROR;
27+
}
28+
29+
/// ColumnBLOB is a special column type that stores a serialized and compressed version of another column.
30+
/// Most of the `IColumn` methods are not applicable to it, thus they throw exceptions.
31+
/// Used to offload the (de)serialization and (de)compression of columns to the pipeline threads instead of TCPHandler and remote connection threads.
32+
/// Methods `toBLOB` and `fromBLOB` are used to convert between the original column and the BLOB representation.
33+
/// See `MarshallBlocksTransform`, `UnmarshallBlocksTransform`, and `SerializationDetached`.
34+
class ColumnBLOB : public COWHelper<IColumnHelper<ColumnBLOB>, ColumnBLOB>
35+
{
36+
public:
37+
using BLOB = PODArray<char>;
38+
39+
private:
40+
friend class COWHelper<IColumnHelper<ColumnBLOB>, ColumnBLOB>;
41+
42+
// The argument is supposed to be some ColumnBLOB's internal BLOB,
43+
// the return value is the reconstructed column.
44+
using FromBLOB = std::function<ColumnPtr(const BLOB &)>;
45+
46+
ColumnBLOB(
47+
ColumnWithTypeAndName wrapped_column_, CompressionCodecPtr codec, UInt64 client_revision, const FormatSettings & format_settings)
48+
: rows(wrapped_column_.column->size())
49+
, wrapped_column(wrapped_column_.column)
50+
{
51+
chassert(wrapped_column);
52+
toBLOB(blob, wrapped_column_, codec, client_revision, format_settings);
53+
}
54+
55+
ColumnBLOB(FromBLOB task, ColumnPtr wrapped_column_, size_t rows_)
56+
: rows(rows_)
57+
, wrapped_column(std::move(wrapped_column_))
58+
, from_blob_task(std::move(task))
59+
{
60+
chassert(wrapped_column);
61+
}
62+
63+
// Only needed to make compiler happy.
64+
[[noreturn]] ColumnBLOB(const ColumnBLOB & other)
65+
: COWHelper(other)
66+
, blob(other.blob.begin(), other.blob.end())
67+
, rows(other.rows)
68+
, wrapped_column(other.wrapped_column)
69+
, from_blob_task(other.from_blob_task)
70+
{
71+
throw Exception(ErrorCodes::LOGICAL_ERROR, "ColumnBLOB copy constructor should not be called");
72+
}
73+
74+
public:
75+
const char * getFamilyName() const override { return "BLOB"; }
76+
77+
size_t size() const override { return rows; }
78+
size_t byteSize() const override { return wrapped_column->byteSize() + blob.size(); }
79+
size_t allocatedBytes() const override { return wrapped_column->allocatedBytes() + blob.capacity(); }
80+
81+
BLOB & getBLOB() { return blob; }
82+
const BLOB & getBLOB() const { return blob; }
83+
84+
bool wrappedColumnIsSparse() const
85+
{
86+
chassert(wrapped_column);
87+
return wrapped_column->isSparse();
88+
}
89+
90+
MutableColumnPtr cloneEmpty() const override
91+
{
92+
chassert(wrapped_column);
93+
return wrapped_column->cloneEmpty();
94+
}
95+
96+
ColumnPtr convertFrom() const
97+
{
98+
chassert(from_blob_task);
99+
return from_blob_task(blob);
100+
}
101+
102+
/// Creates serialized and compressed blob from the source column.
103+
static void toBLOB(
104+
BLOB & blob,
105+
ColumnWithTypeAndName wrapped_column,
106+
CompressionCodecPtr codec,
107+
UInt64 client_revision,
108+
const std::optional<FormatSettings> & format_settings)
109+
{
110+
WriteBufferFromVector<BLOB> wbuf(blob);
111+
CompressedWriteBuffer compressed_buffer(wbuf, codec);
112+
auto serialization = NativeWriter::getSerialization(client_revision, wrapped_column);
113+
NativeWriter::writeData(
114+
*serialization, wrapped_column.column, compressed_buffer, format_settings, 0, wrapped_column.column->size(), client_revision);
115+
compressed_buffer.finalize();
116+
}
117+
118+
/// Decompresses and deserializes the blob into the source column.
119+
static ColumnPtr fromBLOB(
120+
const BLOB & blob,
121+
ColumnPtr nested,
122+
SerializationPtr nested_serialization,
123+
size_t rows,
124+
const FormatSettings * format_settings,
125+
double avg_value_size_hint)
126+
{
127+
ReadBufferFromMemory rbuf(blob.data(), blob.size());
128+
CompressedReadBuffer decompressed_buffer(rbuf);
129+
chassert(nested->empty());
130+
NativeReader::readData(*nested_serialization, nested, decompressed_buffer, format_settings, rows, avg_value_size_hint);
131+
return nested;
132+
}
133+
134+
void addCast(DataTypePtr from, DataTypePtr to)
135+
{
136+
chassert(from_blob_task);
137+
from_blob_task = [from_task = std::move(from_blob_task), from, to, this](const BLOB &)
138+
{
139+
ColumnWithTypeAndName col;
140+
col.column = from_task(blob);
141+
col.type = from;
142+
return castColumn(col, to);
143+
};
144+
}
145+
146+
/// All other methods throw the exception.
147+
148+
TypeIndex getDataType() const override { throwInapplicable(); }
149+
Field operator[](size_t) const override { throwInapplicable(); }
150+
void get(size_t, Field &) const override { throwInapplicable(); }
151+
std::pair<String, DataTypePtr> getValueNameAndType(size_t) const override { throwInapplicable(); }
152+
StringRef getDataAt(size_t) const override { throwInapplicable(); }
153+
bool isDefaultAt(size_t) const override { throwInapplicable(); }
154+
void insert(const Field &) override { throwInapplicable(); }
155+
bool tryInsert(const Field &) override { throwInapplicable(); }
156+
#if !defined(DEBUG_OR_SANITIZER_BUILD)
157+
void insertRangeFrom(const IColumn &, size_t, size_t) override { throwInapplicable(); }
158+
#else
159+
void doInsertRangeFrom(const IColumn &, size_t, size_t) override { throwInapplicable(); }
160+
#endif
161+
void insertData(const char *, size_t) override { throwInapplicable(); }
162+
void insertDefault() override { throwInapplicable(); }
163+
void popBack(size_t) override { throwInapplicable(); }
164+
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwInapplicable(); }
165+
char * serializeValueIntoMemory(size_t, char *) const override { throwInapplicable(); }
166+
const char * deserializeAndInsertFromArena(const char *) override { throwInapplicable(); }
167+
const char * skipSerializedInArena(const char *) const override { throwInapplicable(); }
168+
void updateHashWithValue(size_t, SipHash &) const override { throwInapplicable(); }
169+
WeakHash32 getWeakHash32() const override { throwInapplicable(); }
170+
void updateHashFast(SipHash &) const override { throwInapplicable(); }
171+
ColumnPtr filter(const Filter &, ssize_t) const override { throwInapplicable(); }
172+
void expand(const Filter &, bool) override { throwInapplicable(); }
173+
ColumnPtr permute(const Permutation &, size_t) const override { throwInapplicable(); }
174+
ColumnPtr index(const IColumn &, size_t) const override { throwInapplicable(); }
175+
#if !defined(DEBUG_OR_SANITIZER_BUILD)
176+
int compareAt(size_t, size_t, const IColumn &, int) const override { throwInapplicable(); }
177+
#else
178+
int doCompareAt(size_t, size_t, const IColumn &, int) const override { throwInapplicable(); }
179+
#endif
180+
void compareColumn(const IColumn &, size_t, PaddedPODArray<UInt64> *, PaddedPODArray<Int8> &, int, int) const override
181+
{
182+
throwInapplicable();
183+
}
184+
bool hasEqualValues() const override { throwInapplicable(); }
185+
void getPermutation(IColumn::PermutationSortDirection, IColumn::PermutationSortStability, size_t, int, Permutation &) const override
186+
{
187+
throwInapplicable();
188+
}
189+
void updatePermutation(
190+
IColumn::PermutationSortDirection, IColumn::PermutationSortStability, size_t, int, Permutation &, EqualRanges &) const override
191+
{
192+
throwInapplicable();
193+
}
194+
ColumnPtr replicate(const Offsets &) const override { throwInapplicable(); }
195+
MutableColumns scatter(ColumnIndex, const Selector &) const override { throwInapplicable(); }
196+
void gather(ColumnGathererStream &) override { throwInapplicable(); }
197+
void getExtremes(Field &, Field &) const override { throwInapplicable(); }
198+
size_t byteSizeAt(size_t) const override { throwInapplicable(); }
199+
double getRatioOfDefaultRows(double) const override { throwInapplicable(); }
200+
UInt64 getNumberOfDefaultRows() const override { throwInapplicable(); }
201+
void getIndicesOfNonDefaultRows(Offsets &, size_t, size_t) const override { throwInapplicable(); }
202+
203+
bool hasDynamicStructure() const override { throwInapplicable(); }
204+
void takeDynamicStructureFromSourceColumns(const Columns &) override { throwInapplicable(); }
205+
206+
private:
207+
/// Compressed and serialized representation of the wrapped column.
208+
BLOB blob;
209+
210+
/// Always set
211+
const size_t rows;
212+
ColumnPtr wrapped_column;
213+
214+
/// Set only in cast of "from" conversion
215+
FromBLOB from_blob_task;
216+
217+
[[noreturn]] void throwInapplicable() const
218+
{
219+
throw Exception(ErrorCodes::LOGICAL_ERROR, "ColumnBLOB should be converted to a regular column before usage");
220+
}
221+
};
222+
223+
[[nodiscard]] inline Block convertBLOBColumns(const Block & block)
224+
{
225+
Block res;
226+
res.info = block.info;
227+
for (const auto & elem : block)
228+
{
229+
ColumnWithTypeAndName column = elem;
230+
if (const auto * col = typeid_cast<const ColumnBLOB *>(column.column.get()))
231+
column.column = col->convertFrom();
232+
res.insert(std::move(column));
233+
}
234+
return res;
235+
}
236+
}

src/Columns/IColumn.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <Columns/ColumnAggregateFunction.h>
44
#include <Columns/ColumnArray.h>
5+
#include <Columns/ColumnBLOB.h>
56
#include <Columns/ColumnCompressed.h>
67
#include <Columns/ColumnConst.h>
78
#include <Columns/ColumnDecimal.h>
@@ -669,6 +670,7 @@ template class IColumnHelper<ColumnObject, IColumn>;
669670

670671
template class IColumnHelper<IColumnDummy, IColumn>;
671672

673+
template class IColumnHelper<ColumnBLOB, IColumn>;
672674

673675
void intrusive_ptr_add_ref(const IColumn * c)
674676
{
@@ -681,4 +683,5 @@ void intrusive_ptr_release(const IColumn * c)
681683
BOOST_ASSERT(c != nullptr);
682684
boost::sp_adl_block::intrusive_ptr_release(dynamic_cast<const boost::intrusive_ref_counter<IColumn> *>(c));
683685
}
686+
684687
}

src/Compression/CompressedWriteBuffer.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
namespace DB
1414
{
1515

16+
namespace ErrorCodes
17+
{
18+
extern const int LOGICAL_ERROR;
19+
}
20+
1621
void CompressedWriteBuffer::nextImpl()
1722
{
1823
if (!offset())
@@ -89,4 +94,14 @@ void CompressedWriteBuffer::cancelImpl() noexcept
8994
out.cancel();
9095
}
9196

97+
void CompressedWriteBuffer::setCodec(CompressionCodecPtr codec_)
98+
{
99+
// Flush all the pending data that was supposed to be compressed with the old codec.
100+
next();
101+
if (offset() != 0)
102+
throw Exception(ErrorCodes::LOGICAL_ERROR, "CompressedWriteBuffer: offset() is not zero");
103+
104+
chassert(codec_);
105+
codec = std::move(codec_);
106+
}
92107
}

src/Compression/CompressedWriteBuffer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
4040
return offset();
4141
}
4242

43+
CompressionCodecPtr getCodec() const { return codec; }
44+
45+
void setCodec(CompressionCodecPtr codec_);
46+
4347
private:
4448
void nextImpl() override;
4549
/// finalize call does not affect the out buffer.

src/Core/ProtocolDefines.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,14 @@ static constexpr auto DBMS_MIN_REVISON_WITH_JWT_IN_INTERSERVER = 54476;
106106

107107
static constexpr auto DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION = 54477;
108108

109+
static constexpr auto DBMS_MIN_REVISON_WITH_PARALLEL_BLOCK_MARSHALLING = 54478;
110+
109111
/// Version of ClickHouse TCP protocol.
110112
///
111113
/// Should be incremented manually on protocol changes.
112114
///
113115
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
114116
/// later is just a number for server version (one number instead of commit SHA)
115117
/// for simplicity (sometimes it may be more convenient in some use cases).
116-
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54477;
117-
118+
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54478;
118119
}

src/Core/Settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6631,6 +6631,7 @@ When the query prioritization mechanism is employed (see setting `priority`), lo
66316631
)", BETA) \
66326632
DECLARE(Float, min_os_cpu_wait_time_ratio_to_throw, 0.0, "Min ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 0 at this point.", 0) \
66336633
DECLARE(Float, max_os_cpu_wait_time_ratio_to_throw, 0.0, "Max ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 1 at this point.", 0) \
6634+
DECLARE(Bool, enable_parallel_blocks_marshalling, true, "Affects only distributed queries. If enabled, blocks will be (de)serialized and (de)compressed on pipeline threads (i.e. with higher parallelism that what we have by default) before/after sending to the initiator.", 0) \
66346635
DECLARE(UInt64, min_outstreams_per_resize_after_split, 24, R"(
66356636
Specifies the minimum number of output streams of a `Resize` or `StrictResize` processor after the split is performed during pipeline generation. If the resulting number of streams is less than this value, the split operation will not occur.
66366637

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
8282
{"use_skip_indexes_if_final_exact_mode", 0, 1, "Change in default value of setting"},
8383
{"allow_experimental_time_series_aggregate_functions", false, false, "New setting to enable experimental timeSeries* aggregate functions."},
8484
{"min_outstreams_per_resize_after_split", 0, 24, "New setting."},
85+
{"enable_parallel_blocks_marshalling", "false", "true", "A new setting"},
86+
8587
});
8688
addSettingsChanges(settings_changes_history, "25.5",
8789
{

src/DataTypes/IDataType.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <DataTypes/Serializations/SerializationSparse.h>
1616
#include <DataTypes/Serializations/SerializationInfo.h>
1717

18+
#include <DataTypes/Serializations/SerializationDetached.h>
1819

1920
namespace DB
2021
{
@@ -63,7 +64,7 @@ void IDataType::updateAvgValueSizeHint(const IColumn & column, double & avg_valu
6364
MutableColumnPtr IDataType::createColumn(const ISerialization & serialization) const
6465
{
6566
auto column = createColumn();
66-
if (serialization.getKind() == ISerialization::Kind::SPARSE)
67+
if (serialization.getKind() == ISerialization::Kind::SPARSE || serialization.getKind() == ISerialization::Kind::DETACHED_OVER_SPARSE)
6768
return ColumnSparse::create(std::move(column));
6869

6970
return column;
@@ -292,6 +293,13 @@ SerializationPtr IDataType::getSerialization(ISerialization::Kind kind) const
292293
if (supportsSparseSerialization() && kind == ISerialization::Kind::SPARSE)
293294
return getSparseSerialization();
294295

296+
if (kind == ISerialization::Kind::DETACHED)
297+
return std::make_shared<SerializationDetached>(getDefaultSerialization());
298+
299+
if (kind == ISerialization::Kind::DETACHED_OVER_SPARSE)
300+
return std::make_shared<SerializationDetached>(
301+
supportsSparseSerialization() ? getSparseSerialization() : getDefaultSerialization());
302+
295303
return getDefaultSerialization();
296304
}
297305

0 commit comments

Comments
 (0)