Read optimization for Iceberg tables using Iceberg metadata #90001
ianton-ru wants to merge 6 commits into ClickHouse:master from
Conversation
@ianton-ru I understand the optimization, but does it really optimize some real-world workloads? To me it looks like a very rare situation. Even if a user has such columns in some files, very likely they will read some other columns in the same query, and in that case the benefit will be minuscule. I'm not really sure it's worth some specific and quite complex code. Maybe you can provide some context on it?

@alesapin

Workflow [PR], commit [8513c3a] Summary: ❌
Removed conditional compilation for Avro support in IcebergIterator.
divanik left a comment
I briefly skimmed the PR and left some comments, but the main question is here: #90001 (comment)
std::vector<Iceberg::ManifestFileEntry> equality_deletes_files;
std::exception_ptr exception;
std::mutex exception_mutex;
Int32 table_schema_id;
Could you clarify why we need this new field here?
When the schema is changed, some Parquet files can have the old schema, some the new one.
Say, initially the schema had a column `name`, and file1 was created with this schema.
Later that column was renamed to `old_name`, a new column `name` was added, and file2 was created with the new schema.
But the column index stays the same.
Iceberg metadata with min/max values does not contain column names, only indexes.
The query, on the contrary, contains only names, so I need a map column name => metadata.
So I took the following steps to get the correct column metadata:
- get the current schema (with this `table_schema_id`)
- get the column index for each column in the current schema (again with `table_schema_id`)
- create a map `column name => metadata` instead of `column index => metadata` from the Iceberg metadata.

All this happens in the new `DataFileMetaInfo` constructor; the parameter `schema_id` is this `table_schema_id`.
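The remapping described in this reply can be sketched roughly like this. All type and function names below are hypothetical, simplified stand-ins, not the actual ClickHouse or Iceberg classes:

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, simplified stand-in for per-column Iceberg statistics.
struct ColumnStats
{
    long null_count;
    std::string min;
    std::string max;
};

// Iceberg file metadata keys stats by column *index* (field id) ...
using StatsByIndex = std::map<int, ColumnStats>;
// ... but the query refers to columns by *name*, so we remap through the
// current table schema (looked up via table_schema_id).
using StatsByName = std::map<std::string, ColumnStats>;

StatsByName remapStatsToNames(const StatsByIndex & by_index,
                              const std::map<std::string, int> & current_schema)
{
    StatsByName by_name;
    for (const auto & [name, index] : current_schema)
    {
        // A file written before a rename keeps the same index, so the
        // lookup stays correct across schema evolution: the stats for
        // index 1 resolve to whatever that column is *currently* named.
        if (auto it = by_index.find(index); it != by_index.end())
            by_name.emplace(name, it->second);
    }
    return by_name;
}
```

In the rename example above, a file written with the old schema keeps its stats under index 1; after the rename, those stats surface under `old_name`, while the re-added `name` (index 2) simply has no stats for that file.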
/// Object metadata: size, modification time, etc.
std::optional<ObjectMetadata> metadata;
/// Information about columns
std::optional<DataFileMetaInfoPtr> file_meta_info;
Do we have any reason to store the field file_meta_info in the class RelativePathWithMetadata? If not, please move it to ObjectInfo.
Let's move it either to DataLakeObjectMetadata or IcebergDataObject
No, just historical reasons. Will move into DataLakeObjectMetadata
Moved to ObjectInfo. data_lake_metadata in ObjectInfo is optional and can be absent independently.
std::unique_ptr<PullingPipelineExecutor> reader;

public:
    std::map<size_t, ConstColumnWithValue> constant_columns_with_values;
At least this field should be constant
It can be changed in ReaderHolder::operator=.
struct DataFileInfo
{
    std::string file_path;
    std::optional<DataFileMetaInfoPtr> file_meta_info;

    explicit DataFileInfo(const std::string & file_path_)
        : file_path(file_path_) {}

    explicit DataFileInfo(std::string && file_path_)
        : file_path(std::move(file_path_)) {}

    bool operator==(const DataFileInfo & rhs) const
    {
        return file_path == rhs.file_path;
    }
};
This seems like an odd abstraction level; let's remove it completely.
constexpr size_t FIELD_MASK_RECT = 0x4;
constexpr size_t FIELD_MASK_ALL = 0x7;

void DataFileMetaInfo::serialize(WriteBuffer & out) const
You need to make these serialization/deserialization functions versioned. There are two approaches: either you introduce a separate protocol version for this structure, or you inherit the protocol version from the enclosing protocol. In the latter case you name the functions serializeForClusterFunction/deserializeForClusterFunction and take the protocol version from the functions above.
Sorry, I don't understand. serialize/deserialize is already called only when the protocol version is 5 or greater. Why do we need to check it inside one more time?
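For illustration, the "separate protocol version" option suggested above could look roughly like this. This is a sketch under assumptions: std::ostream/std::istream stand in for ClickHouse's WriteBuffer/ReadBuffer, and all names are hypothetical:

```cpp
#include <cstdint>
#include <sstream>
#include <stdexcept>

// The structure writes its own version tag first, so fields can be added
// later without breaking readers of data produced by older servers,
// independently of the enclosing cluster-function protocol version.
constexpr uint8_t DATA_FILE_META_INFO_VERSION = 1;

void serializeMeta(std::ostream & out, uint32_t field_mask)
{
    out.put(static_cast<char>(DATA_FILE_META_INFO_VERSION));
    out.write(reinterpret_cast<const char *>(&field_mask), sizeof(field_mask));
}

uint32_t deserializeMeta(std::istream & in)
{
    auto version = static_cast<uint8_t>(in.get());
    if (version > DATA_FILE_META_INFO_VERSION)
        throw std::runtime_error("Unknown DataFileMetaInfo version");
    uint32_t field_mask = 0;
    in.read(reinterpret_cast<char *>(&field_mask), sizeof(field_mask));
    return field_mask;
}
```

The point of the reviewer's request is that the outer protocol-version check only gates whether the structure is present at all; a version tag (or a version parameter threaded through) is what lets the structure's own layout evolve later.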
/// Not empty when allow_experimental_iceberg_read_optimization=true
/// and some columns were removed from read list as columns with constant values.
/// Restore data for these columns.
for (const auto & constant_column : reader.constant_columns_with_values)
std::map<size_t, ConstColumnWithValue> constant_columns_with_values;
std::unordered_set<String> constant_columns;

NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns;
Make the copy later, when you indeed need a copy.
It can be changed in the next block: constant columns are removed from the list.
I can rename it to non_constant_requested_columns.
columns.emplace_back(type->createColumn(), type, name);
builder.init(Pipe(std::make_shared<ConstChunkGenerator>(
    std::make_shared<const Block>(columns), *num_rows_from_cache, max_block_size)));
if (!constant_columns.empty())
Could you explain these lines of code?
Here I remove constant columns from the requested list so that only non-constant ones are read. Constant columns are restored later in StorageObjectStorageSource::generate() from metadata, without reading from the source file.
  builder.addSimpleTransform([&](const SharedHeader & header)
  {
-     return std::make_shared<ExtractColumnsTransform>(header, read_from_format_info.requested_columns);
+     return std::make_shared<ExtractColumnsTransform>(header, requested_columns_copy);
I am not sure it is indeed an optimization. Does this code really ensure that we don't read the constant columns from the file?
Here requested_columns_copy contains only non-constant columns; the constant columns were removed from this list above.
Found a major problem, so converting the PR to draft until it is resolved.

Feel free to reopen the PR and tag me in a DM when the major problem is resolved.
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Read optimization for Iceberg tables using Iceberg metadata
Documentation entry for user-facing changes
When an Iceberg table comes from an Iceberg catalog, the catalog returns additional metadata about the table, including the row count, the null count, and min/max values for columns in each file.
If some column in some file has zero nulls and its min value equals its max value, we can conclude that this column has the same value for every row in that specific file.
Then we can avoid reading this column from the file and create its values based on metadata.
This reduces the number of requests to remote storage like S3 or Azure, and as a result reduces query execution time.
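Under the rule stated in the description, the per-file constancy check boils down to something like the sketch below (hypothetical, simplified types; the real stats come from Iceberg manifest metadata):

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical per-file column statistics, as exposed by an Iceberg catalog.
struct FileColumnStats
{
    long value_count = 0;
    long null_count = 0;
    std::optional<std::string> min;
    std::optional<std::string> max;
};

// The rule from the description above: if a column in a file has zero nulls
// and its min value equals its max value, every row in that file carries the
// same value, so the column does not need to be read from remote storage.
bool isConstantInFile(const FileColumnStats & s)
{
    return s.null_count == 0 && s.min && s.max && *s.min == *s.max;
}
```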
This PR introduces the setting allow_experimental_iceberg_read_optimization to turn on this optimization.