Skip to content

Commit a61ef30

Browse files
Backport #91711 to 25.12: Fix possible logical error in Log engine during subcolumns reading
1 parent f2f26f4 commit a61ef30

File tree

3 files changed: 58 additions and 22 deletions.

src/Storages/StorageLog.cpp

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ class LogSource final : public ISource
157157
using DeserializeStates = std::map<String, DeserializeState>;
158158
DeserializeStates deserialize_states;
159159

160+
void readPrefix(const NameAndTypePair & name_and_type, ISerialization::SubstreamsCache & cache, ISerialization::SubstreamsDeserializeStatesCache & deserialize_state_cache);
160161
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t max_rows_to_read, ISerialization::SubstreamsCache & cache);
161162
bool isFinished();
162163
};
@@ -192,8 +193,17 @@ Chunk LogSource::generate()
192193
/// How many rows to read for the next block.
193194
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
194195
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
196+
std::unordered_map<String, ISerialization::SubstreamsDeserializeStatesCache> deserialize_states_caches;
195197
Block res;
196198

199+
/// First, read prefixes for all columns/subcolumns.
200+
for (const auto & name_and_type : columns)
201+
{
202+
auto name_and_type_on_disk = getColumnOnDisk(name_and_type);
203+
readPrefix(name_and_type_on_disk, caches[name_and_type_on_disk.getNameInStorage()], deserialize_states_caches[name_and_type_on_disk.getNameInStorage()]);
204+
}
205+
206+
/// Second, read the data of all columns/subcolumns.
197207
for (const auto & name_type : columns)
198208
{
199209
ColumnPtr column;
@@ -232,6 +242,35 @@ Chunk LogSource::generate()
232242
return Chunk(res.getColumns(), num_rows);
233243
}
234244

245+
void LogSource::readPrefix(const NameAndTypePair & name_and_type, ISerialization::SubstreamsCache & cache, ISerialization::SubstreamsDeserializeStatesCache & deserialize_state_cache)
246+
{
247+
if (deserialize_states.contains(name_and_type.name))
248+
return;
249+
250+
auto serialization = IDataType::getSerialization(name_and_type);
251+
252+
ISerialization::DeserializeBinaryBulkSettings settings;
253+
settings.getter = [&](const ISerialization::SubstreamPath & path) -> ReadBuffer *
254+
{
255+
if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
256+
return nullptr;
257+
258+
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path, {});
259+
260+
const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
261+
if (data_file_it == storage.data_files_by_names.end())
262+
throw Exception(ErrorCodes::LOGICAL_ERROR, "No information about file {} in StorageLog", data_file_name);
263+
const auto & data_file = *data_file_it->second;
264+
265+
size_t offset = 0;
266+
size_t file_size = file_sizes[data_file.index];
267+
268+
auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
269+
return &it->second.compressed.value();
270+
};
271+
272+
serialization->deserializeBinaryBulkStatePrefix(settings, deserialize_states[name_and_type.name], &deserialize_state_cache);
273+
}
235274

236275
void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & column,
237276
size_t max_rows_to_read, ISerialization::SubstreamsCache & cache)
@@ -240,35 +279,25 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
240279
const auto & [name, type] = name_and_type;
241280
auto serialization = IDataType::getSerialization(name_and_type);
242281

243-
auto create_stream_getter = [&](bool stream_for_prefix)
282+
settings.getter = [&] (const ISerialization::SubstreamPath & path) -> ReadBuffer *
244283
{
245-
return [&, stream_for_prefix] (const ISerialization::SubstreamPath & path) -> ReadBuffer *
246-
{
247-
if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
248-
return nullptr;
284+
if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
285+
return nullptr;
249286

250-
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path, {});
287+
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path, {});
251288

252-
const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
253-
if (data_file_it == storage.data_files_by_names.end())
254-
throw Exception(ErrorCodes::LOGICAL_ERROR, "No information about file {} in StorageLog", data_file_name);
255-
const auto & data_file = *data_file_it->second;
289+
const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
290+
if (data_file_it == storage.data_files_by_names.end())
291+
throw Exception(ErrorCodes::LOGICAL_ERROR, "No information about file {} in StorageLog", data_file_name);
292+
const auto & data_file = *data_file_it->second;
256293

257-
size_t offset = stream_for_prefix ? 0 : offsets[data_file.index];
258-
size_t file_size = file_sizes[data_file.index];
294+
size_t offset = offsets[data_file.index];
295+
size_t file_size = file_sizes[data_file.index];
259296

260-
auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
261-
return &it->second.compressed.value();
262-
};
297+
auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
298+
return &it->second.compressed.value();
263299
};
264300

265-
if (!deserialize_states.contains(name))
266-
{
267-
settings.getter = create_stream_getter(true);
268-
serialization->deserializeBinaryBulkStatePrefix(settings, deserialize_states[name], nullptr);
269-
}
270-
271-
settings.getter = create_stream_getter(false);
272301
serialization->deserializeBinaryBulkWithMultipleStreams(column, 0, max_rows_to_read, settings, deserialize_states[name], &cache);
273302
if (column->getDataType() != name_and_type.type->getColumnType())
274303
throw Exception(

tests/queries/0_stateless/03753_log_engine_shared_prefixes_bug.reference

Whitespace-only changes.

tests/queries/0_stateless/03753_log_engine_shared_prefixes_bug.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
DROP TABLE IF EXISTS test;
2+
CREATE TABLE test (c String) ENGINE = Log;
3+
INSERT INTO TABLE test SELECT randomString(10) from numbers(1000);
4+
INSERT INTO TABLE test SELECT randomString(10) from numbers(1000);
5+
SELECT * FROM test ORDER BY c, c.size FORMAT Null;
6+
DROP TABLE test;
7+

0 commit comments

Comments
 (0)