
Support reading Iceberg equality delete files (Design) #8748

@yingsu00

Description


In https://github.com/facebookincubator/velox/pull/7847we introduced IcebergSplitReader and the support of reading positional delete files. In this doc we will discuss the implementation of reading equality delete files.

Iceberg Equality Deletes Overview

A general introduction of equality delete files can be found at https://iceberg.apache.org/spec/#equality-delete-files. Some key takeaways:

  1. An equality delete file can contain multiple fields (which could be sub-fields), and the values for the fields in the same row are combined with AND. E.g. the following equality delete file
equality_ids=[1, 3]
1: id  | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

means:

    - A row is deleted if (id = 3 AND name = 'Grizzly') is true. Or
    - A row is selected if (id <> 3 OR name <> 'Grizzly') is true
  1. The equality delete field value could be NULL, which means a row is deleted if that field is NULL.
equality_ids=[2]
1: id  | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

The expression specifies:

    - A row is deleted if category IS NULL. Or
    - A row is selected if category IS NOT NULL
  1. An equality delete file could contain multiple rows.
equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
3      |    NULL     | Grizzly
5      |    Bear     | Polar

means:

    - A row is deleted if (id = 3 AND name = 'Grizzly') OR (id = 5 AND name = 'Polar')  is true. Or
    - A row is selected if (id <> 3 OR name <> 'Grizzly') AND (id <> 5 OR name <> 'Polar')  is true
  1. A split can contain multiple equality or positional delete files, and a row is deleted if any row expression in these delete files is true. E.g. a split may come with 3 delete files:

Equality delete file 1

equality_ids=[1, 3]
 1: id | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

Equality delete file 2

equality_ids=[3]
1: id | 2: category | 3: name
-------|-------------|---------
 1     | NULL        | Polar 

Positional delete file 1

	100
	101

means

    - a row is deleted iff (id = 3 AND name = 'Grizzly') OR (name = 'Polar') OR (row_in_file = 100) OR (row_in_file = 101)
    - a row is selected iff (id <> 3 OR name <> 'Grizzly') AND (name <> 'Polar') AND (row_in_file <> 100) AND (row_in_file <> 101)
  1. A split can contain many equality and positional delete files.
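The delete semantics above can be sketched in code. This is a minimal illustrative sketch (the struct and function names are made up for this example, not Velox APIs): each delete file contributes a predicate, and a row survives only if it passes all of them.

```cpp
#include <cstdint>
#include <string>
#include <unordered_set>

// A base-file row, reduced to the columns and the position we care about.
struct Row {
  int64_t id;
  std::string name;
  int64_t rowInFile;  // position of the row in the base file
};

// Equality delete file 1: delete rows where (id = 3 AND name = 'Grizzly').
bool passesEqualityDelete1(const Row& r) {
  return r.id != 3 || r.name != "Grizzly";
}

// Equality delete file 2: delete rows where (name = 'Polar').
bool passesEqualityDelete2(const Row& r) {
  return r.name != "Polar";
}

// Positional delete file 1: delete the rows at positions 100 and 101.
bool passesPositionalDelete(const Row& r) {
  static const std::unordered_set<int64_t> deleted{100, 101};
  return deleted.count(r.rowInFile) == 0;
}

// The per-file predicates combine with AND: a row is selected only if no
// delete file removes it.
bool isSelected(const Row& r) {
  return passesEqualityDelete1(r) && passesEqualityDelete2(r) &&
      passesPositionalDelete(r);
}
```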

Design considerations

Build Hash Tables or Filters/FilterFunctions?

The equality delete files can be interpreted as logical expressions and become the remaining filters that are evaluated after all rows in a batch are read out into the result vectors. Alternatively, they can be used to construct a number of hash tables that are probed after all rows are read into the result vectors. Suppose the equality delete file contains the following information:

equality_ids=[2, 3]
 1: id | 2: category | 3: name
-------|-------------|---------
 1     | Bear        | Grizzly
 3     | Bear        | Brown

It means

- a row is deleted iff (category = 'Bear' AND name = 'Grizzly') OR (category = 'Bear' AND name = 'Brown')
- a row is selected iff (category <> 'Bear' OR name <> 'Grizzly') AND (category <> 'Bear' OR name <> 'Brown')

To build the hash tables, we will need to concatenate the hash values of columns 2 and 3, and the hash table will contain two hash values, for 'Bear##Grizzly' and 'Bear##Brown'. Then in the matching phase, the hash values of columns 2 and 3 for all rows in the output RowVectors would be calculated and concatenated before probing the hash table. If there is no match, the row was definitely not deleted; if there is a match, the row needs to be compared with the original delete values to confirm whether it was really deleted. Only when all the values are the same shall the row be confirmed as removed. Note that the final comparison is necessary, because there is still a small possibility of hash collisions, especially when many columns are involved.
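The probe-then-verify step can be sketched as follows. This is an illustrative standalone example, not Velox code; the class and field names are made up. The point is that a hash match alone is never trusted: the actual values are compared before the row is declared deleted.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Key of an equality delete: the concatenated delete columns.
struct DeleteKey {
  std::string category;
  std::string name;
};

size_t hashKey(const DeleteKey& k) {
  // Concatenate with a separator so ("ab","c") and ("a","bc") hash differently.
  return std::hash<std::string>{}(k.category + "\x1f" + k.name);
}

class EqualityDeleteTable {
 public:
  void insert(DeleteKey k) {
    rows_.emplace_back(hashKey(k), std::move(k));
  }

  // True if the row matches a deleted tuple. A hash match alone is not
  // enough; the values are compared to rule out collisions.
  bool isDeleted(const DeleteKey& probe) const {
    size_t h = hashKey(probe);
    for (const auto& [hash, key] : rows_) {
      if (hash == h && key.category == probe.category &&
          key.name == probe.name) {
        return true;
      }
    }
    return false;
  }

 private:
  std::vector<std::pair<size_t, DeleteKey>> rows_;
};
```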

Note that creating hash tables on single columns is not correct without additional processing. For example, suppose the base file is as follows:

1: id | 2: category | 3: name
-------|-------------|---------
 1     | Bear        | Grizzly
 2     | Bear        | Brown
 3     | Bear        | Polar
 4     | Dog         | Brown

If we build one hash table on the second column "category" containing {'Bear'}, and another hash table on "name" containing {'Grizzly', 'Brown'}, then probing the category hash table to exclude rows with category = 'Bear' would incorrectly remove row 3, and probing the name hash table to exclude 'Grizzly' and 'Brown' would incorrectly remove row 4. Taking the logical AND or OR of the two probe results is also incorrect.

Now let's take a step back and build the hash tables on single values. So we have hash table A on "category" with the single value 'Bear', hash table B on "name" with value 'Grizzly', and hash table C on "name" with value 'Brown'. A row would then pass if (category <> 'Bear' OR name <> 'Grizzly') AND (category <> 'Bear' OR name <> 'Brown') by probing hash table A twice and hash tables B and C once, then computing the logical ORs and ANDs. However, this is no different from just comparing the values, and no hash tables are actually needed.
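The incorrectness of per-column hash tables can be demonstrated directly against the base file above. This is a minimal illustrative sketch (the names are made up for the example): the per-column-membership predicate disagrees with the tuple-wise conjunction on rows 3 and 4.

```cpp
#include <string>
#include <unordered_set>
#include <vector>

// A row of the base file from the example.
struct BaseRow {
  int id;
  std::string category;
  std::string name;
};

// Wrong approach: delete any row whose category OR name appears in a
// per-column set built from the delete file.
bool wrongSingleColumnDelete(const BaseRow& r) {
  static const std::unordered_set<std::string> categories{"Bear"};
  static const std::unordered_set<std::string> names{"Grizzly", "Brown"};
  return categories.count(r.category) > 0 || names.count(r.name) > 0;
}

// Correct approach: compare whole tuples, i.e. evaluate the conjunctions
// (category='Bear' AND name='Grizzly') OR (category='Bear' AND name='Brown').
bool correctDelete(const BaseRow& r) {
  return (r.category == "Bear" && r.name == "Grizzly") ||
      (r.category == "Bear" && r.name == "Brown");
}
```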

The other way is to compile these delete values into logical expressions that can be executed as remaining filter functions, or even into domain filters that can be pushed down to the base file reader. This can be more efficient than the hash table approach. Firstly, filter pushdown can eliminate a lot of decoding costs. Secondly, computing and concatenating the hash values for all rows is very expensive; in fact, it is much slower than performing simple comparisons on single column values, which can be done efficiently with SIMD operations, while the hash value computation cannot. And lastly, the values need to be compared anyway, even with the hash table approach.

We should also note that if we convert them into logical expressions as remaining filters, the existing Velox expression evaluation implementation can automatically choose the best evaluation strategy, e.g. whether to build hash tables or do efficient SIMD logical comparisons, when it sees fit. This is much more flexible than building fixed hash tables in the connector DataSources. In many cases, the ExpressionEvaluator can choose a more efficient way to evaluate the equivalent expressions. Today it's already very easy to construct the logical expressions, and for equality deletes to work, no additional code is needed beyond the expression construction whatsoever. The existing Velox data sources and readers can already handle them, so the implementation would be fairly simple. Plus, we can additionally improve the existing filter function / expression evaluation implementations, which can potentially benefit other components of the driver pipeline in the future. So we propose to choose the remaining filter path and convert the equality delete files into filters and filter functions.

Where and How to open the equality delete files

Query engines like Presto or Spark usually have some central coordinators, where the distributed plan and splits are created. The splits are then sent to the workers and executed there using the Velox stack. A query may issue many splits for each table, and each of them may include many (possibly hundreds of) delete files. We have the choice to open the delete files in the coordinator and/or in the workers. There are some basic design considerations here:

Native workers running Velox need to have the ability to read both equality delete and positional delete files.

  • Although it's possible for Prestissimo to open the (equality) delete files on the coordinator(s), we cannot assume other engines can process the equality delete files internally by themselves. The Iceberg splits come with a list of delete file paths, which could be positional or equality or both. With the normal scheduling implementations in most engines, the splits are sent directly to the workers for local execution. An engine would need a fairly large amount of special handling to break this procedure and open the delete files, update the distributed query plan, create filters that can be pushed down, or even change the scan into a join, etc. before sending out the splits. This makes the engine logic much more complex and the integration with Velox much harder.

  • Opening files is a very expensive operation. Opening all delete files on the coordinator may become the bottleneck of the system. Even though we could cache the parsed expressions from each distinct delete file, opening hundreds of them on a single or a small number of coordinator nodes is still not practical.

Based on these considerations, I think we need to implement reading equality deletes in Velox. However, that doesn't mean we cannot open some of the equality delete files on the coordinator for optimization purposes. But that optimization should not be mandatory for the engines built on top of Velox.

Performance Considerations

We want to push down the filters as much, and as deep as possible.

By pushing down the filters to the readers, or even to the decoder level, we can avoid the cost of decoding skipped rows and even save some decompression costs. These savings could be huge if the selectivity rate is very small. Note that some of the equality delete files and all positional delete files can be converted to TupleDomain filters or initial row numbers that can be pushed to the readers. In order to achieve this, we will need to extract the parts that can be pushed down and guarantee that the remaining parts are evaluated or tested correctly.

We want to avoid opening the delete files as much as possible

A split may include hundreds of delete files, and a worker could receive many splits with the same set of delete files. Ideally, each delete file should be opened only once on each worker, because 1) opening files is expensive and 2) compiling expressions, or building hash tables that can later be probed, is also not cheap. There are a couple of ways to achieve this:

  • Building a hash table for each HiveDataSource, or a long living cache on the compiled expressions on each node

  • Convert the scan with equality delete files to a broadcast join, with the delete files becoming one data source, and the base file becoming another data source. This shows good improvements in Presto, but it also misses the opportunities of

    • filter pushdown for reading the base file,
    • more efficient evaluation compared to hash joins
    • Cross file filters and filter functions merge
    • ExpressionEvaluator's ability to extract common sub-expressions
    • logical expression simplifications
    • Simpler plan shape, less data movement
    • No additional data size limit for broadcast joins

We want to reduce the amount of expression evaluation as much as possible.

We have shown that the equality delete files can be interpreted as conjunctive logical expressions. However, logical expression evaluation is also expensive when the expression contains many terms. Velox can already extract common sub-expressions and flatten adjacent logical expressions, but more general logical expression simplification is not yet implemented. Nonetheless, there are ways to simplify the expressions for some simple Iceberg cases. We will discuss them later.

Design

EqualityDeleteFileReader

We will be introducing the EqualityDeleteFileReader class, and each reader is responsible for opening one equality delete file. The content will be read in batches, and for each batch, the logical expressions will be built and merged with existing remainingFilter in the HiveDataSource.

The equality delete file schema is not fixed and can only be known at query run time. The equality ids and the base file schema are used together to get the output row type and build the ScanSpec for the equality delete file. The equality ids are the same as the ids in Velox TypeWithId, and therefore we can directly use dwio::common::typeutils::buildSelectedType() to get the delete file schema. Note that such an id does not necessarily belong to a primitive type column, but could also be a sub-field of a complex type column. For example, deleting from an ARRAY[INTEGER] column c where c[i]=5 can also be expressed as an equality delete file. The field ids for this column are

0: root
1: ARRAY
2: INTEGER

Therefore the equality id for this predicate is 2, and the content of the equality delete file is value 5.
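The id-to-field mapping above can be sketched with a toy schema walk. This is an illustrative standalone example, not Velox's TypeWithId: ids are assigned to nodes in schema (pre-order) order, so an equality id can land on a nested sub-field such as the ARRAY element.

```cpp
#include <string>
#include <vector>

// A minimal schema node: a name plus child nodes.
struct TypeNode {
  std::string name;
  std::vector<TypeNode> children;
};

// Walk the tree in pre-order, assigning consecutive ids, and record the
// name of the node that owns the id we are looking for.
void resolve(const TypeNode& node, int& nextId, int target, std::string& out) {
  if (nextId++ == target) {
    out = node.name;
  }
  for (const auto& child : node.children) {
    resolve(child, nextId, target, out);
  }
}

std::string nameForId(const TypeNode& root, int target) {
  int nextId = 0;
  std::string out;
  resolve(root, nextId, target, out);
  return out;
}
```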

Once we read the delete values, we can build the ExprSet and add it to the existing remainingFilterExprSet_ in HiveDataSource. Then the expressionEvaluator_ in HiveDataSource will evaluate them after all relevant vectors are loaded. There are two ways to add the newly created ExprSet to the existing remainingFilterExprSet_:

  1. By conjuncting with the Expr in remainingFilterExprSet_
  2. By adding the Expr to the array in remainingFilterExprSet_

Note that the current HiveDataSource assumes remainingFilterExprSet_ has only one Expr, and the owned SimpleExpressionEvaluator only evaluates the first Expr in an ExprSet. There are a couple of facts that we discovered:

  1. SimpleExpressionEvaluator is only used in TableScan and HiveConnector
  2. The remainingFilterExprSet_ would always be a special kind of ExprSet, in that it only contains logical expressions.

While I think SimpleExpressionEvaluator should indeed evaluate all Exprs in the passed-in ExprSet, we can alternatively create a new LogicalExpressionEvaluator, in which we can make special logical expression evaluation improvements in the future. Then adding the new Expr to remainingFilterExprSet_ as an array element is the cleanest and simplest way.

Extraction of domain filters

When the equality delete file has only one field, we can extract it as a domain filter. Such a filter can be pushed down to the readers and decoders, where performance savings can happen. In this case we will create a NOT IN filter for it. This is done in connector::hive::iceberg::FilterUtil::createNotInFilter(), which in turn calls into the utility functions in common/Filter.h/cpp. The values will be de-duplicated, and nulls will be treated separately.

equality_ids=[2]
1: id | 2: category | 3: name
-------|-------------|---------
 1     | cat         | ShortHair
 2     | cat         | NorwegianForest
 3     | NULL        | Grizzly
 3     | dog         | FrenchBull

Would be interpreted as

    - a row is deleted iff category IN ('cat', 'dog') OR category IS NULL
    - a row is selected iff category NOT IN ('cat', 'dog') AND category IS NOT NULL

Velox can materialize it into different kinds of filters, e.g. a NegatedBigintRange filter when there is only one value, or a NegatedBigintValuesUsingBitmask that uses bitmasks when there are a few values.

Note that we need to verify the field is not a sub-field, since Velox currently doesn't support pushing down filters to sub-fields. This restriction will be removed once Velox supports sub-field filter pushdowns.
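A NOT IN domain filter of this shape can be sketched as below. This is a minimal illustrative class, not the real Velox filter (the real ones, such as NegatedBigintValuesUsingBitmask, live in common/Filter.h): deleted values are de-duplicated, and a NULL among the delete values means NULL rows are deleted too.

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Sketch of a single-column NOT IN filter built from an equality delete file.
class NotInFilter {
 public:
  NotInFilter(const std::vector<int64_t>& deletedValues, bool deletedNull)
      : rejected_(deletedValues.begin(), deletedValues.end()),
        rejectNull_(deletedNull) {}

  // A row passes if its value is not among the deleted values.
  bool testInt64(int64_t value) const {
    return rejected_.count(value) == 0;
  }

  // A NULL row passes only if NULL was not among the deleted values.
  bool testNull() const {
    return !rejectNull_;
  }

 private:
  std::unordered_set<int64_t> rejected_;  // set construction de-duplicates
  bool rejectNull_;
};
```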

An equality delete file with multiple fields cannot be pushed down as domain filters at this moment, no matter whether there's a single row or multiple rows. E.g. the following delete file can be interpreted as id <> 3 || name <> 'Grizzly'. Currently Velox does not support pushing down disjunctions, but we may do so in the future.

equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

Domain Filter Merge

A split may come with multiple equality delete files. Some of them may have the same schema. If they all have the same single field, the extracted domain filters will be deduped and merged with the existing one. E.g.
Equality delete file 1

equality_ids=[2]
2: category 
---------------
    mouse

Equality delete file 2

equality_ids=[2]
2: category 
---------------
    bear
   mouse

The domain filter built from these 2 files will be category NOT IN {'bear', 'mouse'}. This uses the mergeWith API in the Filter class.
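The merge can be sketched as a set union. This is an illustrative standalone function in the spirit of Filter::mergeWith, not the Velox API itself: the merged NOT IN filter rejects the union of the value sets from all single-field delete files.

```cpp
#include <set>
#include <string>
#include <vector>

// Merge the value lists of several single-field equality delete files into
// one rejected-value set for a NOT IN filter. Insertion into a set both
// unions the files and de-duplicates repeated values.
std::set<std::string> mergeNotInValues(
    const std::vector<std::vector<std::string>>& deleteFiles) {
  std::set<std::string> merged;
  for (const auto& file : deleteFiles) {
    merged.insert(file.begin(), file.end());
  }
  return merged;
}
```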

Remaining Filter Function Merge

If the equality delete files have the same schema but more than a single field, for example
Equality delete file 1

equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Winnie

Equality delete file 2

equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 4     | NULL        | Micky
 3     | NULL        | Winnie

This will create 2 Expr in the final ExprSet:

 - (id <> 3 || name <> 'Winnie')
 - (id <> 3 || name <> 'Winnie') && (id <> 4 || name <> 'Micky')

Today Velox supports common sub-expression recognition in the ExpressionEvaluator, and such an expression would be evaluated only once. In this example, the result of (id <> 3 || name <> 'Winnie') would be cached internally and does not need to be computed twice.
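The reuse of a common sub-expression can be sketched with a toy memoizing evaluator. This is an illustration of the idea only (Velox's ExprSet does this internally per evaluation; the class here is made up): the shared sub-expression key is evaluated once and its result reused.

```cpp
#include <functional>
#include <map>
#include <string>

// Toy evaluator that caches results per distinct sub-expression key.
class CachingEvaluator {
 public:
  bool evaluate(const std::string& key, const std::function<bool()>& expr) {
    auto it = cache_.find(key);
    if (it != cache_.end()) {
      return it->second;  // common sub-expression: reuse the cached result
    }
    ++evaluations_;
    bool result = expr();
    cache_.emplace(key, result);
    return result;
  }

  int evaluations() const { return evaluations_; }

 private:
  std::map<std::string, bool> cache_;
  int evaluations_ = 0;
};
```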

Logical Expression Simplification

As far as I understand, Velox can do logical expression flattening, but still can't automatically simplify the logical expression. For example, the expression a AND (b AND (c AND d)) would be flattened as AND(a,b,c,d), but a AND (a OR b) cannot be automatically simplified to a, therefore to evaluate a AND (a OR b), a and b will both be evaluated, and one AND and one OR operation need to be performed. While we hope to improve logical expression simplification in the future, we can still do some simple improvements for Iceberg now.

An Iceberg split can come with multiple equality delete files and their schemas could have overlaps. For example
Equality delete file 1

equality_ids=[1, 2, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 1     | mouse       | Micky
 2     | mouse       | Minnie
 3     | bear        | Winnie
 4     | bear        | Betty

Equality delete file 2

equality_ids=[2]
2: category 
---------------
   mouse

Equality delete file 3

equality_ids=[2, 3]
2: category | 3: name
------------|---------
 bear       | Winnie

We see that equality delete file 2 is on the category column and would remove all tuples with value 'mouse'. This means that the first two rows in equality delete file 1 are already covered and don't need to be read or compiled. Similarly, the single row in file 3 covers row 3 in file 1, so row 3 in file 1 doesn't need to be read or compiled. The simplified delete files are as follows:

equality_ids=[1, 2, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 4     | bear        | Betty

and

equality_ids=[2]
2: category 
---------------
   mouse

and

equality_ids=[2, 3]
2: category | 3: name
------------|---------
 bear       | Winnie

With this simplification, the resulting expression would be simpler and the evaluation cost would be reduced.

When a delete file has only one field, the domain filter built from it can be used as a filter when reading other equality delete files whose fields include that one. In the above example, category <> 'mouse' can be pushed to file 1, whose rows 1 and 2 would be filtered out. This not only helps the final expression evaluation, but also improves the read performance when reading file 1.

If the delete file has more than one field, the situation is more complex. In the above example, file 3 would be compiled to category <> 'bear' OR name <> 'Winnie', but it cannot be pushed to file 1 nor the base file directly because it's a disjunctive expression. So far Velox only supports domain filters in conjunctive expressions, so for now we will only use single-field equality delete files for the simplifications. For this, we will go over the equality ids from all equality delete files and read all single-field ones first. Then the filters will be pushed to the other equality delete file readers.

In the future, we can even implement disjunctive expression pushdowns. For example, category <> 'bear' OR name <> 'Winnie' can be pushed to the SelectiveColumnReaders, with the category and name columns as a ColumnGroup. This will save the cost of having to read all values out before applying the filter function as a remaining filter, and the selectivity vector can be reused among them. Moreover, the reduction of rows from applying this filter directly on this ColumnGroup would benefit the reading of other columns later.
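The pruning step described above can be sketched as follows. This is an illustrative standalone example (the names are made up, not Velox code): rows of a multi-field equality delete file that are already covered by a single-field delete on category are dropped before any expression is built.

```cpp
#include <string>
#include <unordered_set>
#include <vector>

// A row of a multi-field equality delete file.
struct DeleteRow {
  int id;
  std::string category;
  std::string name;
};

// Drop delete rows whose category alone is already deleted by a
// single-field delete file; the wider tuples are redundant and need not be
// read or compiled.
std::vector<DeleteRow> pruneCovered(
    const std::vector<DeleteRow>& multiFieldRows,
    const std::unordered_set<std::string>& deletedCategories) {
  std::vector<DeleteRow> remaining;
  for (const auto& row : multiFieldRows) {
    if (deletedCategories.count(row.category) == 0) {
      remaining.push_back(row);
    }
  }
  return remaining;
}
```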

Expression Caching

We know that a unique HiveDataSource object is created for a unique TableScan operator, and the splits received by a HiveDataSource instance belong to the same query and same table. Additionally for Iceberg splits, they must be reading the same snapshot of an Iceberg table. When the HiveDataSource receives a new Iceberg split with some equality delete files, it would create a new IcebergSplitReader, which would open the delete files. If the equality delete file can be interpreted into some domain filters or filter functions, the scanSpec_ and remainingFilterExprSet_ in HiveDataSource may need to be updated.

Currently, the Iceberg library selects the qualified data and delete files based on partitions and snapshot ids or transaction sequence numbers. For a single transaction, the snapshot is fixed, and all delete files from the same partition would go with the base data files when the splits are enumerated. So we can assume for now that all splits received from the same partition are the same for a single HiveDataSource. However, the delete files for different partitions could differ, and the splits from multiple partitions could arrive out of order. If we updated the scanSpec_ and remainingFilterExprSet_ for a previous partition, we will need to restore them to the original before applying the current set of delete files. As a first implementation, we will make a copy of these objects in the IcebergSplitReader and restore them when the IcebergSplitReader is destructed.

In some users' workloads, deletions are quite frequent, and many delete files can come with a split for a subsequent SELECT query. For all splits in a partition, the delete files may be the same. We don't want to repeatedly read such equality delete files for every split a HiveDataSource needs to handle. One way of overcoming this is to build an expression cache. There are two levels of caching ideas:

  1. A hash table in HiveDataSource
  2. A process wide cache for all Iceberg scans.

In 1, the key of the hash table is <partition, snapshotId> and the values are the compiled filters and expressions.
In 2, the key of the cache is <table, partition, snapshotId> and the values are the compiled filters and expressions. To avoid excessive contentions, we can divide the cache into multiple levels. The implementation will be adjusted with more experiments and observations of the customer workloads in the future.
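The level-2 cache key shape can be sketched like this. This is an illustrative placeholder only (CompiledExpr and the class below are made up; the real values would be compiled filters and ExprSets): compiled expressions are looked up by <table, partition, snapshotId>.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <tuple>
#include <utility>

// Stand-in for a compiled filter / expression set.
struct CompiledExpr {
  std::string filterSql;
};

// Process-wide cache keyed by <table, partition, snapshotId>.
class ExpressionCache {
 public:
  using Key = std::tuple<std::string, std::string, int64_t>;

  const CompiledExpr* find(const Key& key) const {
    auto it = cache_.find(key);
    return it == cache_.end() ? nullptr : &it->second;
  }

  void put(const Key& key, CompiledExpr expr) {
    cache_.emplace(key, std::move(expr));
  }

 private:
  std::map<Key, CompiledExpr> cache_;
};
```

A sharded variant of `cache_` (one map plus mutex per shard) would address the contention concern mentioned above.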

If the Iceberg library changes its TableScan or FileScan in the future and can additionally prune the delete files based on each individual base data files, we will need to change the cache keys and add the information for the base file.

We will work on caching in the future when we understand the workloads better.

LogicalExpressionEvaluator Improvements

Currently the remaining filter is evaluated in HiveDataSource::evaluateRemainingFilter()

vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
  … 
  expressionEvaluator_->evaluate(
      remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_);
  auto res = exec::processFilterResults(
      filterResult_, filterRows_, filterEvalCtx_, pool_);
  return res;
}

This code evaluates remainingFilterExprSet_ as a general expression rather than a special logical expression: it puts the results of the Exprs in remainingFilterExprSet_ into bool-typed FlatVectors, and processFilterResults() then performs logical AND/OR on these vectors. This incurs additional memory copies. Moreover, it contains special handling of nulls, while the logical expressions here would NOT produce NULLs at all, so that cost can be saved as well. Our newly introduced LogicalExpressionEvaluator will have its own evaluate() implementation that is more performant for logical expressions.
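The null-free specialization can be sketched as a direct AND over the per-expression result bitmaps. This is an illustration of the idea only, not the planned LogicalExpressionEvaluator API: since the delete expressions produce non-null booleans, no null masks or extra copies are needed.

```cpp
#include <cstddef>
#include <vector>

// AND the boolean results of several filter expressions into one selection
// bitmap. No null handling is required because logical delete expressions
// never produce NULLs.
std::vector<bool> andFilterResults(
    const std::vector<std::vector<bool>>& exprResults) {
  std::vector<bool> selected(exprResults.front().size(), true);
  for (const auto& result : exprResults) {
    for (size_t i = 0; i < selected.size(); ++i) {
      // A row survives only if every expression passed it.
      selected[i] = selected[i] && result[i];
    }
  }
  return selected;
}
```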

Testing

Prestissimo End To End Tests

In addition to unit tests we will have

  • TPCH, TPCDS built-in tests in Presto
  • IcebergExternalWorkerQueryRunner

Microbenchmarks

We will build microbenchmarks in Velox and Presto, covering both positional and equality delete files.
