66#include < cstddef>
77#include < memory>
88#include < optional>
9- #include < Formats/FormatFilterInfo.h>
10- #include < Formats/FormatParserSharedResources.h>
11- #include < Processors/Formats/Impl/ParquetBlockInputFormat.h>
12- #include < Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
13- #include < Poco/JSON/Array.h>
14- #include < Poco/JSON/Object.h>
15- #include < Poco/JSON/Stringifier.h>
16- #include < Common/Exception.h>
179#include < Columns/ColumnSet.h>
1810#include < DataTypes/DataTypeSet.h>
11+ #include < Formats/FormatFilterInfo.h>
12+ #include < Formats/FormatParserSharedResources.h>
1913#include < Formats/ReadSchemaUtils.h>
2014#include < Functions/FunctionFactory.h>
2115#include < Functions/IFunctionAdaptors.h>
2216#include < Functions/tuple.h>
2317#include < Processors/Formats/ISchemaReader.h>
18+ #include < Processors/Formats/Impl/ParquetBlockInputFormat.h>
2419#include < Processors/Transforms/FilterTransform.h>
2520#include < QueryPipeline/QueryPipelineBuilder.h>
21+ #include < Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
22+ #include < Poco/JSON/Array.h>
23+ #include < Poco/JSON/Object.h>
24+ #include < Poco/JSON/Stringifier.h>
25+ #include < Common/Exception.h>
2626
2727#include < Databases/DataLake/Common.h>
2828#include < Core/Settings.h>
4343#include < Interpreters/ExpressionActions.h>
4444#include < IO/CompressedReadBufferWrapper.h>
4545
46+ #include < Disks/ObjectStorages/IObjectStorage.h>
47+ #include < Interpreters/StorageID.h>
4648#include < Storages/ColumnsDescription.h>
4749#include < Storages/ObjectStorage/DataLakes/Iceberg/AvroForIcebergDeserializer.h>
50+ #include < Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h>
51+ #include < Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
4852#include < Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
4953#include < Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h>
5054#include < Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
55+ #include < Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
5156#include < Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
5257#include < Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
58+ #include < Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h>
5359#include < Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h>
5460#include < Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
5561#include < Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h>
5662#include < Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
5763#include < Storages/ObjectStorage/Utils.h>
58- #include < Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
59- #include < Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
60- #include < Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h>
61- #include < Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h>
62- #include < Disks/ObjectStorages/IObjectStorage.h>
63- #include < Interpreters/StorageID.h>
6464
6565#include < Common/ProfileEvents.h>
6666#include < Common/SharedLockGuard.h>
@@ -309,26 +309,6 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
309309 }
310310 }
311311
312- #if USE_PARQUET
313- if (configuration_ptr->format == " Parquet" )
314- column_mapper = std::make_shared<ColumnMapper>();
315-
316- if (column_mapper)
317- {
318- Int32 schema_id = snapshot->getValue <Int32>(f_schema_id);
319- std::unordered_map<String, Int64> column_name_to_parquet_field_id;
320- for (UInt32 j = 0 ; j < schemas->size (); ++j)
321- {
322- auto schema = schemas->getObject (j);
323- if (schema->getValue <Int32>(f_schema_id) != schema_id)
324- continue ;
325-
326- column_name_to_parquet_field_id = IcebergSchemaProcessor::traverseSchema (schema->getArray (Iceberg::f_fields));
327- }
328- column_mapper->setStorageColumnEncoding (std::move (column_name_to_parquet_field_id));
329- }
330- #endif
331-
332312 relevant_snapshot = std::make_shared<IcebergDataSnapshot>(
333313 getManifestList (
334314 object_storage,
@@ -448,7 +428,8 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
448428 if (!iceberg_object_info)
449429 return nullptr ;
450430 // / if we need schema evolution or have equality deletes files, we need to read all the columns.
451- return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id) || (!iceberg_object_info->equality_deletes_objects .empty ())
431+ return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id)
432+ || (!iceberg_object_info->equality_deletes_objects .empty ())
452433 ? persistent_components.schema_processor ->getClickhouseTableSchemaById (iceberg_object_info->underlying_format_read_schema_id )
453434 : nullptr ;
454435}
@@ -497,9 +478,9 @@ void IcebergMetadata::checkAlterIsPossible(const AlterCommands & commands)
497478{
498479 for (const auto & command : commands)
499480 {
500- if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN && command. type != AlterCommand::Type::MODIFY_COLUMN)
501- throw Exception (ErrorCodes::NOT_IMPLEMENTED, " Alter of type '{}' is not supported by Iceberg storage " ,
502- command.type );
481+ if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN
482+ && command. type != AlterCommand::Type::MODIFY_COLUMN)
483+ throw Exception (ErrorCodes::NOT_IMPLEMENTED, " Alter of type '{}' is not supported by Iceberg storage " , command.type );
503484 }
504485}
505486
@@ -826,18 +807,21 @@ std::tuple<Int64, Int32> IcebergMetadata::getVersion() const
826807 return std::make_tuple (relevant_snapshot_id, relevant_snapshot_schema_id);
827808}
828809
829- void IcebergMetadata::addDeleteTransformers (ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional<FormatSettings> & format_settings, ContextPtr local_context) const
810+ void IcebergMetadata::addDeleteTransformers (
811+ ObjectInfoPtr object_info,
812+ QueryPipelineBuilder & builder,
813+ const std::optional<FormatSettings> & format_settings,
814+ ContextPtr local_context) const
830815{
831816 auto iceberg_object_info = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object_info);
832817 if (!iceberg_object_info)
833818 return ;
834819
835820 if (!iceberg_object_info->position_deletes_objects .empty ())
836821 {
837- builder.addSimpleTransform ([&](const SharedHeader & header)
838- {
839- return iceberg_object_info->getPositionDeleteTransformer (object_storage, header, format_settings, local_context);
840- });
822+ builder.addSimpleTransform (
823+ [&](const SharedHeader & header)
824+ { return iceberg_object_info->getPositionDeleteTransformer (object_storage, header, format_settings, local_context); });
841825 }
842826 const auto & delete_files = iceberg_object_info->equality_deletes_objects ;
843827 LOG_DEBUG (log, " Constructing filter transform for equality delete, there are {} delete files" , delete_files.size ());
@@ -865,15 +849,26 @@ void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipe
865849 std::vector<size_t > equality_indexes_delete_file;
866850 for (Int32 col_id : equality_ids)
867851 {
868- NameAndTypePair name_and_type = persistent_components.schema_processor ->getFieldCharacteristics (delete_file.schema_id , col_id);
852+ NameAndTypePair name_and_type
853+ = persistent_components.schema_processor ->getFieldCharacteristics (delete_file.schema_id , col_id);
869854 block_for_set.insert (ColumnWithTypeAndName (name_and_type.type , name_and_type.name ));
870855 equality_indexes_delete_file.push_back (delete_file_header.getPositionByName (name_and_type.name ));
871856 }
872857 // / Then we read the content of the delete file.
873858 auto mutable_columns_for_set = block_for_set.cloneEmptyColumns ();
874859 std::unique_ptr<ReadBuffer> data_read_buffer = createReadBuffer (delete_file_object, object_storage, local_context, log);
875860 CompressionMethod compression_method = chooseCompressionMethod (delete_file.file_path , " auto" );
876- auto delete_format = FormatFactory::instance ().getInput (delete_file.file_format , *data_read_buffer, delete_file_header, local_context, local_context->getSettingsRef ()[DB::Setting::max_block_size], format_settings, nullptr , nullptr , true , compression_method);
861+ auto delete_format = FormatFactory::instance ().getInput (
862+ delete_file.file_format ,
863+ *data_read_buffer,
864+ delete_file_header,
865+ local_context,
866+ local_context->getSettingsRef ()[DB::Setting::max_block_size],
867+ format_settings,
868+ nullptr ,
869+ nullptr ,
870+ true ,
871+ compression_method);
877872 // / only get the delete columns and construct a set by 'block_for_set'
878873 while (true )
879874 {
@@ -890,8 +885,10 @@ void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipe
890885 block_for_set.setColumns (std::move (mutable_columns_for_set));
891886 // / we are constructing a 'not in' expression
892887 const auto & settings = local_context->getSettingsRef ();
893- SizeLimits size_limits_for_set = {settings[Setting::max_rows_in_set], settings[Setting::max_bytes_in_set], settings[Setting::set_overflow_mode]};
894- FutureSetPtr future_set = std::make_shared<FutureSetFromTuple>(CityHash_v1_0_2::uint128 (), nullptr , block_for_set.getColumnsWithTypeAndName (), true , size_limits_for_set);
888+ SizeLimits size_limits_for_set
889+ = {settings[Setting::max_rows_in_set], settings[Setting::max_bytes_in_set], settings[Setting::set_overflow_mode]};
890+ FutureSetPtr future_set = std::make_shared<FutureSetFromTuple>(
891+ CityHash_v1_0_2::uint128 (), nullptr , block_for_set.getColumnsWithTypeAndName (), true , size_limits_for_set);
895892 ColumnPtr set_col = ColumnSet::create (1 , future_set);
896893 ActionsDAG dag (header->getColumnsWithTypeAndName ());
897894 // / Construct right argument of 'not in' expression, it is the column set.
@@ -904,27 +901,23 @@ void IcebergMetadata::addDeleteTransformers(ObjectInfoPtr object_info, QueryPipe
904901 // / select columns to use in 'notIn' function
905902 for (Int32 col_id : equality_ids)
906903 {
907- NameAndTypePair name_and_type = persistent_components.schema_processor ->getFieldCharacteristics (iceberg_object_info->underlying_format_read_schema_id , col_id);
904+ NameAndTypePair name_and_type = persistent_components.schema_processor ->getFieldCharacteristics (
905+ iceberg_object_info->underlying_format_read_schema_id , col_id);
908906 auto it = outputs.find (name_and_type.name );
909907 if (it == outputs.end ())
910908 throw Exception (ErrorCodes::LOGICAL_ERROR, " Cannot find column {} in dag outputs" , name_and_type.name );
911909 left_columns.push_back (it->second );
912910 }
913- FunctionOverloadResolverPtr func_tuple_builder =
914- std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionTuple>());
915- const ActionsDAG::Node * in_lhs_arg = left_columns.size () == 1 ?
916- left_columns.front () :
917- &dag.addFunction (func_tuple_builder, std::move (left_columns), {});
911+ FunctionOverloadResolverPtr func_tuple_builder
912+ = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionTuple>());
913+ const ActionsDAG::Node * in_lhs_arg
914+ = left_columns.size () == 1 ? left_columns.front () : &dag.addFunction (func_tuple_builder, std::move (left_columns), {});
918915 // / we got the NOT IN function
919916 auto func_not_in = FunctionFactory::instance ().get (" notIn" , nullptr );
920917 const auto & not_in_node = dag.addFunction (func_not_in, {in_lhs_arg, in_rhs_arg}, " notInResult" );
921918 dag.getOutputs ().push_back (¬_in_node);
922919 LOG_DEBUG (log, " Use expression {} in equality deletes" , dag.dumpDAG ());
923- return std::make_shared<FilterTransform>(
924- header,
925- std::make_shared<ExpressionActions>(std::move (dag)),
926- " notInResult" ,
927- true );
920+ return std::make_shared<FilterTransform>(header, std::make_shared<ExpressionActions>(std::move (dag)), " notInResult" , true );
928921 };
929922 builder.addSimpleTransform (simple_transform_adder);
930923 }
@@ -953,6 +946,26 @@ SinkToStoragePtr IcebergMetadata::write(
953946 }
954947}
955948
949+ ColumnMapperPtr IcebergMetadata::getColumnMapperForObject (ObjectInfoPtr object_info) const
950+ {
951+ IcebergDataObjectInfo * iceberg_object_info = dynamic_cast <IcebergDataObjectInfo *>(object_info.get ());
952+ if (!iceberg_object_info)
953+ return nullptr ;
954+ auto configuration_ptr = configuration.lock ();
955+ if (Poco::toLower (configuration_ptr->format ) != " parquet" )
956+ return nullptr ;
957+
958+ return persistent_components.schema_processor ->getColumnMapperById (iceberg_object_info->underlying_format_read_schema_id );
959+ }
960+
961+ ColumnMapperPtr IcebergMetadata::getColumnMapperForCurrentSchema () const
962+ {
963+ auto configuration_ptr = configuration.lock ();
964+ if (Poco::toLower (configuration_ptr->format ) != " parquet" )
965+ return nullptr ;
966+ SharedLockGuard lock (mutex);
967+ return persistent_components.schema_processor ->getColumnMapperById (relevant_snapshot_schema_id);
968+ }
956969}
957970
958971#endif
0 commit comments