
[GLUTEN-8846][CH] [Part 0] Support reading Iceberg equality delete files#8847

Merged
baibaichen merged 14 commits into apache:main from baibaichen:feature/iceberg-mor
Feb 28, 2025

Conversation

@baibaichen
Contributor

@baibaichen baibaichen commented Feb 27, 2025

What changes were proposed in this pull request?

Following facebookincubator/velox#8748, this implements Iceberg equality deletes. There are two cases:

  1. The delete file contains one column, say c1: we generate a not-in filter, i.e. c1 not in [....].
  2. The delete file contains multiple columns, say c1, c2, c3: for each row we generate a not-equal filter, i.e. c1 <> x or c2 <> y or c3 <> z, and all row filters are combined with a conjunction.
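The two cases above can be sketched as predicate construction. This is illustrative C++ only: the real reader builds a ClickHouse ActionsDAG rather than strings, and these helper names are hypothetical.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Case 1: single delete column -> "c1 notIn (v1, v2, ...)".
std::string buildNotIn(const std::string & col, const std::vector<std::string> & values)
{
    std::string expr = col + " notIn (";
    for (size_t i = 0; i < values.size(); ++i)
        expr += (i ? ", " : "") + values[i];
    return expr + ")";
}

// Case 2: several delete columns -> one "(c1 <> x or c2 <> y or ...)" per
// deleted row, with all row filters conjoined by "and".
std::string buildNotEquals(const std::vector<std::string> & cols, const std::vector<std::vector<std::string>> & rows)
{
    std::string expr;
    for (size_t r = 0; r < rows.size(); ++r)
    {
        std::string row_expr = "(";
        for (size_t c = 0; c < cols.size(); ++c)
            row_expr += (c ? " or " : "") + cols[c] + " <> " + rows[r][c];
        expr += (r ? " and " : "") + row_expr + ")";
    }
    return expr;
}
```

A row that survives every per-row disjunction is kept; a row equal to any delete row on all delete columns makes that row's disjunction false and is filtered out.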

This PR also refactors NormalFileReader; see the following diagram:

                            ┌──────────────┐
                            │  BaseReader  │
                            └──────┬───────┘
                                   │
             ┌────────────────────┴─────────────────────┐
             │                                          │
┌─────────────────────────┐                ┌─────────────────────────┐
│  ConstColumnsFileReader │                │    NormalFileReader     │
└─────────────────────────┘                └───────────┬─────────────┘
                                                       │
                                          ┌────────────┴──────────────┐
                                          │                           │
                                  ┌───────┴───────┐           - - - - ┴- - - - -
                                  │ IcebergReader │           :  DeltaReader  :
                                  └───────────────┘           - - - - - - - - -  

I introduced IcebergReader to implement MOR reads. Since DeltaReader also needs to inherit from NormalFileReader, let's merge this first.

Some work remains for follow-up PRs:

  1. Support delete columns that are not among the select columns
  2. Improvement: if the deleted values are integers and contiguous, use not (c1 >= min and c1 <= max)
  3. Benchmarks
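Follow-up item 2 can be sketched as follows (a hypothetical helper, not Gluten's API): when the deduplicated deleted integers form a contiguous run, a single range predicate replaces the set-membership test.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Illustrative only: choose between a range predicate and a notIn set for a
// single integer delete column.
std::string deletePredicate(const std::string & col, std::vector<int64_t> values)
{
    std::sort(values.begin(), values.end());
    values.erase(std::unique(values.begin(), values.end()), values.end());

    // A sorted, deduplicated run is contiguous iff max - min == count - 1.
    bool contiguous = !values.empty()
        && values.back() - values.front() == static_cast<int64_t>(values.size()) - 1;

    if (contiguous && values.size() > 1)
        return "not (" + col + " >= " + std::to_string(values.front()) + " and "
            + col + " <= " + std::to_string(values.back()) + ")";

    std::string expr = col + " notIn (";
    for (size_t i = 0; i < values.size(); ++i)
        expr += (i ? ", " : "") + std::to_string(values[i]);
    return expr + ")";
}
```

The range form is attractive because it evaluates two comparisons per row instead of probing a hash set.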

(Fixes: #8846)

How was this patch tested?

Existing UTs and new GTest cases.

@github-actions

#8846

@github-actions

Run Gluten Clickhouse CI on x86

1 similar comment
@github-actions

Run Gluten Clickhouse CI on x86

@baibaichen
Contributor Author

@CodiumAI-Agent /review

@QodoAI-Agent

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis ✅

8846 - Fully compliant

Compliant requirements:

  • Implementation of Iceberg equality delete file reader and related pipelines
  • New unit tests and benchmarks added
⏱️ Estimated effort to review: 5 🔵🔵🔵🔵🔵
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Code Clarity

Consider adding more detailed comments and clarifications in the functions of the EqualityDeleteActionBuilder to explain the logic for constructing predicates from delete files. This will help future maintainers understand the mapping between the delete file content and the constructed filter expressions.


void EqualityDeleteActionBuilder::notIn(Block deleteBlock, const std::string & column_name)
{
    assert(deleteBlock.columns() == 1);
    const auto & elem_block = deleteBlock.getByPosition(0);

    const std::string notIn{"notIn"};

    ActionsDAG::NodeRawConstPtrs args;
    const auto & colName = column_name.empty() ? elem_block.name : column_name;
    args.push_back(&actions.findInOutputs(colName));
    PreparedSets prepared_sets;
    FutureSet::Hash emptyKey;
    auto future_set = prepared_sets.addFromTuple(emptyKey, nullptr, {elem_block}, context->getSettingsRef());
    auto arg = ColumnSet::create(1, std::move(future_set));
    args.emplace_back(&actions.addColumn(ColumnWithTypeAndName(std::move(arg), std::make_shared<DataTypeSet>(), "__set")));

    auto function_builder = FunctionFactory::instance().get(notIn, context);
    andArgs.push_back(&addFunction(function_builder, std::move(args)));
}
Loop Behavior

The doPull() method uses a while loop to repeatedly filter out deleted rows. It would be good to validate that this loop will not run indefinitely in scenarios where all rows are filtered or if the filtering predicate might never let any rows pass. Consider adding a safeguard or more comments.

while (true)
{
    Chunk chunk = NormalFileReader::doPull();
    if (chunk.getNumRows() == 0)
        return chunk;

    deleteRows(chunk);

    if (chunk.getNumRows() != 0)
        return chunk;
}
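As a toy model of why this loop terminates (illustrative only, not the Gluten code path): the upstream reader yields finitely many chunks, an empty upstream chunk signals EOF and is returned immediately, so each iteration either returns or advances past one fully deleted chunk.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Model the pull loop: `upstream` holds the row count of each upstream chunk,
// `fullyDeleted[i]` says whether deleteRows() would empty chunk i.
size_t pullNonEmpty(const std::vector<size_t> & upstream, const std::vector<bool> & fullyDeleted)
{
    size_t pos = 0;
    while (true)
    {
        // NormalFileReader::doPull(): zero rows means end of file.
        size_t rows = pos < upstream.size() ? upstream[pos] : 0;
        if (rows == 0)
            return rows;            // EOF: the loop exits.
        if (!fullyDeleted[pos])
            return rows;            // deleteRows() left something to emit.
        ++pos;                      // Chunk fully deleted: pull again.
    }
}
```

Since `pos` strictly increases on every continued iteration and the upstream sequence is finite, the loop cannot spin forever even if every chunk is fully deleted.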
Schema Casting

The newly added castBlock function adjusts the input block to match the preferred schema. Please verify that this casting handles all edge cases (especially with virtual columns) and that no type mismatches can occur during writing.

DB::Block NormalFileWriter::castBlock(const DB::Block & block) const
{
    if (!block)
        return block;

    Block res = block;

    /// In case input block didn't have the same types as the preferred schema, we cast the input block to the preferred schema.
    /// Notice that preferred_schema is the actual file schema, which is also the data schema of current inserted table.

@baibaichen
Contributor Author

@CodiumAI-Agent /improve

@QodoAI-Agent

QodoAI-Agent commented Feb 28, 2025

PR Code Suggestions ✨

Latest suggestions up to 7528202

Category | Suggestion | Impact
General
Validate const correctness

Ensure that changing the write method signature to use a const DB::Block & is
supported by all implementations and, if mutations are needed, make an explicit
mutable copy.

cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h [61]

+// Optionally, create a non-const copy if in-place modifications are required.
 void write(const DB::Block & block) override;
Suggestion importance[1-10]: 4


Why: The suggestion advises ensuring that using a const reference in the write method is supported in all implementations, which is useful though not critical; it prompts further review if mutations are needed.

Low
Verify exception handling

Review the new exception handling in the Arrow utility, specifically ensuring that
the exception codes for out-of-memory versus incorrect data are correctly determined
from Arrow's status.

cpp-ch/local-engine/Storages/Parquet/ArrowUtils.h [38-45]

+if (result.ok())
+    return std::move(result).ValueUnsafe();
+throw DB::Exception::createDeprecated(
+    result.status().ToString(),
+    result.status().IsOutOfMemory() ? DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY : DB::ErrorCodes::INCORRECT_DATA);
 
-
Suggestion importance[1-10]: 4


Why: The suggestion encourages a review of the new exception handling logic for correctness, ensuring the proper exception codes are used, but it only prompts validation rather than providing a concrete fix.

Low
Confirm row group filtering

Verify that the removal of the row group filter lambda in the new call to build is
intentional and that all necessary row group filtering logic is still applied.

cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp [1136]

+metaBuilder.build(in, blockHeader, column_index_filter.get());
 
-
Suggestion importance[1-10]: 3


Why: This suggestion is a reminder to verify that the removal of the row group filter lambda was intentional; it doesn't propose any concrete changes and has minimal impact on functionality.

Low

Previous suggestions

Suggestions up to commit 7528202
Category | Suggestion | Impact
Possible issue
Handle empty filtered chunk

In deleteRows, explicitly update the chunk to an empty block when no rows pass the
filter instead of simply returning.

cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp [157-158]

 if (num_filtered_rows == 0)
+{
+    removeFilterIfNeed(columns, filter_column_position);
+    chunk.setColumns({}, 0);
     return;
+}
Suggestion importance[1-10]: 8


Why: Instead of silently returning when no rows pass the filter, the proposed change explicitly resets the chunk to an empty block, which prevents potential inconsistencies and ensures the chunk is properly updated.

Medium
Ensure safe vector access

Add a boundary check when using column_names[j] in the notEquals method to prevent
potential out-of-bound access.

cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp [137]

-const auto & colName = column_names.empty() ? column.name : column_names[j];
+const auto & colName = (!column_names.empty() && j < column_names.size()) ? column_names[j] : column.name;
Suggestion importance[1-10]: 6


Why: The suggestion adds a boundary check to prevent out-of-bound access for 'column_names[j]', which improves safety though the invariant might already guarantee the size; this adds defensive programming without major impact.

Low
Check for empty block explicitly

Replace the implicit block validity check with an explicit emptiness check to avoid
potential undefined behavior.

cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp [161-162]

-if (!block)
+if (block.empty())
     return block;
Suggestion importance[1-10]: 4


Why: The suggestion replaces an implicit validity check with an explicit emptiness check, which can improve clarity. However, since DB::Block might intentionally overload operator bool(), this change is only a minor improvement.

Low
General
Refine URI decoding logic

Review the URI decoding logic in the DateTime64 conversion to ensure that
double-encoded values are handled reliably.

cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp [150-153]

-if (decoded.length() > 23)
+if (needsDoubleDecoding(decoded))  // determine with a dedicated check
     Poco::URI::decode(decoded, to_read);
 else
     to_read = decoded;
Suggestion importance[1-10]: 3


Why: The suggestion proposes to replace a length-based check with a dedicated function to decide double-decoding, but it is vague and introduces an undefined helper, reducing its actionable value.

Low
Suggestions up to commit b14cfe7
Category | Suggestion | Impact
Possible issue
Proper filter index initialization

Avoid initializing a size_t index with -1; initialize it properly to prevent
unexpected overflow.

cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp [112]

-size_t filter_column_position = -1;
+size_t filter_column_position;
+{
+    Block block = readHeader.cloneWithColumns(columns);
+    columns.clear();
+    delete_expr->execute(block, num_rows_before_filtration);
+    filter_column_position = block.getPositionByName(delete_expr_column_name);
+}
Suggestion importance[1-10]: 7


Why: Replacing "size_t filter_column_position = -1;" with an uninitialized declaration that is later assigned avoids undefined behavior due to unsigned overflow and makes the intent clearer.

Medium
Null check before use

Add a null-pointer check in the new toString function to guard against potential
dereference of a null col.

cpp-ch/local-engine/Common/DebugUtils.cpp [146-165]

 static std::string toString(const DB::IColumn * const col, size_t row, size_t width)
 {
+    if (!col)
+        return "null";
     auto getDataType = [](const DB::IColumn * col)
     {
         if (const auto * column_nullable = DB::checkAndGetColumn<DB::ColumnNullable>(col))
             return column_nullable->getNestedColumn().getDataType();
         …
     };
Suggestion importance[1-10]: 5


Why: The suggestion adds a defensive null-pointer check to prevent potential crashes, which is useful though the overall impact is moderate since callers may already guarantee a non-null pointer.

Low
Implement destructor cleanup

Ensure that the destructor (~SubstraitFileSource) is properly implemented to release
resources and avoid potential memory leaks.

cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h [36]

-~SubstraitFileSource() override;
+~SubstraitFileSource() override { /* Implement proper resource cleanup */ }
Suggestion importance[1-10]: 5


Why: Providing an implementation for the destructor to handle resource cleanup could prevent potential memory leaks, offering moderate improvement provided that the actual cleanup is necessary.

Low
Use unsigned loop indices

Change loop indices from int to size_t in the notEquals method to avoid
signed/unsigned mismatch and potential issues.

cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp [127-133]

-for (int i = 0; i < numDeletedValues; i++)
+for (size_t i = 0; i < numDeletedValues; i++)
 {
-    for (int j = 0; j < numDeleteFields; j++)
+    for (size_t j = 0; j < numDeleteFields; j++)
     {
         // processing logic
     }
 }
Suggestion importance[1-10]: 4


Why: Changing the loop index types from int to size_t prevents potential signed/unsigned mismatch; however, the impact is minor and addresses mostly a warning rather than a critical bug.

Low
Verify type mapping conversion

Verify that the updated lambda using toColumnType provides the correct type mapping
as expected compared to the previous use of toBlockFieldType.

cpp-ch/local-engine/tests/utils/gluten_test_util.cpp [55]

-[](const auto & name_and_type) { return std::make_pair(name_and_type.name, toColumnType(name_and_type)); }
+[](const auto & name_and_type) { return std::make_pair(name_and_type.name, toColumnType(name_and_type)); } // Confirm mapping semantics
Suggestion importance[1-10]: 3


Why: The suggestion only adds a clarifying comment to confirm the semantics of the type mapping change without modifying functionality, resulting in minimal impact.

Low
General
Use safer copy for vector

Consider replacing memcpy with std::copy or equivalent vector assignment to safely
copy values into the container.

cpp-ch/local-engine/Common/BlockTypeUtils.h [164]

-memcpy(vec.data(), data.data(), data.size() * sizeof(T));
+std::copy(data.begin(), data.end(), vec.begin());
Suggestion importance[1-10]: 6


Why: Replacing memcpy with std::copy improves safety and readability when copying vector elements, which is a useful enhancement without altering functional behavior.

Low
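For background on the memcpy suggestion above (general C++, not Gluten-specific): std::copy is well-defined for any copyable element type, while memcpy requires trivially copyable types; for trivially copyable elements compilers lower std::copy to the same memmove, so nothing is lost by switching.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Copy a vector element-by-element via std::copy. Unlike memcpy, this is
// valid even when T (e.g. std::string) is not trivially copyable.
template <typename T>
std::vector<T> copied(const std::vector<T> & data)
{
    std::vector<T> vec(data.size());
    std::copy(data.begin(), data.end(), vec.begin());
    return vec;
}
```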
Robust numeric conversion

Enhance error handling in buildFieldFromString to validate numeric string inputs and
handle conversion failures explicitly.

cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp [114-200]

 DB::Field BaseReader::buildFieldFromString(const String & str_value, DB::DataTypePtr type)
 {
     …
     DB::ReadBufferFromString read_buffer(str_value);
     auto it = field_builders.find(nested_type->getName());
     if (it == field_builders.end())
     {
         …
         throw DB::Exception(DB::ErrorCodes::UNKNOWN_TYPE, "Unsupported data type {}", nested_type->getName());
     }
-    return it->second(read_buffer, str_value);
+    try
+    {
+        return it->second(read_buffer, str_value);
+    }
+    catch (const std::exception & e)
+    {
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Error converting '{}' for type {}: {}", str_value, nested_type->getName(), e.what());
+    }
 }
Suggestion importance[1-10]: 4


Why: Wrapping the conversion call in a try-catch improves error diagnostics for conversion failures, but the change is not critical and may be seen as an optional robustness enhancement.

Low
Reset read buffer position

Ensure the state of the provided read_buffer is reset or validated after it is used
in openInputParquetFile to prevent unexpected read positions.

cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp [41-49]

 std::unique_ptr<parquet::ParquetFileReader> ParquetMetaBuilder::openInputParquetFile(DB::ReadBuffer & read_buffer)
 {
     const DB::FormatSettings format_settings{
         .seekable_read = true,
     };
     std::atomic<int> is_stopped{0};
     auto arrow_file = asArrowFile(read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
+    // Reset the read buffer position if required.
+    read_buffer.seek(0, SEEK_SET);
     return parquet::ParquetFileReader::Open(arrow_file, parquet::default_reader_properties(), nullptr);
 }
Suggestion importance[1-10]: 3


Why: The suggestion to reset the read buffer position could prevent unexpected read positions; however, it may be unnecessary depending on downstream usage, making the improvement useful but of lower impact.

Low

@github-actions

Run Gluten Clickhouse CI on x86

@baibaichen baibaichen changed the title [GLUTEN-8846][CH] [Part1] Support reading Iceberg equality delete files [GLUTEN-8846][CH] [Part 0] Support reading Iceberg equality delete files Feb 28, 2025
@baibaichen baibaichen marked this pull request as ready for review February 28, 2025 03:30
Contributor

@zzcclp zzcclp left a comment


LGTM

@baibaichen baibaichen merged commit ef19d63 into apache:main Feb 28, 2025
16 checks passed
@baibaichen baibaichen deleted the feature/iceberg-mor branch February 28, 2025 05:54


Development

Successfully merging this pull request may close these issues.

[CH][UMBRELLA] Support Iceberg MOR

3 participants