@@ -157,6 +157,7 @@ class LogSource final : public ISource
157157 using DeserializeStates = std::map<String, DeserializeState>;
158158 DeserializeStates deserialize_states;
159159
160+ void readPrefix(const NameAndTypePair & name_and_type, ISerialization::SubstreamsCache & cache, ISerialization::SubstreamsDeserializeStatesCache & deserialize_state_cache);
160161 void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t max_rows_to_read, ISerialization::SubstreamsCache & cache);
161162 bool isFinished();
162163};
@@ -192,8 +193,17 @@ Chunk LogSource::generate()
192193 /// How many rows to read for the next block.
193194 size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
194195 std::unordered_map<String, ISerialization::SubstreamsCache> caches;
196+ std::unordered_map<String, ISerialization::SubstreamsDeserializeStatesCache> deserialize_states_caches;
195197 Block res;
196198
199+ /// First, read prefixes for all columns/subcolumns.
200+ for (const auto & name_and_type : columns)
201+ {
202+ auto name_and_type_on_disk = getColumnOnDisk(name_and_type);
203+ readPrefix(name_and_type_on_disk, caches[name_and_type_on_disk.getNameInStorage()], deserialize_states_caches[name_and_type_on_disk.getNameInStorage()]);
204+ }
205+
206+ /// Second, read the data of all columns/subcolumns.
197207 for (const auto & name_type : columns)
198208 {
199209 ColumnPtr column;
@@ -232,6 +242,35 @@ Chunk LogSource::generate()
232242 return Chunk(res.getColumns(), num_rows);
233243}
234244
245+ void LogSource::readPrefix(const NameAndTypePair & name_and_type, ISerialization::SubstreamsCache & cache, ISerialization::SubstreamsDeserializeStatesCache & deserialize_state_cache)
246+ {
247+ if (deserialize_states.contains(name_and_type.name))
248+ return;
249+
250+ auto serialization = IDataType::getSerialization(name_and_type);
251+
252+ ISerialization::DeserializeBinaryBulkSettings settings;
253+ settings.getter = [&](const ISerialization::SubstreamPath & path) -> ReadBuffer *
254+ {
255+ if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
256+ return nullptr;
257+
258+ String data_file_name = ISerialization::getFileNameForStream(name_and_type, path, {});
259+
260+ const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
261+ if (data_file_it == storage.data_files_by_names.end())
262+ throw Exception(ErrorCodes::LOGICAL_ERROR, "No information about file {} in StorageLog", data_file_name);
263+ const auto & data_file = *data_file_it->second;
264+
265+ size_t offset = 0;
266+ size_t file_size = file_sizes[data_file.index];
267+
268+ auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
269+ return &it->second.compressed.value();
270+ };
271+
272+ serialization->deserializeBinaryBulkStatePrefix(settings, deserialize_states[name_and_type.name], &deserialize_state_cache);
273+ }
235274
236275void LogSource::readData (const NameAndTypePair & name_and_type, ColumnPtr & column,
237276 size_t max_rows_to_read, ISerialization::SubstreamsCache & cache)
@@ -240,35 +279,25 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
240279 const auto & [name, type] = name_and_type;
241280 auto serialization = IDataType::getSerialization(name_and_type);
242281
243- auto create_stream_getter = [&](bool stream_for_prefix)
282+ settings.getter = [&](const ISerialization::SubstreamPath & path) -> ReadBuffer *
244283 {
245- return [&, stream_for_prefix](const ISerialization::SubstreamPath & path) -> ReadBuffer *
246- {
247- if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
248- return nullptr;
284+ if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
285+ return nullptr;
249286
250- String data_file_name = ISerialization::getFileNameForStream(name_and_type, path, {});
287+ String data_file_name = ISerialization::getFileNameForStream(name_and_type, path, {});
251288
252- const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
253- if (data_file_it == storage.data_files_by_names.end())
254- throw Exception(ErrorCodes::LOGICAL_ERROR, "No information about file {} in StorageLog", data_file_name);
255- const auto & data_file = *data_file_it->second;
289+ const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
290+ if (data_file_it == storage.data_files_by_names.end())
291+ throw Exception(ErrorCodes::LOGICAL_ERROR, "No information about file {} in StorageLog", data_file_name);
292+ const auto & data_file = *data_file_it->second;
256293
257- size_t offset = stream_for_prefix ? 0 : offsets[data_file.index];
258- size_t file_size = file_sizes[data_file.index];
294+ size_t offset = offsets[data_file.index];
295+ size_t file_size = file_sizes[data_file.index];
259296
260- auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
261- return &it->second.compressed.value();
262- };
297+ auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
298+ return &it->second.compressed.value();
263299 };
264300
265- if (!deserialize_states.contains(name))
266- {
267- settings.getter = create_stream_getter(true);
268- serialization->deserializeBinaryBulkStatePrefix(settings, deserialize_states[name], nullptr);
269- }
270-
271- settings.getter = create_stream_getter(false);
272301 serialization->deserializeBinaryBulkWithMultipleStreams(column, 0, max_rows_to_read, settings, deserialize_states[name], &cache);
273302 if (column->getDataType() != name_and_type.type->getColumnType())
274303 throw Exception(
0 commit comments