Skip to content

Commit 17cffca

Browse files
Backport #92076 to 25.12: Add additional checks for missing streams in Wide parts
1 parent e425ed1 commit 17cffca

File tree

5 files changed

+49
-1
lines changed

5 files changed

+49
-1
lines changed

src/DataTypes/Serializations/SerializationNullable.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace ErrorCodes
2222
{
2323
extern const int CANNOT_READ_ALL_DATA;
2424
extern const int INCORRECT_DATA;
25+
extern const int LOGICAL_ERROR;
2526
}
2627

2728
void SerializationNullable::enumerateStreams(
@@ -159,7 +160,7 @@ void SerializationNullable::deserializeBinaryBulkWithMultipleStreams(
159160
auto nested_column = col.getNestedColumnPtr();
160161
if (null_map->size() != nested_column->size())
161162
throw Exception(
162-
ErrorCodes::INCORRECT_DATA,
163+
settings.native_format ? ErrorCodes::INCORRECT_DATA : ErrorCodes::LOGICAL_ERROR,
163164
"Sizes of nested column and null map of Nullable column are not equal after deserialization (null map size = {}, nested "
164165
"column size = {})",
165166
null_map->size(),

src/DataTypes/Serializations/SerializationObject.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,20 @@ void SerializationObject::deserializeBinaryBulkWithMultipleStreams(
10481048
settings.path.pop_back();
10491049
settings.path.pop_back();
10501050

1051+
/// Verify that all typed paths, dynamic paths and shared data has consistent sizes
1052+
size_t expected_size = shared_data->size();
1053+
for (const auto & [path, path_column] : typed_paths)
1054+
{
1055+
if (path_column->size() != expected_size)
1056+
throw Exception(settings.native_format ? ErrorCodes::INCORRECT_DATA : ErrorCodes::LOGICAL_ERROR, "Unexpected size of typed path {}: {}. Expected size {}", path, path_column->size(), expected_size);
1057+
}
1058+
1059+
for (const auto & [path, path_column] : dynamic_paths)
1060+
{
1061+
if (path_column->size() != expected_size)
1062+
throw Exception(settings.native_format ? ErrorCodes::INCORRECT_DATA : ErrorCodes::LOGICAL_ERROR, "Unexpected size of dynamic path {}: {}. Expected size {}", path, path_column->size(), expected_size);
1063+
}
1064+
10511065
column_object.repairDuplicatesInDynamicPathsAndSharedData(shared_data_previous_size);
10521066
}
10531067

src/DataTypes/Serializations/SerializationTuple.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace ErrorCodes
1919
extern const int SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH;
2020
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
2121
extern const int INCORRECT_DATA;
22+
extern const int LOGICAL_ERROR;
2223
}
2324

2425

@@ -820,6 +821,14 @@ void SerializationTuple::deserializeBinaryBulkWithMultipleStreams(
820821
column_tuple.getColumnPtr(i), rows_offset, limit, settings, tuple_state->states[i], cache);
821822
}
822823

824+
/// Verify that all Tuple elements have the same size.
825+
size_t expected_size = column_tuple.getColumn(0).size();
826+
for (size_t i = 1; i < elems.size(); ++i)
827+
{
828+
if (column_tuple.getColumn(i).size() != expected_size)
829+
throw Exception(settings.native_format ? ErrorCodes::INCORRECT_DATA : ErrorCodes::LOGICAL_ERROR, "Unexpected size of tuple element {}: {}. Expected size: {}", i, column_tuple.getColumn(i).size(), expected_size);
830+
}
831+
823832
typeid_cast<ColumnTuple &>(*mutable_column).addSize(column_tuple.getColumn(0).size());
824833
}
825834

src/DataTypes/Serializations/SerializationVariant.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,10 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
607607
col.getVariantPtrByLocalDiscriminator(i), variant_rows_offsets[i], variant_limits[i],
608608
settings, variant_state->variant_states[i], cache);
609609
settings.path.pop_back();
610+
611+
/// Verify that we deserialized data of this variant.
612+
if (variant_limits[i] && col.getVariantPtrByLocalDiscriminator(i)->empty())
613+
throw Exception(settings.native_format ? ErrorCodes::INCORRECT_DATA : ErrorCodes::LOGICAL_ERROR, "Variant {} is empty, but expected to be read {} values", variant_names[i], variant_limits[i]);
610614
}
611615
settings.path.pop_back();
612616

src/Storages/MergeTree/MergeTreeReaderWide.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ namespace
2424
constexpr auto DATA_FILE_EXTENSION = ".bin";
2525
}
2626

27+
namespace ErrorCodes
28+
{
29+
extern const int LOGICAL_ERROR;
30+
}
31+
2732
MergeTreeReaderWide::MergeTreeReaderWide(
2833
MergeTreeDataPartInfoForReaderPtr data_part_info_,
2934
NamesAndTypesList columns_,
@@ -327,7 +332,22 @@ ReadBuffer * MergeTreeReaderWide::getStream(
327332

328333
auto stream_name = IMergeTreeDataPart::getStreamNameForColumn(name_and_type, substream_path, ".bin", checksums, storage_settings);
329334
if (!stream_name)
335+
{
336+
/// We allow missing streams only for columns/subcolumns that are not present in this part.
337+
auto column = data_part_info_for_read->getColumnsDescription().tryGetColumn(GetColumnsOptions::AllPhysical, name_and_type.getNameInStorage());
338+
if (column && (!name_and_type.isSubcolumn() || column->type->hasSubcolumn(name_and_type.getSubcolumnName())))
339+
{
340+
throw Exception(
341+
ErrorCodes::LOGICAL_ERROR,
342+
"Stream {} for column {} with type {} is not found",
343+
ISerialization::getFileNameForStream(
344+
name_and_type.type->getName(), substream_path, ISerialization::StreamFileNameSettings(*storage_settings)),
345+
name_and_type.name,
346+
column->type->getName());
347+
}
348+
330349
return nullptr;
350+
}
331351

332352
auto it = streams.find(*stream_name);
333353
if (it == streams.end())

0 commit comments

Comments
 (0)