Skip to content

Commit 63ae661

Browse files
committed
Resolve issues
1 parent 8e8c9b1 commit 63ae661

File tree

6 files changed

+65
-38
lines changed

6 files changed

+65
-38
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,24 @@ IcebergSnapshot IcebergMetadata::getSnapshot(const String & manifest_list_file)
399399
return IcebergSnapshot{manifest_lists_by_name.emplace(manifest_list_file, initializeManifestList(manifest_list_file)).first};
400400
}
401401

402+
std::vector<Int32>
403+
getRelevantPartitionColumnIds(const ManifestFileEntry & entry, const IcebergSchemaProcessor & schema_processor, Int32 current_schema_id)
404+
{
405+
std::vector<Int32> partition_column_ids;
406+
partition_column_ids.reserve(entry.getContent().getPartitionColumnInfos().size());
407+
for (const auto & partition_column_info : entry.getContent().getPartitionColumnInfos())
408+
{
409+
std::optional<NameAndTypePair> name_and_type
410+
= schema_processor.tryGetFieldCharacteristics(current_schema_id, partition_column_info.source_id);
411+
if (name_and_type)
412+
{
413+
partition_column_ids.push_back(partition_column_info.source_id);
414+
}
415+
}
416+
return partition_column_ids;
417+
}
418+
419+
402420
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
403421
{
404422
if (!current_snapshot)
@@ -410,43 +428,27 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
410428
Strings data_files;
411429
for (const auto & manifest_entry : current_snapshot->getManifestList().getManifestFiles())
412430
{
413-
NamesAndTypesList partition_pruning_names_and_types;
414-
std::vector<size_t> partition_pruning_indices;
415-
for (size_t i = 0; i < manifest_entry.getContent().getPartitionColumnInfos().size(); ++i)
416-
{
417-
// Since some columns may be renamed or deleted, we need to determine the correct column names and types for partition pruning based on the current schema.
418-
std::optional<NameAndTypePair> name_and_type = schema_processor.tryGetFieldCharacteristics(
419-
current_schema_id, manifest_entry.getContent().getPartitionColumnInfos()[i].source_id);
420-
if (name_and_type)
421-
{
422-
partition_pruning_names_and_types.push_back(name_and_type.value());
423-
partition_pruning_indices.push_back(i);
424-
}
425-
}
426-
ExpressionActionsPtr partition_minmax_idx_expr
427-
= std::make_shared<ExpressionActions>(ActionsDAG(partition_pruning_names_and_types), ExpressionActionsSettings(getContext()));
431+
const auto & partition_columns_ids = getRelevantPartitionColumnIds(manifest_entry, schema_processor, current_schema_id);
432+
const auto & partition_pruning_columns_names_and_types
433+
= schema_processor.tryGetFieldsCharacteristics(current_schema_id, partition_columns_ids);
434+
435+
ExpressionActionsPtr partition_minmax_idx_expr = std::make_shared<ExpressionActions>(
436+
ActionsDAG(partition_pruning_columns_names_and_types), ExpressionActionsSettings(getContext()));
428437
const KeyCondition partition_key_condition(
429-
filter_dag, getContext(), partition_pruning_names_and_types.getNames(), partition_minmax_idx_expr);
438+
filter_dag, getContext(), partition_pruning_columns_names_and_types.getNames(), partition_minmax_idx_expr);
430439

431440
const auto & data_files_in_manifest = manifest_entry.getContent().getDataFiles();
432441
for (const auto & data_file : data_files_in_manifest)
433442
{
434443
if (data_file.status != ManifestEntryStatus::DELETED)
435444
{
436-
std::vector<Range> ranges;
437-
ranges.reserve(partition_pruning_indices.size());
438-
for (const auto j : partition_pruning_indices)
439-
{
440-
ranges.push_back(data_file.partition_ranges[j]);
441-
}
442-
if (partition_key_condition.checkInHyperrectangle(ranges, partition_pruning_names_and_types.getTypes()).can_be_true)
443-
{
445+
if (partition_key_condition
446+
.checkInHyperrectangle(
447+
data_file.getPartitionRanges(partition_columns_ids), partition_pruning_columns_names_and_types.getTypes())
448+
.can_be_true)
444449
data_files.push_back(data_file.data_file_name);
445-
}
446450
else
447-
{
448451
ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunnedFiles);
449-
}
450452
}
451453
}
452454
}

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ Int32 ManifestFileContent::getSchemaId() const
3333
return impl->schema_id;
3434
}
3535

36+
std::vector<DB::Range> DataFileEntry::getPartitionRanges(const std::vector<Int32> & partition_columns_ids) const
37+
{
38+
std::vector<DB::Range> filtered_partition_ranges;
39+
filtered_partition_ranges.reserve(partition_columns_ids.size());
40+
for (const auto & partition_column_id : partition_columns_ids)
41+
{
42+
filtered_partition_ranges.push_back(partition_ranges.at(partition_column_id));
43+
}
44+
return filtered_partition_ranges;
45+
}
46+
3647

3748
const std::vector<PartitionColumnInfo> & ManifestFileContent::getPartitionColumnInfos() const
3849
{
@@ -206,15 +217,17 @@ ManifestFileContentImpl::ManifestFileContentImpl(
206217
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", common_path, data_path);
207218

208219
const auto file_path = data_path.substr(pos);
209-
std::vector<DB::Range> partition_ranges;
210-
partition_ranges.reserve(partition_columns.size());
220+
std::unordered_map<Int32, Range> partition_ranges;
211221
for (size_t j = 0; j < partition_columns.size(); ++j)
212222
{
213-
partition_ranges.push_back(getPartitionRange(
214-
partition_column_infos[j].transform,
215-
i,
216-
partition_columns[j],
217-
schema_processor.getFieldCharacteristics(schema_id, partition_column_infos[j].source_id).type));
223+
const Int32 source_id = partition_column_infos[j].source_id;
224+
partition_ranges.emplace(
225+
source_id,
226+
getPartitionRange(
227+
partition_column_infos[j].transform,
228+
i,
229+
partition_columns[j],
230+
schema_processor.getFieldCharacteristics(schema_id, source_id).type));
218231
}
219232
this->data_files.push_back({file_path, status, content_type, partition_ranges});
220233
}

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ struct DataFileEntry
3333
String data_file_name;
3434
ManifestEntryStatus status;
3535
DataFileContent content;
36-
std::vector<DB::Range> partition_ranges;
36+
std::unordered_map<Int32, DB::Range> partition_ranges;
37+
38+
std::vector<DB::Range> getPartitionRanges(const std::vector<Int32> & partition_columns_ids) const;
3739
};
3840

3941
struct PartitionColumnInfo

src/Storages/ObjectStorage/DataLakes/Iceberg/PartitionPruning.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,7 @@ Range getPartitionRange(
104104
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition column type: {}", partition_column->getFamilyName());
105105
}
106106

107-
auto nested_data_type = column_data_type;
108-
if (nested_data_type->isNullable())
109-
nested_data_type = removeNullable(nested_data_type);
107+
auto nested_data_type = removeNullable(column_data_type);
110108

111109
const auto * casted_innner_column = assert_cast<const ColumnInt32 *>(partition_column.get());
112110
Int32 value = casted_innner_column->getElement(index);

src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,17 @@ std::optional<NameAndTypePair> IcebergSchemaProcessor::tryGetFieldCharacteristic
139139
return it->second;
140140
}
141141

142+
NamesAndTypesList IcebergSchemaProcessor::tryGetFieldsCharacteristics(Int32 schema_id, const std::vector<Int32> & source_ids) const
143+
{
144+
NamesAndTypesList fields;
145+
for (const auto & source_id : source_ids)
146+
{
147+
auto it = clickhouse_types_by_source_ids.find({schema_id, source_id});
148+
if (it != clickhouse_types_by_source_ids.end())
149+
fields.push_back(it->second);
150+
}
151+
return fields;
152+
}
142153

143154
DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name)
144155
{

src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class IcebergSchemaProcessor
8181
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
8282
NameAndTypePair getFieldCharacteristics(Int32 schema_version, Int32 source_id) const;
8383
std::optional<NameAndTypePair> tryGetFieldCharacteristics(Int32 schema_version, Int32 source_id) const;
84+
NamesAndTypesList tryGetFieldsCharacteristics(Int32 schema_id, const std::vector<Int32> & source_ids) const;
8485

8586
private:
8687
std::unordered_map<Int32, Poco::JSON::Object::Ptr> iceberg_table_schemas_by_ids;

0 commit comments

Comments
 (0)