Skip to content

Commit 1ef1c16

Browse files
Backport #85829 to 25.8: Use relevant column mapper when reading an iceberg table
1 parent 7528177 commit 1ef1c16

File tree

17 files changed

+288
-83
lines changed

17 files changed

+288
-83
lines changed

src/Databases/DataLake/Common.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
106106
return std::make_shared<DB::DataTypeTuple>(field_types, field_names);
107107
}
108108

109-
return nullable ? DB::makeNullable(DB::IcebergSchemaProcessor::getSimpleType(name)) : DB::IcebergSchemaProcessor::getSimpleType(name);
109+
return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name))
110+
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name);
110111
}
111112

112113
std::pair<std::string, std::string> parseTableName(const std::string & name)

src/Databases/DataLake/RestCatalog.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ bool RestCatalog::getTableMetadataImpl(
621621
if (result.requiresSchema())
622622
{
623623
// int format_version = metadata_object->getValue<int>("format-version");
624-
auto schema_processor = DB::IcebergSchemaProcessor();
624+
auto schema_processor = DB::Iceberg::IcebergSchemaProcessor();
625625
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
626626
auto schema = schema_processor.getClickhouseTableSchemaById(id);
627627
result.setSchema(*schema);

src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli
8585

8686
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), num_streams);
8787

88-
auto format_filter_info = std::make_shared<FormatFilterInfo>(filter_actions_dag, context, configuration->getColumnMapper());
88+
auto format_filter_info
89+
= std::make_shared<FormatFilterInfo>(filter_actions_dag, context, configuration->getColumnMapperForCurrentSchema());
8990
format_filter_info->prewhere_info = prewhere_info;
9091

9192
for (size_t i = 0; i < num_streams; ++i)

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,15 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
240240
current_metadata->modifyFormatSettings(settings_);
241241
}
242242

243-
ColumnMapperPtr getColumnMapper() const override
243+
ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr object_info) const override
244244
{
245-
return current_metadata->getColumnMapper();
245+
assertInitialized();
246+
return current_metadata->getColumnMapperForObject(object_info);
247+
}
248+
ColumnMapperPtr getColumnMapperForCurrentSchema() const override
249+
{
250+
assertInitialized();
251+
return current_metadata->getColumnMapperForCurrentSchema();
246252
}
247253

248254
SinkToStoragePtr write(

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ class IDataLakeMetadata : boost::noncopyable
8181

8282
/// Some data lakes specify information for reading files from disks.
8383
/// For example, Iceberg has Parquet schema field ids in its metadata for reading files.
84-
virtual ColumnMapperPtr getColumnMapper() const { return nullptr; }
84+
virtual ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr /**/) const { return nullptr; }
85+
virtual ColumnMapperPtr getColumnMapperForCurrentSchema() const { return nullptr; }
8586

8687
virtual SinkToStoragePtr write(
8788
SharedHeader /*sample_block*/,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 71 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@
66
#include <cstddef>
77
#include <memory>
88
#include <optional>
9-
#include <Formats/FormatFilterInfo.h>
10-
#include <Formats/FormatParserSharedResources.h>
11-
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
12-
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
13-
#include <Poco/JSON/Array.h>
14-
#include <Poco/JSON/Object.h>
15-
#include <Poco/JSON/Stringifier.h>
16-
#include <Common/Exception.h>
179
#include <Columns/ColumnSet.h>
1810
#include <DataTypes/DataTypeSet.h>
11+
#include <Formats/FormatFilterInfo.h>
12+
#include <Formats/FormatParserSharedResources.h>
1913
#include <Formats/ReadSchemaUtils.h>
2014
#include <Functions/FunctionFactory.h>
2115
#include <Functions/IFunctionAdaptors.h>
2216
#include <Functions/tuple.h>
2317
#include <Processors/Formats/ISchemaReader.h>
18+
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
2419
#include <Processors/Transforms/FilterTransform.h>
2520
#include <QueryPipeline/QueryPipelineBuilder.h>
21+
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
22+
#include <Poco/JSON/Array.h>
23+
#include <Poco/JSON/Object.h>
24+
#include <Poco/JSON/Stringifier.h>
25+
#include <Common/Exception.h>
2626

2727
#include <Databases/DataLake/Common.h>
2828
#include <Core/Settings.h>
@@ -43,24 +43,24 @@
4343
#include <Interpreters/ExpressionActions.h>
4444
#include <IO/CompressedReadBufferWrapper.h>
4545

46+
#include <Disks/ObjectStorages/IObjectStorage.h>
47+
#include <Interpreters/StorageID.h>
4648
#include <Storages/ColumnsDescription.h>
4749
#include <Storages/ObjectStorage/DataLakes/Iceberg/AvroForIcebergDeserializer.h>
50+
#include <Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h>
51+
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
4852
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
4953
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h>
5054
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
55+
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
5156
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
5257
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
58+
#include <Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h>
5359
#include <Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h>
5460
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
5561
#include <Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h>
5662
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
5763
#include <Storages/ObjectStorage/Utils.h>
58-
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
59-
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
60-
#include <Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h>
61-
#include <Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h>
62-
#include <Disks/ObjectStorages/IObjectStorage.h>
63-
#include <Interpreters/StorageID.h>
6464

6565
#include <Common/ProfileEvents.h>
6666
#include <Common/SharedLockGuard.h>
@@ -309,26 +309,6 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
309309
}
310310
}
311311

312-
#if USE_PARQUET
313-
if (configuration_ptr->format == "Parquet")
314-
column_mapper = std::make_shared<ColumnMapper>();
315-
316-
if (column_mapper)
317-
{
318-
Int32 schema_id = snapshot->getValue<Int32>(f_schema_id);
319-
std::unordered_map<String, Int64> column_name_to_parquet_field_id;
320-
for (UInt32 j = 0; j < schemas->size(); ++j)
321-
{
322-
auto schema = schemas->getObject(j);
323-
if (schema->getValue<Int32>(f_schema_id) != schema_id)
324-
continue;
325-
326-
column_name_to_parquet_field_id = IcebergSchemaProcessor::traverseSchema(schema->getArray(Iceberg::f_fields));
327-
}
328-
column_mapper->setStorageColumnEncoding(std::move(column_name_to_parquet_field_id));
329-
}
330-
#endif
331-
332312
relevant_snapshot = std::make_shared<IcebergDataSnapshot>(
333313
getManifestList(
334314
object_storage,
@@ -448,7 +428,8 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
448428
if (!iceberg_object_info)
449429
return nullptr;
450430
/// if we need schema evolution or have equality deletes files, we need to read all the columns.
451-
return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id) || (!iceberg_object_info->equality_deletes_objects.empty())
431+
return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id)
432+
|| (!iceberg_object_info->equality_deletes_objects.empty())
452433
? persistent_components.schema_processor->getClickhouseTableSchemaById(iceberg_object_info->underlying_format_read_schema_id)
453434
: nullptr;
454435
}
@@ -497,9 +478,9 @@ void IcebergMetadata::checkAlterIsPossible(const AlterCommands & commands)
497478
{
498479
for (const auto & command : commands)
499480
{
500-
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN)
501-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by Iceberg storage",
502-
command.type);
481+
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN
482+
&& command.type != AlterCommand::Type::MODIFY_COLUMN)
483+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by Iceberg storage", command.type);
503484
}
504485
}
505486

@@ -826,18 +807,21 @@ std::tuple<Int64, Int32> IcebergMetadata::getVersion() const
826807
return std::make_tuple(relevant_snapshot_id, relevant_snapshot_schema_id);
827808
}
828809

829-
void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional<FormatSettings> & format_settings, ContextPtr local_context) const
810+
void IcebergMetadata::addDeleteTransformers(
811+
ObjectInfoPtr object_info,
812+
QueryPipelineBuilder & builder,
813+
const std::optional<FormatSettings> & format_settings,
814+
ContextPtr local_context) const
830815
{
831816
auto iceberg_object_info = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object_info);
832817
if (!iceberg_object_info)
833818
return;
834819

835820
if (!iceberg_object_info->position_deletes_objects.empty())
836821
{
837-
builder.addSimpleTransform([&](const SharedHeader & header)
838-
{
839-
return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context);
840-
});
822+
builder.addSimpleTransform(
823+
[&](const SharedHeader & header)
824+
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context); });
841825
}
842826
const auto & delete_files = iceberg_object_info->equality_deletes_objects;
843827
LOG_DEBUG(log, "Constructing filter transform for equality delete, there are {} delete files", delete_files.size());
@@ -865,15 +849,26 @@ void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipe
865849
std::vector<size_t> equality_indexes_delete_file;
866850
for (Int32 col_id : equality_ids)
867851
{
868-
NameAndTypePair name_and_type = persistent_components.schema_processor->getFieldCharacteristics(delete_file.schema_id, col_id);
852+
NameAndTypePair name_and_type
853+
= persistent_components.schema_processor->getFieldCharacteristics(delete_file.schema_id, col_id);
869854
block_for_set.insert(ColumnWithTypeAndName(name_and_type.type, name_and_type.name));
870855
equality_indexes_delete_file.push_back(delete_file_header.getPositionByName(name_and_type.name));
871856
}
872857
/// Then we read the content of the delete file.
873858
auto mutable_columns_for_set = block_for_set.cloneEmptyColumns();
874859
std::unique_ptr<ReadBuffer> data_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log);
875860
CompressionMethod compression_method = chooseCompressionMethod(delete_file.file_path, "auto");
876-
auto delete_format = FormatFactory::instance().getInput(delete_file.file_format, *data_read_buffer, delete_file_header, local_context, local_context->getSettingsRef()[DB::Setting::max_block_size], format_settings, nullptr, nullptr, true, compression_method);
861+
auto delete_format = FormatFactory::instance().getInput(
862+
delete_file.file_format,
863+
*data_read_buffer,
864+
delete_file_header,
865+
local_context,
866+
local_context->getSettingsRef()[DB::Setting::max_block_size],
867+
format_settings,
868+
nullptr,
869+
nullptr,
870+
true,
871+
compression_method);
877872
/// only get the delete columns and construct a set by 'block_for_set'
878873
while (true)
879874
{
@@ -890,8 +885,10 @@ void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipe
890885
block_for_set.setColumns(std::move(mutable_columns_for_set));
891886
/// we are constructing a 'not in' expression
892887
const auto & settings = local_context->getSettingsRef();
893-
SizeLimits size_limits_for_set = {settings[Setting::max_rows_in_set], settings[Setting::max_bytes_in_set], settings[Setting::set_overflow_mode]};
894-
FutureSetPtr future_set = std::make_shared<FutureSetFromTuple>(CityHash_v1_0_2::uint128(), nullptr, block_for_set.getColumnsWithTypeAndName(), true, size_limits_for_set);
888+
SizeLimits size_limits_for_set
889+
= {settings[Setting::max_rows_in_set], settings[Setting::max_bytes_in_set], settings[Setting::set_overflow_mode]};
890+
FutureSetPtr future_set = std::make_shared<FutureSetFromTuple>(
891+
CityHash_v1_0_2::uint128(), nullptr, block_for_set.getColumnsWithTypeAndName(), true, size_limits_for_set);
895892
ColumnPtr set_col = ColumnSet::create(1, future_set);
896893
ActionsDAG dag(header->getColumnsWithTypeAndName());
897894
/// Construct right argument of 'not in' expression, it is the column set.
@@ -904,27 +901,23 @@ void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipe
904901
/// select columns to use in 'notIn' function
905902
for (Int32 col_id : equality_ids)
906903
{
907-
NameAndTypePair name_and_type = persistent_components.schema_processor->getFieldCharacteristics(iceberg_object_info->underlying_format_read_schema_id, col_id);
904+
NameAndTypePair name_and_type = persistent_components.schema_processor->getFieldCharacteristics(
905+
iceberg_object_info->underlying_format_read_schema_id, col_id);
908906
auto it = outputs.find(name_and_type.name);
909907
if (it == outputs.end())
910908
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find column {} in dag outputs", name_and_type.name);
911909
left_columns.push_back(it->second);
912910
}
913-
FunctionOverloadResolverPtr func_tuple_builder =
914-
std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionTuple>());
915-
const ActionsDAG::Node * in_lhs_arg = left_columns.size() == 1 ?
916-
left_columns.front() :
917-
&dag.addFunction(func_tuple_builder, std::move(left_columns), {});
911+
FunctionOverloadResolverPtr func_tuple_builder
912+
= std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionTuple>());
913+
const ActionsDAG::Node * in_lhs_arg
914+
= left_columns.size() == 1 ? left_columns.front() : &dag.addFunction(func_tuple_builder, std::move(left_columns), {});
918915
/// we got the NOT IN function
919916
auto func_not_in = FunctionFactory::instance().get("notIn", nullptr);
920917
const auto & not_in_node = dag.addFunction(func_not_in, {in_lhs_arg, in_rhs_arg}, "notInResult");
921918
dag.getOutputs().push_back(&not_in_node);
922919
LOG_DEBUG(log, "Use expression {} in equality deletes", dag.dumpDAG());
923-
return std::make_shared<FilterTransform>(
924-
header,
925-
std::make_shared<ExpressionActions>(std::move(dag)),
926-
"notInResult",
927-
true);
920+
return std::make_shared<FilterTransform>(header, std::make_shared<ExpressionActions>(std::move(dag)), "notInResult", true);
928921
};
929922
builder.addSimpleTransform(simple_transform_adder);
930923
}
@@ -953,6 +946,26 @@ SinkToStoragePtr IcebergMetadata::write(
953946
}
954947
}
955948

949+
ColumnMapperPtr IcebergMetadata::getColumnMapperForObject(ObjectInfoPtr object_info) const
950+
{
951+
IcebergDataObjectInfo * iceberg_object_info = dynamic_cast<IcebergDataObjectInfo *>(object_info.get());
952+
if (!iceberg_object_info)
953+
return nullptr;
954+
auto configuration_ptr = configuration.lock();
955+
if (Poco::toLower(configuration_ptr->format) != "parquet")
956+
return nullptr;
957+
958+
return persistent_components.schema_processor->getColumnMapperById(iceberg_object_info->underlying_format_read_schema_id);
959+
}
960+
961+
ColumnMapperPtr IcebergMetadata::getColumnMapperForCurrentSchema() const
962+
{
963+
auto configuration_ptr = configuration.lock();
964+
if (Poco::toLower(configuration_ptr->format) != "parquet")
965+
return nullptr;
966+
SharedLockGuard lock(mutex);
967+
return persistent_components.schema_processor->getColumnMapperById(relevant_snapshot_schema_id);
968+
}
956969
}
957970

958971
#endif

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ class IcebergMetadata : public IDataLakeMetadata
7979

8080
bool supportsSchemaEvolution() const override { return true; }
8181

82-
static Int32 parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
82+
static Int32 parseTableSchema(
83+
const Poco::JSON::Object::Ptr & metadata_object, Iceberg::IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
8384

8485
bool supportsUpdate() const override { return true; }
8586
bool supportsWrites() const override { return true; }
@@ -91,8 +92,9 @@ class IcebergMetadata : public IDataLakeMetadata
9192
std::optional<size_t> totalRows(ContextPtr Local_context) const override;
9293
std::optional<size_t> totalBytes(ContextPtr Local_context) const override;
9394

94-
ColumnMapperPtr getColumnMapper() const override { return column_mapper; }
95+
ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr object_info) const override;
9596

97+
ColumnMapperPtr getColumnMapperForCurrentSchema() const override;
9698
SinkToStoragePtr write(
9799
SharedHeader sample_block,
98100
const StorageID & table_id,
@@ -143,9 +145,6 @@ class IcebergMetadata : public IDataLakeMetadata
143145
Int64 relevant_snapshot_id TSA_GUARDED_BY(mutex) {-1};
144146
CompressionMethod metadata_compression_method;
145147

146-
147-
ColumnMapperPtr column_mapper;
148-
149148
void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex);
150149
void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex);
151150
void addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object) const TSA_REQUIRES(mutex);

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class ManifestFileContent : public boost::noncopyable
111111
const String & manifest_file_name,
112112
Int32 format_version_,
113113
const String & common_path,
114-
const DB::IcebergSchemaProcessor & schema_processor,
114+
const IcebergSchemaProcessor & schema_processor,
115115
Int64 inherited_sequence_number,
116116
Int64 inherited_snapshot_id,
117117
const std::string & table_location,

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ std::unique_ptr<DB::ActionsDAG> ManifestFilesPruner::transformFilterDagForManife
9696

9797

9898
ManifestFilesPruner::ManifestFilesPruner(
99-
const DB::IcebergSchemaProcessor & schema_processor_,
99+
const IcebergSchemaProcessor & schema_processor_,
100100
Int32 current_schema_id_,
101101
Int32 initial_schema_id_,
102102
const DB::ActionsDAG * filter_dag,

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String &
3030
class ManifestFilesPruner
3131
{
3232
private:
33-
const DB::IcebergSchemaProcessor & schema_processor;
33+
const IcebergSchemaProcessor & schema_processor;
3434
Int32 current_schema_id;
3535
Int32 initial_schema_id;
3636
const DB::KeyDescription * partition_key;
@@ -44,7 +44,7 @@ class ManifestFilesPruner
4444

4545
public:
4646
ManifestFilesPruner(
47-
const DB::IcebergSchemaProcessor & schema_processor_,
47+
const IcebergSchemaProcessor & schema_processor_,
4848
Int32 current_schema_id_,
4949
Int32 initial_schema_id_,
5050
const DB::ActionsDAG * filter_dag,

0 commit comments

Comments
 (0)