Skip to content

Commit aa2afae

Browse files
Backport #78205 to 25.3: Fix prefetch of substreams with prefixes in Wide parts
1 parent 1448018 commit aa2afae

File tree

2 files changed

+23
-11
lines changed

2 files changed

+23
-11
lines changed

src/Storages/MergeTree/MergeTreeReaderWide.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void MergeTreeReaderWide::prefetchForAllColumns(
110110
return;
111111

112112
if (deserialize_prefixes)
113-
deserializePrefixForAllColumnsWithPrefetch(num_columns, current_task_last_mark, priority);
113+
deserializePrefixForAllColumnsWithPrefetch(num_columns, from_mark, current_task_last_mark, priority);
114114

115115
/// Request reading of data in advance,
116116
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
@@ -151,7 +151,7 @@ size_t MergeTreeReaderWide::readRows(
151151
return max_rows_to_read;
152152

153153
prefetchForAllColumns(Priority{}, num_columns, from_mark, current_task_last_mark, continue_reading, /*deserialize_prefixes=*/ true);
154-
deserializePrefixForAllColumns(num_columns, current_task_last_mark);
154+
deserializePrefixForAllColumns(num_columns, from_mark, current_task_last_mark);
155155

156156
for (size_t pos = 0; pos < num_columns; ++pos)
157157
{
@@ -337,6 +337,7 @@ ReadBuffer * MergeTreeReaderWide::getStream(
337337
void MergeTreeReaderWide::deserializePrefix(
338338
const SerializationPtr & serialization,
339339
const NameAndTypePair & name_and_type,
340+
size_t from_mark,
340341
size_t current_task_last_mark,
341342
DeserializeBinaryBulkStateMap & deserialize_state_map,
342343
ISerialization::SubstreamsCache & cache,
@@ -352,6 +353,15 @@ void MergeTreeReaderWide::deserializePrefix(
352353
deserialize_settings.prefixes_deserialization_thread_pool = settings.use_prefixes_deserialization_thread_pool ? &getMergeTreePrefixesDeserializationThreadPool().get() : nullptr;
353354
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
354355
{
356+
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(name_and_type, substream_path, data_part_info_for_read->getChecksums());
357+
/// This stream could be prefetched in prefetchBeginOfRange, but here we
358+
/// have to seek the stream to the start of file to deserialize the prefix.
359+
/// If we do not read from the first mark, we should remove this stream from
360+
/// prefetched_streams to prefetch it again starting from the current mark
361+
/// after prefix is deserialized.
362+
if (stream_name && from_mark != 0)
363+
prefetched_streams.erase(*stream_name);
364+
355365
return getStream(/* seek_to_start = */true, substream_path, data_part_info_for_read->getChecksums(), name_and_type, 0, /* seek_to_mark = */false, current_task_last_mark, cache);
356366
};
357367
/// Add streams for newly discovered dynamic subcolumns to start async marks loading beforehand if needed.
@@ -369,7 +379,7 @@ void MergeTreeReaderWide::deserializePrefix(
369379
}
370380
}
371381

372-
void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl(size_t num_columns, size_t current_task_last_mark, StreamCallbackGetter prefixes_prefetch_callback_getter)
382+
void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl(size_t num_columns, size_t from_mark, size_t current_task_last_mark, StreamCallbackGetter prefixes_prefetch_callback_getter)
373383
{
374384
/// Check if we already deserialized prefixes.
375385
if (!deserialize_binary_bulk_state_map.empty())
@@ -387,6 +397,7 @@ void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl(size_t num_columns,
387397
deserializePrefix(
388398
serializations[pos],
389399
columns_to_read[pos],
400+
from_mark,
390401
current_task_last_mark,
391402
deserialize_state_map,
392403
cache,
@@ -411,12 +422,12 @@ void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl(size_t num_columns,
411422
deserialize_binary_bulk_state_map = deserialize();
412423
}
413424

414-
void MergeTreeReaderWide::deserializePrefixForAllColumns(size_t num_columns, size_t current_task_last_mark)
425+
void MergeTreeReaderWide::deserializePrefixForAllColumns(size_t num_columns, size_t from_mark, size_t current_task_last_mark)
415426
{
416-
deserializePrefixForAllColumnsImpl(num_columns, current_task_last_mark, {});
427+
deserializePrefixForAllColumnsImpl(num_columns, from_mark, current_task_last_mark, {});
417428
}
418429

419-
void MergeTreeReaderWide::deserializePrefixForAllColumnsWithPrefetch(size_t num_columns, size_t current_task_last_mark, Priority priority)
430+
void MergeTreeReaderWide::deserializePrefixForAllColumnsWithPrefetch(size_t num_columns, size_t from_mark, size_t current_task_last_mark, Priority priority)
420431
{
421432
auto prefixes_prefetch_callback_getter = [&](const NameAndTypePair & name_and_type)
422433
{
@@ -434,7 +445,7 @@ void MergeTreeReaderWide::deserializePrefixForAllColumnsWithPrefetch(size_t num_
434445
};
435446
};
436447

437-
deserializePrefixForAllColumnsImpl(num_columns, current_task_last_mark, prefixes_prefetch_callback_getter);
448+
deserializePrefixForAllColumnsImpl(num_columns, from_mark, current_task_last_mark, prefixes_prefetch_callback_getter);
438449
}
439450

440451
void MergeTreeReaderWide::prefetchForColumn(
@@ -494,7 +505,7 @@ void MergeTreeReaderWide::readData(
494505
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
495506
deserialize_settings.avg_value_size_hint = avg_value_size_hint;
496507

497-
deserializePrefix(serialization, name_and_type, current_task_last_mark, deserialize_binary_bulk_state_map, cache, deserialize_states_cache, {});
508+
deserializePrefix(serialization, name_and_type, from_mark, current_task_last_mark, deserialize_binary_bulk_state_map, cache, deserialize_states_cache, {});
498509

499510
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
500511
{

src/Storages/MergeTree/MergeTreeReaderWide.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,18 @@ class MergeTreeReaderWide : public IMergeTreeReader
9090
void deserializePrefix(
9191
const SerializationPtr & serialization,
9292
const NameAndTypePair & name_and_type,
93+
size_t from_mark,
9394
size_t current_task_last_mark,
9495
DeserializeBinaryBulkStateMap & deserialize_state_map,
9596
ISerialization::SubstreamsCache & cache,
9697
ISerialization::SubstreamsDeserializeStatesCache & deserialize_states_cache,
9798
ISerialization::StreamCallback prefixes_prefetch_callback);
9899

99-
void deserializePrefixForAllColumns(size_t num_columns, size_t current_task_last_mark);
100-
void deserializePrefixForAllColumnsWithPrefetch(size_t num_columns, size_t current_task_last_mark, Priority priority);
100+
void deserializePrefixForAllColumns(size_t num_columns, size_t from_mark, size_t current_task_last_mark);
101+
void deserializePrefixForAllColumnsWithPrefetch(size_t num_columns, size_t from_mark, size_t current_task_last_mark, Priority priority);
101102

102103
using StreamCallbackGetter = std::function<ISerialization::StreamCallback(const NameAndTypePair &)>;
103-
void deserializePrefixForAllColumnsImpl(size_t num_columns, size_t current_task_last_mark, StreamCallbackGetter prefixes_prefetch_callback_getter);
104+
void deserializePrefixForAllColumnsImpl(size_t num_columns, size_t from_mark, size_t current_task_last_mark, StreamCallbackGetter prefixes_prefetch_callback_getter);
104105

105106
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
106107
std::unordered_map<String, ISerialization::SubstreamsDeserializeStatesCache> deserialize_states_caches;

0 commit comments

Comments
 (0)