Core, Spark: Use 'delete' if RowDelta only has delete files #10123
nastra merged 2 commits into apache:main
Conversation
Force-pushed 2209f87 to c5dcc3a
  @Override
  protected String operation() {
    if (!addsDataFiles() && addsDeleteFiles()) {
If I understand right, wouldn't this condition need to be
if (!addsDataFiles() && (addsDeleteFiles() || deletesDataFiles()))
Basically I think for a metadata only delete we'd want to make sure the operation is delete
Yes, you're absolutely right. Fixed it. This also needs some more tests, which is why it's still WIP.
@amogh-jahagirdar actually I think if (!addsDataFiles() && addsDeleteFiles()) is correct, as the RowDelta API only provides addRows(DataFile) and addDeletes(DeleteFile). deletesDataFiles() would apply to the RewriteFiles API.
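For reference, the classification being discussed can be modeled in isolation. The sketch below is a hypothetical standalone model, not the actual Iceberg classes: a RowDelta commit that adds delete files but no data files is a pure delete, while anything that adds data files remains an overwrite.

```java
// Hypothetical standalone model of the operation() decision under discussion.
// Not Iceberg code; class and method names here are illustrative only.
public class RowDeltaOperationSketch {

    // Mirrors the condition from the PR: delete files only -> "delete",
    // otherwise -> "overwrite" (DataOperations.DELETE / DataOperations.OVERWRITE).
    static String operation(boolean addsDataFiles, boolean addsDeleteFiles) {
        if (!addsDataFiles && addsDeleteFiles) {
            return "delete";
        }
        return "overwrite";
    }

    public static void main(String[] args) {
        System.out.println(operation(false, true));  // delete-only RowDelta -> delete
        System.out.println(operation(true, true));   // MERGE-style delta -> overwrite
        System.out.println(operation(true, false));  // data files only -> overwrite
    }
}
```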
Ah true, but doesn't the same principle apply to RewriteFiles then? I think technically someone can use RewriteFiles to perform just a delete:
table.newRewrite().deleteFile(file).commit()
It's awkward, since they could just use deleteFile directly, but I think it's still possible, and we'd still want to produce the DELETE operation instead of a REPLACE. It seems like we should have a check in RewriteFiles as well.
@aokolnychyi I agree that we should use replace when it's the same logical set of data, but I think the issue is what RewriteFiles actually enforces today. It doesn't validate against having only deletes, and I wouldn't expect a logical comparison either, since that would require comparing the data itself, which isn't feasible.
I wrote the following test in TestRewriteFiles:
@TestTemplate
public void testDelete() {
  commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch);
  table.newRewrite().deleteFile(FILE_A).toBranch(branch).commit();
  table.currentSnapshot();
}
and the operation commits successfully with a REPLACE operation. I know it's strange to use this API for just deleting a file (and it goes against what's in the Javadoc), but as of today it's technically possible, and it produces a snapshot with the incorrect REPLACE operation, which can lead to issues such as the incremental-processing case.
There are two ways I can think of to address this incorrectness:
1.) Fail at commit time if there are only deletes or only additions. In that case we know the data cannot possibly be logically the same, since there would be a net gain or net loss of data. I'm not sure if there are other cases, but this seems like a reasonable check that doesn't require actually comparing the data.
2.) Don't fail (to preserve the API behavior), and instead, at commit time, if it's only a delete or only an append, use the correct API to perform the operation with the right summary.
I'm leaning towards 1, since the API docs already make it clear that RewriteFiles should only be used when the data is logically the same. If a simple check tells us for a fact that it's not logically the same, we can enforce that.
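A minimal sketch of what option 1 could look like. All names below are hypothetical, not Iceberg's actual validation; and as pointed out later in the thread, a delete-only rewrite can be legitimate when the removed file's rows were already eliminated by delete files, so a real implementation would need to account for that case.

```java
import java.util.Set;

// Hypothetical sketch of option 1: reject a rewrite that only deletes or
// only adds files, since such a change cannot leave the table logically
// unchanged (there is a net gain or net loss of data). Names are illustrative.
public class RewriteValidationSketch {

    static void validateReplacedAndAddedFiles(Set<String> deletedFiles, Set<String> addedFiles) {
        if (deletedFiles.isEmpty() || addedFiles.isEmpty()) {
            throw new IllegalArgumentException(
                "RewriteFiles must both delete and add files; "
                    + "use DeleteFiles or AppendFiles for one-sided changes");
        }
    }

    public static void main(String[] args) {
        // Replacing FILE_A with a compacted file: passes the check.
        validateReplacedAndAddedFiles(Set.of("FILE_A"), Set.of("FILE_A_COMPACTED"));

        // Delete-only rewrite: rejected under this sketch.
        try {
            validateReplacedAndAddedFiles(Set.of("FILE_A"), Set.of());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected delete-only rewrite");
        }
    }
}
```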
I see we have some validation in BaseRewriteFiles#validateReplacedAndAddedFiles; I think we're just missing this case. I'll raise a PR to show what I mean.
The use case you highlight is a valid one if FILE_A has all records removed with deletes. Then removing FILE_A has no impact on the table state.
That being said, we should definitely check if the validation is missing any edge case. Thanks for looking into that!
Cool, thanks for explaining! This issue is ultimately orthogonal to the one that is being solved in this PR so we can take that separately.
Force-pushed 3a32e54 to 5df00aa
Force-pushed 5df00aa to bbc2212
  deleteRowSchema);

  table.newRowDelta().addDeletes(eqDeletes).commit();
  DataFile dataFile =
the test is specifically testing for OVERWRITE, hence I'm adding a data file here
Force-pushed 6ef3220 to b0c8b28
  }

  @TestTemplate
  public void deleteSingleRecordProducesDeleteOperation() {
Something seems to be off with subclasses of TestDelete. The test produces different results on each run when executing
./gradlew :iceberg-spark:iceberg-spark-extensions-3.5_2.12:test --tests "org.apache.iceberg.spark.extensions.TestCopyOnWriteDelete" --tests "org.apache.iceberg.spark.extensions.TestMergeOnReadDelete"
vs. when running TestCopyOnWriteDelete or TestMergeOnReadDelete within the IDE.
Turns out this was because the tests randomly enable or disable adaptive query execution.
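For anyone hitting the same flakiness locally, one option is to pin AQE explicitly rather than let the test harness randomize it. The config key below is Spark's real spark.sql.adaptive.enabled; whether pinning it is appropriate depends on what the test is meant to cover.

```
-- Spark SQL: pin adaptive query execution so plans are deterministic across runs
SET spark.sql.adaptive.enabled = false;
```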
I'll port this test to Spark 3.3/3.4 once CI passes
  assertThat(actual)
      .as("Snapshot property " + property + " has unexpected value.")
      .isEqualTo(expectedValue);
  if (null == expectedValue) {
these changes will show the content of the snapshot summary in case the assertion fails
Force-pushed a191e0e to 29f6dd8
  String actual = snapshot.summary().get(property);
  Assert.assertEquals(
      "Snapshot property " + property + " has unexpected value.", expectedValue, actual);
  if (null == expectedValue) {
these changes will show the content of the snapshot summary in case the assertion fails
  Snapshot currentSnapshot = table.currentSnapshot();

  if (Boolean.parseBoolean(spark.conf().get(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key()))) {
I recommend using the append methods in SparkRowLevelOperationsTestBase to make sure the 3 records are added as a single file (I mean the versions of append that accept JSON). Otherwise you have to check for AQE, and this logic is very fragile overall.
good point, I've updated that to use append methods
  if (!addsDataFiles() && addsDeleteFiles()) {
    return DataOperations.DELETE;
  }
Optional: my personal preference would be to have the negation as the second argument (or even offer a new method in the parent with the negated meaning) and combine the two return statements into one. It is completely up to you, though.
if (addsDeleteFiles() && !addsDataFiles()) {
  return DataOperations.DELETE;
} else {
  return DataOperations.OVERWRITE;
}
aokolnychyi left a comment:
Looks good to me. I have been meaning to do this for quite some time. Thanks, @nastra!
We should make sure RewriteFiles stays as replace.
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
Force-pushed 29f6dd8 to 1725387
Force-pushed 1725387 to 7e0237d
Thanks for the reviews @amogh-jahagirdar and @aokolnychyi
Resolves #10122