Use SupportsPrefixOperations for Remove OrphanFile Procedure #7914
Conversation
Would appreciate it if I could get a review from the community. cc @jackye1995 @aokolnychyi @amogh-jahagirdar @RussellSpitzer
    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
    return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
  }
} else {
I think we don't need to use else here, because the if case returns directly. This should avoid a lot of unnecessary diff noise.
Fixed
Force-pushed from 7b09b5c to d268b1b
List<String> matchingFiles = Lists.newArrayList();

if (table.io() instanceof SupportsPrefixOperations) {
  Iterator<org.apache.iceberg.io.FileInfo> iterator =
      ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
I think this is a good start, but I would ask that we only make the change in v3.4 to start (we can backport later).

In addition, can we set this up without the early exit? I.e.

if (table.io supportsPrefix) {
  files = listPrefix()
} else {
  files = listNoPrefix()
}
spark.createDataset(files)

We also need a test which exercises this code path. (Does HadoopFS do this by default? If so, do we have a test for the other path, no-prefix?)
Thanks @RussellSpitzer for taking a look. I can make the change to 3.4 only.

However, I think the early exit would be ideal, as after this if condition we would be calling

listDirRecursively(
    location,
    predicate,
    hadoopConf.value(),
    MAX_DRIVER_LISTING_DEPTH,
    MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
    subDirs,
    pathFilter,
    matchingFiles);
.......
ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

Both of these make direct FileSystem listStatus calls, which we ideally want to avoid in favor of just using the S3FileIO. Let me know if there are concerns with the early exit/current logic.
cc @jackye1995
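To illustrate the distinction being discussed, here is a minimal sketch of the two listing styles. The class and method names are illustrative only (not the ones in DeleteOrphanFilesSparkAction), and the Hadoop listing is simplified to a single non-recursive level, unlike listDirRecursively.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

class ListingContrastSketch {

  // Hadoop-based listing (simplified, non-recursive): needs a FileSystem for the location's
  // scheme, with its own credentials; this is where "No FileSystem for scheme" / 403 errors surface.
  static List<String> listWithHadoopFs(String location, Configuration conf, long olderThanTimestamp)
      throws java.io.IOException {
    List<String> files = new ArrayList<>();
    Path path = new Path(location);
    FileSystem fs = path.getFileSystem(conf);
    for (FileStatus status : fs.listStatus(path)) {
      if (status.isFile() && status.getModificationTime() < olderThanTimestamp) {
        files.add(status.getPath().toString());
      }
    }
    return files;
  }

  // FileIO-based listing: stays on the table's configured IO (e.g. S3FileIO), which already
  // carries the right credentials when its client is built.
  static List<String> listWithFileIO(FileIO io, String location, long olderThanTimestamp) {
    List<String> files = new ArrayList<>();
    for (FileInfo fileInfo : ((SupportsPrefixOperations) io).listPrefix(location)) {
      if (fileInfo.createdAtMillis() < olderThanTimestamp) {
        files.add(fileInfo.location());
      }
    }
    return files;
  }
}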
The idea is not to change the logic, but to change the structure of the code. Basically you would take this code and the code after the exit and create two functions (names are just examples)
Dataset<String> listWithPrefix() {
  List<String> matchingFiles = Lists.newArrayList();
  Iterator<org.apache.iceberg.io.FileInfo> iterator =
      ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
  while (iterator.hasNext()) {
    org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
    if (fileInfo.createdAtMillis() < olderThanTimestamp) {
      matchingFiles.add(fileInfo.location());
    }
  }
  JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
  return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
}

Dataset<String> listWithoutPrefix() {
  List<String> matchingFiles = Lists.newArrayList();
  Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
  PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

  // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
  // less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
  listDirRecursively(
      location,
      predicate,
      hadoopConf.value(),
      MAX_DRIVER_LISTING_DEPTH,
      MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
      subDirs,
      pathFilter,
      matchingFiles);

  JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

  if (subDirs.isEmpty()) {
    return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
  }

  int parallelism = Math.min(subDirs.size(), listingParallelism);
  JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

  Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
  ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
  JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

  JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
  return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
}

Then you change listedFileDS:

listedFileDS() {
  if (table.io supportsPrefix) {
    files = listPrefix()
  } else {
    files = listNoPrefix()
  }
  return files
}
I see, thank you for the detailed explanation.
PR still needs tests; let me know when they have been added.
@rahil-c just wanted to check in and see if you were planning on updating the PR? It would be great to get this in for the 1.4.0 release.
Got a similar issue in 1.4.2, Spark 3.5. My Iceberg catalog in Spark is configured via the org.apache.iceberg.aws.s3.S3FileIO filesystem. I store files using the s3 prefix.
Ran into a similar issue (same as in #8368) using the Glue Catalog. Is there maybe a workaround for this, or is this PR the only fix?
Same issue here. I can't run the procedure.
Same issue here. Let me know if anyone has solved this with the latest version. @carlosescura @domonkosbalogh-seon @rahil-c
@lokeshrdy Still doesn't work using Spark. I had to add a workaround when calling the procedure.
@carlosescura the issue itself hasn't been solved yet. I'm not sure if @rahil-c is actively working on this issue. If not, maybe someone else from the community is interested in working on this.
@rahil-c is there any possibility of continuing work on this PR? Many of us would really appreciate it.
Hi , I am also facing the same issue while running orphan file clean up via Nessie REST. Auto-compaction and snapshot expiry works, but orphan file clean up procecure gives the same error. Is there any ETA on this fix? java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3" |
|
Hi all sorry for the delay on this issue, been engaged in many internal things at work so did not get time to revisit this. Originally when I encountered this issue it was a very specific feature I was working on with AWS LakeFormation and Iceberg integration hence I opened this PR, to solve that issue. It seems there are several people however that have been hitting issues around In terms of the following issue The mitigation would be to likely leverage I think in users spark configurations they can try adding As for landing this PR will see if I can add tests based on @RussellSpitzer feedback. |
|
@RussellSpitzer @amogh-jahagirdar Wanted to understand what is the actual test needed for this change? I saw this comment However based on the diff of the pr the actual logic change is on the list with prefix path. When checking |
|
The basic issue is that we want to make sure we have a test which uses both a SupportsPrefixOperations-enabled FS and one where it is not enabled, so we are sure that both implementations remain correct.
I see, thank you for the clarification @RussellSpitzer @amogh-jahagirdar.
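For what it's worth, a sketch of the kind of test pair being asked for (JUnit 4 style). The fixtures loadTableWithPrefixIO and loadTableWithNonPrefixIO are hypothetical placeholders, not existing helpers in the Iceberg test suite; only the action calls themselves are real API.

import java.util.concurrent.TimeUnit;

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.spark.actions.SparkActions;
import org.junit.Assert;
import org.junit.Test;

public class TestRemoveOrphanFilesBothListingPaths {

  // Hypothetical fixtures: the first returns a table whose io() implements
  // SupportsPrefixOperations (e.g. HadoopFileIO); the second wraps the io() so that it does not,
  // forcing the listDirRecursively fallback.
  private Table loadTableWithPrefixIO() {
    throw new UnsupportedOperationException("fixture sketch");
  }

  private Table loadTableWithNonPrefixIO() {
    throw new UnsupportedOperationException("fixture sketch");
  }

  private void createOrphanFile(Table table) throws Exception {
    // write a file under the table location that no snapshot references
    table.io().newOutputFile(table.location() + "/data/orphan-file").createOrOverwrite().close();
  }

  private void runAndAssertOrphansRemoved(Table table) throws Exception {
    createOrphanFile(table);
    DeleteOrphanFiles.Result result =
        SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1))
            .execute();
    Assert.assertTrue(result.orphanFileLocations().iterator().hasNext());
  }

  @Test
  public void testRemoveOrphanFilesWithPrefixListing() throws Exception {
    Table table = loadTableWithPrefixIO();
    Assert.assertTrue(table.io() instanceof SupportsPrefixOperations);
    runAndAssertOrphansRemoved(table);
  }

  @Test
  public void testRemoveOrphanFilesWithoutPrefixListing() throws Exception {
    Table table = loadTableWithNonPrefixIO();
    Assert.assertFalse(table.io() instanceof SupportsPrefixOperations);
    runAndAssertOrphansRemoved(table);
  }
}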
private Dataset<String> listedFileDS() {
private Dataset<String> listWithPrefix() {
  List<String> matchingFiles = Lists.newArrayList();
This is going to have fantastic speedups with S3 and any Hadoop FS which does deep listing (s3, gcs).
  return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
}

private Dataset<String> listedFileDS()
is this actually used? I can't see it being invoked
I think it is called by method actualFileIdentDS().
HadoopFileIO (and therefore the local fs) supports listPrefix. It'll need a CustomFileIO as with similar tests.
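A sketch of what such a custom FileIO could look like: a delegating FileIO that intentionally does not implement SupportsPrefixOperations, so the instanceof check sends the action down the listDirRecursively fallback. The class name is made up; only the FileIO interface methods are real API.

import java.util.Map;

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// Delegates everything to a real FileIO (e.g. HadoopFileIO) but deliberately does NOT
// implement SupportsPrefixOperations.
public class NonPrefixDelegatingFileIO implements FileIO {

  private final FileIO delegate;

  public NonPrefixDelegatingFileIO(FileIO delegate) {
    this.delegate = delegate;
  }

  @Override
  public InputFile newInputFile(String path) {
    return delegate.newInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return delegate.newOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    delegate.deleteFile(path);
  }

  @Override
  public void initialize(Map<String, String> properties) {
    delegate.initialize(properties);
  }

  @Override
  public void close() {
    delegate.close();
  }
}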
}

private Dataset<String> listedFileDS() {
  if (table.io() instanceof SupportsPrefixOperations) {
+1 for fallback
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
CALL nessie.system.remove_orphan_files(table => 'nessie.robot_dev.robot_data')
Confirmed the issue is still there. After manually setting spark.hadoop.fs.s3.impl to S3A, it will work if the client has S3 credentials with the needed access. However, with credential vending from Polaris it can fail (in this case, the client doesn't have S3 credentials). Here is the reference ticket on the Polaris side: apache/polaris#390
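For reference, the workaround described above boils down to mapping the bare s3 scheme onto the S3A filesystem in the Spark configuration. A minimal sketch follows; the catalog and table names are placeholders, the usual Iceberg catalog and SQL-extension settings are omitted, and it only helps when the client itself holds valid S3 credentials.

import org.apache.spark.sql.SparkSession;

public class RemoveOrphanFilesWorkaround {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("remove-orphan-files-workaround")
            // Map the "s3" scheme onto the S3A filesystem so listDirRecursively can create
            // a Hadoop FileSystem for s3:// locations.
            .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .getOrCreate();

    // Placeholder catalog/table names; does not help with credential vending, as noted above.
    spark.sql("CALL my_catalog.system.remove_orphan_files(table => 'db.sample_table')").show();
  }
}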
@rahil-c please check if it's true below: so the sample_part table will lose its metadata files.
Has anyone got a nice workaround for how to remove orphan files for an S3-located Iceberg table? Conscious this PR is 18 months old, and I'm assuming someone has got this to work on their end... somehow?
The S3A fs implements bulk delete too... maybe this and S3FileIO can do the right thing (*).

(*) We added it to all filesystems, but the page size of the others is zero.
@steveloughran This isn't about bulk deletes (which S3FileIO does support). The issue is how to properly scale the identification of orphaned files, which is a function of the procedure, not the file system.
Issue Summary

When the REMOVE ORPHAN FILES procedure runs DeleteOrphanFilesSparkAction#listDirRecursively, we do not use the concept of FileIO; instead we directly create the Hadoop FileSystem and make listStatus calls. We need to use the S3FileIO, otherwise we will hit a 403 S3 access-denied error, as the file system does not have the correct credentials to access this data. S3FileIO, however, has the correct credentials when the S3 client is built.

Aim of PR

Use SupportsPrefixOperations, when the table's FileIO supports it, in the REMOVE ORPHAN FILES procedure code path.

Testing