
Conversation

@rahil-c
Contributor

@rahil-c rahil-c commented Jun 26, 2023

Issue Summary

  • When testing the Iceberg integration with AWS LakeFormation, I encountered the following failure when running the REMOVE ORPHAN FILES procedure:
java.io.UncheckedIOException: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;
    at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:367) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
    at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:292) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
    at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:277) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
    at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:243) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
    at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:127) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
    at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:229) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]
    at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:171) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-1-SNAPSHOT.jar:?]

  • Currently, in DeleteOrphanFilesSparkAction#listDirRecursively, we do not use the FileIO abstraction; instead we create the FileSystem directly and make listStatus calls:

      FileSystem fs = path.getFileSystem(conf);
      List<String> subDirs = Lists.newArrayList();
      for (FileStatus file : fs.listStatus(path, pathFilter)) {

We need to use S3FileIO here; otherwise we hit a 403 S3 Access Denied error, because the Hadoop FileSystem does not have the correct credentials to access this data. S3FileIO, however, does have the correct credentials, since they are supplied when the S3 client is built.

Aim of PR

  • Use S3FileIO within the REMOVE ORPHAN FILES procedure code path, as sketched below.
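
A rough sketch of the idea (not the exact diff in this PR; the PrefixAwareListing helper name is made up, and the names mirror the existing DeleteOrphanFilesSparkAction fields): list through the table's FileIO when it implements SupportsPrefixOperations, instead of going through the Hadoop FileSystem.

import java.util.List;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Sketch only: collect file locations older than the cutoff via prefix listing.
// S3FileIO implements SupportsPrefixOperations, so no Hadoop FileSystem (and no
// separate Hadoop credentials) is needed for the listing step.
final class PrefixAwareListing {
  private PrefixAwareListing() {}

  static List<String> listOlderThan(FileIO io, String location, long olderThanTimestamp) {
    List<String> matchingFiles = Lists.newArrayList();
    if (io instanceof SupportsPrefixOperations) {
      for (FileInfo fileInfo : ((SupportsPrefixOperations) io).listPrefix(location)) {
        if (fileInfo.createdAtMillis() < olderThanTimestamp) {
          matchingFiles.add(fileInfo.location());
        }
      }
    }
    return matchingFiles;
  }
}

HadoopFileIO also implements SupportsPrefixOperations, so HDFS-backed tables can take the same path when it is available.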

Testing

  • With this change, I no longer hit the 403 error and can run the procedure without issue.

spark-sql> CALL my_catalog.system.remove_orphan_files(table => 'iceberg_db.iceberg_table8', dry_run => true);
23/06/21 22:28:56 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3://rchertar-dev/iceberg-nrt/iceberg_table8/metadata/00003-4a0f642b-88a9-41e1-b1ab-5c0bada18b12.metadata.json
23/06/21 22:28:56 INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
23/06/21 22:28:57 INFO BaseMetastoreCatalog: Table loaded by catalog: my_catalog.iceberg_db.iceberg_table8
23/06/21 22:28:57 INFO BaseAllMetadataTableScan: Scanning metadata table my_catalog.iceberg_db.iceberg_table8 with filter true.
23/06/21 22:28:58 INFO BaseAllMetadataTableScan: Scanning metadata table my_catalog.iceberg_db.iceberg_table8 with filter true.
Time taken: 21.439 seconds
spark-sql> select * from my_catalog.iceberg_db.iceberg_table8;
23/06/21 22:29:24 INFO BaseTableScan: Scanning table my_catalog.iceberg_db.iceberg_table8 snapshot 7557581385081927782 created at 2023-06-21T22:26:54.853+00:00 with filter true
23/06/21 22:29:24 INFO LoggingMetricsReporter: Received metrics report: ScanReport{tableName=my_catalog.iceberg_db.iceberg_table8, snapshotId=7557581385081927782, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, creation_date, last_update_time], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.13239524S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=3}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=3}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=3}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=2763}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.3.1-amzn-0, app-id=application_1682982958961_0098, engine-name=spark}}
8	jan8	8
9	jan9	9
10	jan10	10
Time taken: 4.427 seconds, Fetched 3 row(s)
spark-sql>

@github-actions github-actions bot added the spark label Jun 26, 2023
@rahil-c
Contributor Author

rahil-c commented Jun 26, 2023

Would appreciate a review from the community.

cc @jackye1995 @aokolnychyi @amogh-jahagirdar @RussellSpitzer

@rahil-c rahil-c changed the title from "Use S3FileIO for Remove OrphanFile Procedure when using LakeFormation" to "Use SupportsPrefixOperations for Remove OrphanFile Procedure" Jun 26, 2023
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
}
} else {
Contributor

@jackye1995 jackye1995 Jun 26, 2023


I think we don't need the else here, because the if case returns directly. This should avoid many unnecessary diffs.

Contributor Author

Fixed

@rahil-c rahil-c force-pushed the rahilc/master branch 2 times, most recently from 7b09b5c to d268b1b Compare June 26, 2023 20:53
List<String> matchingFiles = Lists.newArrayList();

if (table.io() instanceof SupportsPrefixOperations) {
Iterator<org.apache.iceberg.io.FileInfo> iterator = ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
Member

I think this is a good start, but I would ask that we only make the change in v3.4 to start (we can backport it later).

In addition, can we set this up without the early exit?

i.e.

if (table.io() supportsPrefix) {
  files = listPrefix();
} else {
  files = listNoPrefix();
}

spark.createDataset(files)

We also need a test which exercises this code path. (Does HadoopFS do this by default? If so, do we have a test for the other path: no prefix?)

Contributor Author

Thanks @RussellSpitzer for taking a look. I can make the change to 3.4 only.
However, I think the early exit would be ideal, as after this if condition we would be calling

    listDirRecursively(
        location,
        predicate,
        hadoopConf.value(),
        MAX_DRIVER_LISTING_DEPTH,
        MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
        subDirs,
        pathFilter,
        matchingFiles);

.......
 ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
 JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

Both of these make direct FileSystem listStatus calls, which we ideally want to avoid so we can just use S3FileIO. Let me know if there are concerns with the early exit / current logic.

cc @jackye1995

Member

@RussellSpitzer RussellSpitzer Jul 12, 2023

The idea is not to change the logic, but to change the structure of the code. Basically, you would take this code and the code after the exit and create two functions (names are just examples):

Dataset<String> listWithPrefix() {
    List<String> matchingFiles = Lists.newArrayList();
    Iterator<org.apache.iceberg.io.FileInfo> iterator =
        ((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
    while (iterator.hasNext()) {
      org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
      if (fileInfo.createdAtMillis() < olderThanTimestamp) {
          matchingFiles.add(fileInfo.location());
      }
    }
    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
    return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
  }

Dataset<String> listWithoutPrefix() {
    List<String> matchingFiles = Lists.newArrayList();
    List<String> subDirs = Lists.newArrayList();
    Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

    // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
    // less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
    listDirRecursively(
        location,
        predicate,
        hadoopConf.value(),
        MAX_DRIVER_LISTING_DEPTH,
        MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
        subDirs,
        pathFilter,
        matchingFiles);
        
    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
    if (subDirs.isEmpty()) {
      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
    }
    int parallelism = Math.min(subDirs.size(), listingParallelism);
    JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
    ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
    JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);
    JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
}

Then you change listedFileDS:

listedFileDS() {
  if (table.io() supportsPrefix) {
    files = listWithPrefix();
  } else {
    files = listWithoutPrefix();
  }
  return files;
}

Contributor Author

I see, thank you for the detailed explanation

@RussellSpitzer
Member

PR still needs tests; let me know when they have been added.

@nastra
Contributor

nastra commented Aug 23, 2023

@rahil-c just wanted to check in and see if you were planning on updating the PR. It would be great to get this in for the 1.4.0 release.

@lyohar

lyohar commented Nov 22, 2023

Got a similar issue in 1.4.2, Spark 3.5.

My Iceberg catalog in Spark is configured to use org.apache.iceberg.aws.s3.S3FileIO, and I store files using the s3 prefix.
However, when trying to clean files I get the error org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3".

@domonkosbalogh-seon

Ran into a similar issue (same as in #8368) using the Glue Catalog. Is there maybe a workaround for this, or would this PR be the only fix?

@carlosescura

Same issue here. I can't run the remove_orphan_files procedure using Glue and S3 😢

@lokeshrdy

Same issue here. Let me know if anyone has solved this with the latest version. @carlosescura @domonkosbalogh-seon @rahil-c

@carlosescura

@lokeshrdy Still doesn't work using Spark 3.5.0 and Iceberg 1.5.0 and Glue as the catalog with the following config:

SPARK_SETTINGS = [
    (
        "spark.jars",
        """
        /opt/spark/jars/iceberg-aws-bundle-1.5.0.jar,
        /opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar,
        /opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar,
        /opt/spark/jars/hadoop-aws-3.3.4.jar
        """,
    ),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("spark.hadoop.com.amazonaws.services.s3.enableV4", "true"),
    (
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    ),
    (
        "spark.sql.catalog.main_catalog",
        "org.apache.iceberg.spark.SparkCatalog",
    ),
    (
        "spark.sql.catalog.main_catalog.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    ),
    (
        "spark.sql.catalog.main_catalog.io-impl",
        "org.apache.iceberg.aws.s3.S3FileIO",
    ),
    (
        "spark.sql.catalog.main_catalog.warehouse",
        ICEBERG_CATALOG_WHAREHOUSE,
    ),
]

I had to add hadoop-aws-3.3.4.jar to be able to download some CSVs and load them as Spark DF.

When calling the remove_orphan_files procedure I get the following exception:

py4j.protocol.Py4JJavaError: An error occurred while calling o46.sql.
: java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:386)
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:311)
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:296)
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:247)
	at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
	at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
	at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:130)
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:223)
	at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:185)
	at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
	at org.apache.iceberg.spark.procedures.BaseProcedure.withIcebergTable(BaseProcedure.java:96)
	at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.call(RemoveOrphanFilesProcedure.java:139)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:356)
	... 55 more

@nastra
Contributor

nastra commented Mar 19, 2024

@carlosescura the issue itself hasn't been solved yet. I'm not sure if @rahil-c is actively working on this issue. If not, maybe someone else from the community is interested in working on this.

@carlosescura

@rahil-c is there any possibility to continue working on this PR? Many of us would really appreciate it.

@schobe

schobe commented Jul 3, 2024

Hi, I am also facing the same issue while running orphan file cleanup via Nessie REST. Auto-compaction and snapshot expiry work, but the orphan file cleanup procedure gives the same error. Is there any ETA on this fix?

java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"

@rahil-c
Contributor Author

rahil-c commented Jul 4, 2024

Hi all, sorry for the delay on this issue; I've been engaged in many internal things at work, so I did not get time to revisit this.

Originally, when I encountered this issue I was working on a very specific feature involving the AWS LakeFormation and Iceberg integration, hence I opened this PR to solve that issue. It seems, however, that several people have been hitting issues around the Remove Orphan Files procedure, but I'm unsure whether it is exactly the same issue that I mentioned in the overview.

In terms of the No FileSystem for scheme "s3" issue, my understanding is that the remove orphan files procedure invokes the Hadoop file system directly, and if a user is trying to read an s3 path, Hadoop does not natively understand that file scheme: https://github.com/apache/iceberg/blob/main/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java#L356

The mitigation would likely be to leverage the hadoop-aws jar and configure Spark with the appropriate hadoop-aws settings. From the Iceberg AWS docs: https://github.com/apache/iceberg/blob/main/docs/docs/aws.md#hadoop-s3a-filesystem

Add [hadoop-aws](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws) as a runtime dependency of your compute engine.
Configure AWS settings based on [hadoop-aws documentation](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html) (make sure you check the version, S3A configuration varies a lot based on the version you use).

I think users can try adding "spark.hadoop.fs.s3.impl" = "org.apache.hadoop.fs.s3a.S3AFileSystem" to their Spark configuration, as I saw in a similar thread here: https://apache-iceberg.slack.com/archives/C03LG1D563F/p1656918500567629
A minimal sketch of that setup follows.

As for landing this PR, I will see if I can add tests based on @RussellSpitzer's feedback.

@rahil-c
Contributor Author

rahil-c commented Jul 8, 2024

@RussellSpitzer @amogh-jahagirdar Wanted to understand what the actual test needed for this change is. I saw this comment:

We also need a test which exercises this code path. (Does HadoopFS do this by default? If so, do we have a test for the other path: no prefix?)

However, based on the diff of the PR, the actual logic change is on the list-with-prefix path.

When checking TestRemoveOrphanFilesAction, which uses DeleteOrphanFilesSparkAction, my assumption is that it would test the list-prefix path, as this test uses HadoopTables, which use HadoopFileIO, which implements the SupportsPrefixOperations interface:

 @Override
  public Iterable<FileInfo> listPrefix(String prefix) {
    Path prefixToList = new Path(prefix);
    FileSystem fs = Util.getFs(prefixToList, hadoopConf.get());

    return () -> {
      try {
        return Streams.stream(
                new AdaptingIterator<>(fs.listFiles(prefixToList, true /* recursive */)))
            .map(
                fileStatus ->
                    new FileInfo(
                        fileStatus.getPath().toString(),
                        fileStatus.getLen(),
                        fileStatus.getModificationTime()))
            .iterator();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };
  }

@RussellSpitzer
Member

RussellSpitzer commented Jul 8, 2024

The basic issue is that we want to make sure we have a test which uses both the supportsPrefix-enabled FS and one where it is not enabled, so we are sure that both implementations remain correct.

@rahil-c
Contributor Author

rahil-c commented Jul 9, 2024

The basic issue is that we want to make sure we have a test which uses both the supportsPrefix-enabled FS and one where it is not enabled, so we are sure that both implementations remain correct.

I see thank you for the clarification @RussellSpitzer @amogh-jahagirdar


private Dataset<String> listedFileDS() {
private Dataset<String> listWithPrefix() {
List<String> matchingFiles = Lists.newArrayList();
Contributor

this is going to have fantastic speedups with S3 and any Hadoop FS which does deep listing (s3, gcs).

return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
}

private Dataset<String> listedFileDS() {
Contributor

is this actually used? I can't see it being invoked

Contributor

I think it is called by the actualFileIdentDS() method.

@steveloughran
Contributor

HadoopFileIO (and therefore the local FS) supports listPrefix. It'll need a CustomFileIO, as with similar tests.
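
A rough sketch of what such a custom FileIO could look like (the NonPrefixFileIO name is hypothetical; it assumes FileIO's core newInputFile/newOutputFile/deleteFile methods and the HadoopFileIO(Configuration) constructor): it delegates to HadoopFileIO but does not implement SupportsPrefixOperations, so a table configured with it exercises the fallback listing path.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// Hypothetical test helper: a FileIO that behaves like HadoopFileIO but hides the
// SupportsPrefixOperations capability, forcing the non-prefix code path.
public class NonPrefixFileIO implements FileIO {
  private final HadoopFileIO delegate = new HadoopFileIO(new Configuration());

  @Override
  public InputFile newInputFile(String path) {
    return delegate.newInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return delegate.newOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    delegate.deleteFile(path);
  }
}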

}

private Dataset<String> listedFileDS() {
if (table.io() instanceof SupportsPrefixOperations) {
Contributor

+1 for fallback

@github-actions

github-actions bot commented Oct 4, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Oct 12, 2024
@yunlou11

CALL nessie.system.remove_orphan_files(table => 'nessie.robot_dev.robot_data')
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)

@RussellSpitzer

@MonkeyCanCode

MonkeyCanCode commented Oct 22, 2024

Confirmed the issue is still there. After manually setting spark.hadoop.fs.s3.impl to S3A, it will work if the client has S3 credentials with the needed access. However, with credential vending from Polaris it can fail (in that case, the client doesn't have S3 credentials). Here is the reference ticket on the Polaris side: apache/polaris#390

@yunlou11

@rahil-c please check if the following is true:
listWithPrefix may list too many unexpected files. For example, with two tables sample and sample_part,
matchingFiles will contain:

...
DeleteOrphanFilesSparkAction: match files: s3a://ice-lake/warehouse/ice_db/sample/metadata/snap-4835616794401450947-1-f5bb6d24-162f-4c9d-a426-893c07cac506.avro
DeleteOrphanFilesSparkAction: match files: s3a://ice-lake/warehouse/ice_db/sample_part/data/ds=20240806/00000-3-0c28a14d-186c-489d-aeb8-f0a949a67297-0-00002.parquet
...

so the sample_part table will lose its metadata files. A possible guard is sketched below.
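
If that is the case, one possible guard (a hypothetical sketch, not something in this PR) would be to ensure the listed location ends with the path delimiter before calling listPrefix, so a prefix for sample cannot also match sample_part:

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

// Hypothetical guard: normalize the location to end with "/" so a prefix listing
// for ".../ice_db/sample" does not also pick up ".../ice_db/sample_part/...".
final class PrefixListing {
  private PrefixListing() {}

  static Iterable<FileInfo> listTableFiles(FileIO io, String tableLocation) {
    String prefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
    return ((SupportsPrefixOperations) io).listPrefix(prefix);
  }
}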

@Samreay

Samreay commented Dec 12, 2024

Has anyone got a nice workaround for how to remove orphan files for an S3-located Iceberg table? Conscious this PR is 18 months old, and I'm assuming someone has got this to work on their end... somehow?

@steveloughran
Contributor

The S3A FS implements bulk delete too... maybe this and S3FileIO can do the right thing (*)

(*) we added it to all filesystems, but the page size of the others is zero

@danielcweeks
Contributor

@steveloughran This isn't about bulk deletes (which S3FileIO does support). The issue is how to properly scale the identification of orphaned files, which is a function of the procedure, not the file system.

