|
| 1 | +#pragma once |
| 2 | + |
| 3 | +#include <Columns/IColumn.h> |
| 4 | +#include <Compression/CompressedReadBuffer.h> |
| 5 | +#include <Compression/CompressedWriteBuffer.h> |
| 6 | +#include <Compression/ICompressionCodec.h> |
| 7 | +#include <Core/ColumnWithTypeAndName.h> |
| 8 | +#include <Core/Field.h> |
| 9 | +#include <DataTypes/ObjectUtils.h> |
| 10 | +#include <DataTypes/Serializations/ISerialization.h> |
| 11 | +#include <Formats/NativeReader.h> |
| 12 | +#include <Formats/NativeWriter.h> |
| 13 | +#include <IO/BufferWithOwnMemory.h> |
| 14 | +#include <IO/ReadBufferFromMemory.h> |
| 15 | +#include <IO/WriteBufferFromVector.h> |
| 16 | +#include <Interpreters/castColumn.h> |
| 17 | +#include <base/defines.h> |
| 18 | +#include <Common/PODArray.h> |
| 19 | +#include <Common/WeakHash.h> |
| 20 | + |
| 21 | +namespace DB |
| 22 | +{ |
| 23 | + |
/// Error codes referenced by this header; only declared here (`extern`), not defined.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
| 28 | + |
| 29 | +/// ColumnBLOB is a special column type that stores a serialized and compressed version of another column. |
| 30 | +/// Most of the `IColumn` methods are not applicable to it, thus they throw exceptions. |
| 31 | +/// Used to offload the (de)serialization and (de)compression of columns to the pipeline threads instead of TCPHandler and remote connection threads. |
| 32 | +/// Methods `toBLOB` and `fromBLOB` are used to convert between the original column and the BLOB representation. |
| 33 | +/// See `MarshallBlocksTransform`, `UnmarshallBlocksTransform`, and `SerializationDetached`. |
| 34 | +class ColumnBLOB : public COWHelper<IColumnHelper<ColumnBLOB>, ColumnBLOB> |
| 35 | +{ |
| 36 | +public: |
| 37 | + using BLOB = PODArray<char>; |
| 38 | + |
| 39 | +private: |
| 40 | + friend class COWHelper<IColumnHelper<ColumnBLOB>, ColumnBLOB>; |
| 41 | + |
| 42 | + // The argument is supposed to be some ColumnBLOB's internal BLOB, |
| 43 | + // the return value is the reconstructed column. |
| 44 | + using FromBLOB = std::function<ColumnPtr(const BLOB &)>; |
| 45 | + |
| 46 | + ColumnBLOB( |
| 47 | + ColumnWithTypeAndName wrapped_column_, CompressionCodecPtr codec, UInt64 client_revision, const FormatSettings & format_settings) |
| 48 | + : rows(wrapped_column_.column->size()) |
| 49 | + , wrapped_column(wrapped_column_.column) |
| 50 | + { |
| 51 | + chassert(wrapped_column); |
| 52 | + toBLOB(blob, wrapped_column_, codec, client_revision, format_settings); |
| 53 | + } |
| 54 | + |
| 55 | + ColumnBLOB(FromBLOB task, ColumnPtr wrapped_column_, size_t rows_) |
| 56 | + : rows(rows_) |
| 57 | + , wrapped_column(std::move(wrapped_column_)) |
| 58 | + , from_blob_task(std::move(task)) |
| 59 | + { |
| 60 | + chassert(wrapped_column); |
| 61 | + } |
| 62 | + |
| 63 | + // Only needed to make compiler happy. |
| 64 | + [[noreturn]] ColumnBLOB(const ColumnBLOB & other) |
| 65 | + : COWHelper(other) |
| 66 | + , blob(other.blob.begin(), other.blob.end()) |
| 67 | + , rows(other.rows) |
| 68 | + , wrapped_column(other.wrapped_column) |
| 69 | + , from_blob_task(other.from_blob_task) |
| 70 | + { |
| 71 | + throw Exception(ErrorCodes::LOGICAL_ERROR, "ColumnBLOB copy constructor should not be called"); |
| 72 | + } |
| 73 | + |
| 74 | +public: |
| 75 | + const char * getFamilyName() const override { return "BLOB"; } |
| 76 | + |
| 77 | + size_t size() const override { return rows; } |
| 78 | + size_t byteSize() const override { return wrapped_column->byteSize() + blob.size(); } |
| 79 | + size_t allocatedBytes() const override { return wrapped_column->allocatedBytes() + blob.capacity(); } |
| 80 | + |
| 81 | + BLOB & getBLOB() { return blob; } |
| 82 | + const BLOB & getBLOB() const { return blob; } |
| 83 | + |
| 84 | + bool wrappedColumnIsSparse() const |
| 85 | + { |
| 86 | + chassert(wrapped_column); |
| 87 | + return wrapped_column->isSparse(); |
| 88 | + } |
| 89 | + |
| 90 | + MutableColumnPtr cloneEmpty() const override |
| 91 | + { |
| 92 | + chassert(wrapped_column); |
| 93 | + return wrapped_column->cloneEmpty(); |
| 94 | + } |
| 95 | + |
| 96 | + ColumnPtr convertFrom() const |
| 97 | + { |
| 98 | + chassert(from_blob_task); |
| 99 | + return from_blob_task(blob); |
| 100 | + } |
| 101 | + |
| 102 | + /// Creates serialized and compressed blob from the source column. |
| 103 | + static void toBLOB( |
| 104 | + BLOB & blob, |
| 105 | + ColumnWithTypeAndName wrapped_column, |
| 106 | + CompressionCodecPtr codec, |
| 107 | + UInt64 client_revision, |
| 108 | + const std::optional<FormatSettings> & format_settings) |
| 109 | + { |
| 110 | + WriteBufferFromVector<BLOB> wbuf(blob); |
| 111 | + CompressedWriteBuffer compressed_buffer(wbuf, codec); |
| 112 | + auto serialization = NativeWriter::getSerialization(client_revision, wrapped_column); |
| 113 | + NativeWriter::writeData( |
| 114 | + *serialization, wrapped_column.column, compressed_buffer, format_settings, 0, wrapped_column.column->size(), client_revision); |
| 115 | + compressed_buffer.finalize(); |
| 116 | + } |
| 117 | + |
| 118 | + /// Decompresses and deserializes the blob into the source column. |
| 119 | + static ColumnPtr fromBLOB( |
| 120 | + const BLOB & blob, |
| 121 | + ColumnPtr nested, |
| 122 | + SerializationPtr nested_serialization, |
| 123 | + size_t rows, |
| 124 | + const FormatSettings * format_settings, |
| 125 | + double avg_value_size_hint) |
| 126 | + { |
| 127 | + ReadBufferFromMemory rbuf(blob.data(), blob.size()); |
| 128 | + CompressedReadBuffer decompressed_buffer(rbuf); |
| 129 | + chassert(nested->empty()); |
| 130 | + NativeReader::readData(*nested_serialization, nested, decompressed_buffer, format_settings, rows, avg_value_size_hint); |
| 131 | + return nested; |
| 132 | + } |
| 133 | + |
| 134 | + void addCast(DataTypePtr from, DataTypePtr to) |
| 135 | + { |
| 136 | + chassert(from_blob_task); |
| 137 | + from_blob_task = [from_task = std::move(from_blob_task), from, to, this](const BLOB &) |
| 138 | + { |
| 139 | + ColumnWithTypeAndName col; |
| 140 | + col.column = from_task(blob); |
| 141 | + col.type = from; |
| 142 | + return castColumn(col, to); |
| 143 | + }; |
| 144 | + } |
| 145 | + |
| 146 | + /// All other methods throw the exception. |
| 147 | + |
| 148 | + TypeIndex getDataType() const override { throwInapplicable(); } |
| 149 | + Field operator[](size_t) const override { throwInapplicable(); } |
| 150 | + void get(size_t, Field &) const override { throwInapplicable(); } |
| 151 | + std::pair<String, DataTypePtr> getValueNameAndType(size_t) const override { throwInapplicable(); } |
| 152 | + StringRef getDataAt(size_t) const override { throwInapplicable(); } |
| 153 | + bool isDefaultAt(size_t) const override { throwInapplicable(); } |
| 154 | + void insert(const Field &) override { throwInapplicable(); } |
| 155 | + bool tryInsert(const Field &) override { throwInapplicable(); } |
| 156 | +#if !defined(DEBUG_OR_SANITIZER_BUILD) |
| 157 | + void insertRangeFrom(const IColumn &, size_t, size_t) override { throwInapplicable(); } |
| 158 | +#else |
| 159 | + void doInsertRangeFrom(const IColumn &, size_t, size_t) override { throwInapplicable(); } |
| 160 | +#endif |
| 161 | + void insertData(const char *, size_t) override { throwInapplicable(); } |
| 162 | + void insertDefault() override { throwInapplicable(); } |
| 163 | + void popBack(size_t) override { throwInapplicable(); } |
| 164 | + StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwInapplicable(); } |
| 165 | + char * serializeValueIntoMemory(size_t, char *) const override { throwInapplicable(); } |
| 166 | + const char * deserializeAndInsertFromArena(const char *) override { throwInapplicable(); } |
| 167 | + const char * skipSerializedInArena(const char *) const override { throwInapplicable(); } |
| 168 | + void updateHashWithValue(size_t, SipHash &) const override { throwInapplicable(); } |
| 169 | + WeakHash32 getWeakHash32() const override { throwInapplicable(); } |
| 170 | + void updateHashFast(SipHash &) const override { throwInapplicable(); } |
| 171 | + ColumnPtr filter(const Filter &, ssize_t) const override { throwInapplicable(); } |
| 172 | + void expand(const Filter &, bool) override { throwInapplicable(); } |
| 173 | + ColumnPtr permute(const Permutation &, size_t) const override { throwInapplicable(); } |
| 174 | + ColumnPtr index(const IColumn &, size_t) const override { throwInapplicable(); } |
| 175 | +#if !defined(DEBUG_OR_SANITIZER_BUILD) |
| 176 | + int compareAt(size_t, size_t, const IColumn &, int) const override { throwInapplicable(); } |
| 177 | +#else |
| 178 | + int doCompareAt(size_t, size_t, const IColumn &, int) const override { throwInapplicable(); } |
| 179 | +#endif |
| 180 | + void compareColumn(const IColumn &, size_t, PaddedPODArray<UInt64> *, PaddedPODArray<Int8> &, int, int) const override |
| 181 | + { |
| 182 | + throwInapplicable(); |
| 183 | + } |
| 184 | + bool hasEqualValues() const override { throwInapplicable(); } |
| 185 | + void getPermutation(IColumn::PermutationSortDirection, IColumn::PermutationSortStability, size_t, int, Permutation &) const override |
| 186 | + { |
| 187 | + throwInapplicable(); |
| 188 | + } |
| 189 | + void updatePermutation( |
| 190 | + IColumn::PermutationSortDirection, IColumn::PermutationSortStability, size_t, int, Permutation &, EqualRanges &) const override |
| 191 | + { |
| 192 | + throwInapplicable(); |
| 193 | + } |
| 194 | + ColumnPtr replicate(const Offsets &) const override { throwInapplicable(); } |
| 195 | + MutableColumns scatter(ColumnIndex, const Selector &) const override { throwInapplicable(); } |
| 196 | + void gather(ColumnGathererStream &) override { throwInapplicable(); } |
| 197 | + void getExtremes(Field &, Field &) const override { throwInapplicable(); } |
| 198 | + size_t byteSizeAt(size_t) const override { throwInapplicable(); } |
| 199 | + double getRatioOfDefaultRows(double) const override { throwInapplicable(); } |
| 200 | + UInt64 getNumberOfDefaultRows() const override { throwInapplicable(); } |
| 201 | + void getIndicesOfNonDefaultRows(Offsets &, size_t, size_t) const override { throwInapplicable(); } |
| 202 | + |
| 203 | + bool hasDynamicStructure() const override { throwInapplicable(); } |
| 204 | + void takeDynamicStructureFromSourceColumns(const Columns &) override { throwInapplicable(); } |
| 205 | + |
| 206 | +private: |
| 207 | + /// Compressed and serialized representation of the wrapped column. |
| 208 | + BLOB blob; |
| 209 | + |
| 210 | + /// Always set |
| 211 | + const size_t rows; |
| 212 | + ColumnPtr wrapped_column; |
| 213 | + |
| 214 | + /// Set only in cast of "from" conversion |
| 215 | + FromBLOB from_blob_task; |
| 216 | + |
| 217 | + [[noreturn]] void throwInapplicable() const |
| 218 | + { |
| 219 | + throw Exception(ErrorCodes::LOGICAL_ERROR, "ColumnBLOB should be converted to a regular column before usage"); |
| 220 | + } |
| 221 | +}; |
| 222 | + |
| 223 | +[[nodiscard]] inline Block convertBLOBColumns(const Block & block) |
| 224 | +{ |
| 225 | + Block res; |
| 226 | + res.info = block.info; |
| 227 | + for (const auto & elem : block) |
| 228 | + { |
| 229 | + ColumnWithTypeAndName column = elem; |
| 230 | + if (const auto * col = typeid_cast<const ColumnBLOB *>(column.column.get())) |
| 231 | + column.column = col->convertFrom(); |
| 232 | + res.insert(std::move(column)); |
| 233 | + } |
| 234 | + return res; |
| 235 | +} |
| 236 | +} |
0 commit comments