@@ -110,7 +110,7 @@ void MergeTreeReaderWide::prefetchForAllColumns(
110110 return ;
111111
112112 if (deserialize_prefixes)
113- deserializePrefixForAllColumnsWithPrefetch (num_columns, current_task_last_mark, priority);
113+ deserializePrefixForAllColumnsWithPrefetch (num_columns, from_mark, current_task_last_mark, priority);
114114
115115 /// Request reading of data in advance,
116116 /// so if reading can be asynchronous, it will also be performed in parallel for all columns.
@@ -151,7 +151,7 @@ size_t MergeTreeReaderWide::readRows(
151151 return max_rows_to_read;
152152
153153 prefetchForAllColumns (Priority{}, num_columns, from_mark, current_task_last_mark, continue_reading, /* deserialize_prefixes=*/ true );
154- deserializePrefixForAllColumns (num_columns, current_task_last_mark);
154+ deserializePrefixForAllColumns (num_columns, from_mark, current_task_last_mark);
155155
156156 for (size_t pos = 0 ; pos < num_columns; ++pos)
157157 {
@@ -337,6 +337,7 @@ ReadBuffer * MergeTreeReaderWide::getStream(
337337void MergeTreeReaderWide::deserializePrefix (
338338 const SerializationPtr & serialization,
339339 const NameAndTypePair & name_and_type,
340+ size_t from_mark,
340341 size_t current_task_last_mark,
341342 DeserializeBinaryBulkStateMap & deserialize_state_map,
342343 ISerialization::SubstreamsCache & cache,
@@ -352,6 +353,15 @@ void MergeTreeReaderWide::deserializePrefix(
352353 deserialize_settings.prefixes_deserialization_thread_pool = settings.use_prefixes_deserialization_thread_pool ? &getMergeTreePrefixesDeserializationThreadPool ().get () : nullptr ;
353354 deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
354355 {
356+ auto stream_name = IMergeTreeDataPart::getStreamNameForColumn (name_and_type, substream_path, data_part_info_for_read->getChecksums ());
357+ /// This stream could have been prefetched in prefetchBeginOfRange, but here we
358+ /// have to seek the stream to the start of the file to deserialize the prefix.
359+ /// If we do not read from the first mark, we should remove this stream from
360+ /// prefetched_streams so that it is prefetched again starting from the current mark
361+ /// after the prefix is deserialized.
362+ if (stream_name && from_mark != 0 )
363+ prefetched_streams.erase (*stream_name);
364+
355365 return getStream (/* seek_to_start = */ true , substream_path, data_part_info_for_read->getChecksums (), name_and_type, 0 , /* seek_to_mark = */ false , current_task_last_mark, cache);
356366 };
357367 /// Add streams for newly discovered dynamic subcolumns to start async marks loading beforehand if needed.
@@ -369,7 +379,7 @@ void MergeTreeReaderWide::deserializePrefix(
369379 }
370380}
371381
372- void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl (size_t num_columns, size_t current_task_last_mark, StreamCallbackGetter prefixes_prefetch_callback_getter)
382+ void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl (size_t num_columns, size_t from_mark, size_t current_task_last_mark, StreamCallbackGetter prefixes_prefetch_callback_getter)
373383{
374384 /// Check if we already deserialized prefixes.
375385 if (!deserialize_binary_bulk_state_map.empty ())
@@ -387,6 +397,7 @@ void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl(size_t num_columns,
387397 deserializePrefix (
388398 serializations[pos],
389399 columns_to_read[pos],
400+ from_mark,
390401 current_task_last_mark,
391402 deserialize_state_map,
392403 cache,
@@ -411,12 +422,12 @@ void MergeTreeReaderWide::deserializePrefixForAllColumnsImpl(size_t num_columns,
411422 deserialize_binary_bulk_state_map = deserialize ();
412423}
413424
/// Deserializes prefixes for all columns without any prefetch callbacks:
/// forwards its arguments to deserializePrefixForAllColumnsImpl and passes an
/// empty (`{}`) prefixes_prefetch_callback_getter.
414- void MergeTreeReaderWide::deserializePrefixForAllColumns (size_t num_columns, size_t current_task_last_mark)
425+ void MergeTreeReaderWide::deserializePrefixForAllColumns (size_t num_columns, size_t from_mark, size_t current_task_last_mark)
415426{
416- deserializePrefixForAllColumnsImpl (num_columns, current_task_last_mark, {});
427+ deserializePrefixForAllColumnsImpl (num_columns, from_mark, current_task_last_mark, {});
417428}
418429
419- void MergeTreeReaderWide::deserializePrefixForAllColumnsWithPrefetch (size_t num_columns, size_t current_task_last_mark, Priority priority)
430+ void MergeTreeReaderWide::deserializePrefixForAllColumnsWithPrefetch (size_t num_columns, size_t from_mark, size_t current_task_last_mark, Priority priority)
420431{
421432 auto prefixes_prefetch_callback_getter = [&](const NameAndTypePair & name_and_type)
422433 {
@@ -434,7 +445,7 @@ void MergeTreeReaderWide::deserializePrefixForAllColumnsWithPrefetch(size_t num_
434445 };
435446 };
436447
437- deserializePrefixForAllColumnsImpl (num_columns, current_task_last_mark, prefixes_prefetch_callback_getter);
448+ deserializePrefixForAllColumnsImpl (num_columns, from_mark, current_task_last_mark, prefixes_prefetch_callback_getter);
438449}
439450
440451void MergeTreeReaderWide::prefetchForColumn (
@@ -494,7 +505,7 @@ void MergeTreeReaderWide::readData(
494505 ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
495506 deserialize_settings.avg_value_size_hint = avg_value_size_hint;
496507
497- deserializePrefix (serialization, name_and_type, current_task_last_mark, deserialize_binary_bulk_state_map, cache, deserialize_states_cache, {});
508+ deserializePrefix (serialization, name_and_type, from_mark, current_task_last_mark, deserialize_binary_bulk_state_map, cache, deserialize_states_cache, {});
498509
499510 deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
500511 {
0 commit comments