|
18 | 18 | package org.apache.spark.deploy |
19 | 19 |
|
20 | 20 | import java.io._ |
| 21 | +import java.net.URI |
21 | 22 | import java.nio.charset.StandardCharsets |
22 | 23 |
|
23 | 24 | import scala.collection.mutable.ArrayBuffer |
24 | 25 | import scala.io.Source |
25 | 26 |
|
26 | 27 | import com.google.common.io.ByteStreams |
| 28 | +import org.apache.commons.io.{FilenameUtils, FileUtils} |
| 29 | +import org.apache.hadoop.conf.Configuration |
27 | 30 | import org.apache.hadoop.fs.Path |
28 | 31 | import org.scalatest.{BeforeAndAfterEach, Matchers} |
29 | 32 | import org.scalatest.concurrent.Timeouts |
@@ -535,7 +538,7 @@ class SparkSubmitSuite |
535 | 538 |
|
536 | 539 | test("resolves command line argument paths correctly") { |
537 | 540 | val jars = "/jar1,/jar2" // --jars |
538 | | - val files = "hdfs:/file1,file2" // --files |
| 541 | + val files = "local:/file1,file2" // --files |
539 | 542 | val archives = "file:/archive1,archive2" // --archives |
540 | 543 | val pyFiles = "py-file1,py-file2" // --py-files |
541 | 544 |
|
@@ -587,7 +590,7 @@ class SparkSubmitSuite |
587 | 590 |
|
588 | 591 | test("resolves config paths correctly") { |
589 | 592 | val jars = "/jar1,/jar2" // spark.jars |
590 | | - val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files |
| 593 | + val files = "local:/file1,file2" // spark.files / spark.yarn.dist.files |
591 | 594 | val archives = "file:/archive1,archive2" // spark.yarn.dist.archives |
592 | 595 | val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles |
593 | 596 |
|
@@ -705,6 +708,87 @@ class SparkSubmitSuite |
705 | 708 | } |
706 | 709 | // scalastyle:on println |
707 | 710 |
|
| 711 | + private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = { |
| 712 | + if (sourcePath == outputPath) { |
| 713 | + return |
| 714 | + } |
| 715 | + |
| 716 | + val sourceUri = new URI(sourcePath) |
| 717 | + val outputUri = new URI(outputPath) |
| 718 | + assert(outputUri.getScheme === "file") |
| 719 | + |
| 720 | + // The path and filename are preserved. |
| 721 | + assert(outputUri.getPath.endsWith(sourceUri.getPath)) |
| 722 | + assert(FileUtils.readFileToString(new File(outputUri.getPath)) === |
| 723 | + FileUtils.readFileToString(new File(sourceUri.getPath))) |
| 724 | + } |
| 725 | + |
| 726 | + private def deleteTempOutputFile(outputPath: String): Unit = { |
| 727 | + val outputFile = new File(new URI(outputPath).getPath) |
| 728 | + if (outputFile.exists) { |
| 729 | + outputFile.delete() |
| 730 | + } |
| 731 | + } |
| 732 | + |
| 733 | + test("downloadFile - invalid url") { |
| 734 | + intercept[IOException] { |
| 735 | + SparkSubmit.downloadFile("abc:/my/file", new Configuration()) |
| 736 | + } |
| 737 | + } |
| 738 | + |
| 739 | + test("downloadFile - file doesn't exist") { |
| 740 | + val hadoopConf = new Configuration() |
| 741 | + // Set s3a implementation to local file system for testing. |
| 742 | + hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem") |
| 743 | + // Disable file system impl cache to make sure the test file system is picked up. |
| 744 | + hadoopConf.set("fs.s3a.impl.disable.cache", "true") |
| 745 | + intercept[FileNotFoundException] { |
| 746 | + SparkSubmit.downloadFile("s3a:/no/such/file", hadoopConf) |
| 747 | + } |
| 748 | + } |
| 749 | + |
| 750 | + test("downloadFile does not download local file") { |
| 751 | + // empty path is considered as local file. |
| 752 | + assert(SparkSubmit.downloadFile("", new Configuration()) === "") |
| 753 | + assert(SparkSubmit.downloadFile("/local/file", new Configuration()) === "/local/file") |
| 754 | + } |
| 755 | + |
| 756 | + test("download one file to local") { |
| 757 | + val jarFile = File.createTempFile("test", ".jar") |
| 758 | + jarFile.deleteOnExit() |
| 759 | + val content = "hello, world" |
| 760 | + FileUtils.write(jarFile, content) |
| 761 | + val hadoopConf = new Configuration() |
| 762 | + // Set s3a implementation to local file system for testing. |
| 763 | + hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem") |
| 764 | + // Disable file system impl cache to make sure the test file system is picked up. |
| 765 | + hadoopConf.set("fs.s3a.impl.disable.cache", "true") |
| 766 | + val sourcePath = s"s3a://${jarFile.getAbsolutePath}" |
| 767 | + val outputPath = SparkSubmit.downloadFile(sourcePath, hadoopConf) |
| 768 | + checkDownloadedFile(sourcePath, outputPath) |
| 769 | + deleteTempOutputFile(outputPath) |
| 770 | + } |
| 771 | + |
| 772 | + test("download list of files to local") { |
| 773 | + val jarFile = File.createTempFile("test", ".jar") |
| 774 | + jarFile.deleteOnExit() |
| 775 | + val content = "hello, world" |
| 776 | + FileUtils.write(jarFile, content) |
| 777 | + val hadoopConf = new Configuration() |
| 778 | + // Set s3a implementation to local file system for testing. |
| 779 | + hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem") |
| 780 | + // Disable file system impl cache to make sure the test file system is picked up. |
| 781 | + hadoopConf.set("fs.s3a.impl.disable.cache", "true") |
| 782 | + val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}") |
| 783 | + val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), hadoopConf).split(",") |
| 784 | + |
| 785 | + assert(outputPaths.length === sourcePaths.length) |
| 786 | + sourcePaths.zip(outputPaths).foreach { case (sourcePath, outputPath) => |
| 787 | + checkDownloadedFile(sourcePath, outputPath) |
| 788 | + deleteTempOutputFile(outputPath) |
| 789 | + } |
| 790 | + } |
| 791 | + |
708 | 792 | // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. |
709 | 793 | private def runSparkSubmit(args: Seq[String]): Unit = { |
710 | 794 | val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) |
@@ -807,3 +891,10 @@ object UserClasspathFirstTest { |
807 | 891 | } |
808 | 892 | } |
809 | 893 | } |
| 894 | + |
/**
 * A [[org.apache.hadoop.fs.LocalFileSystem]] used to stand in for remote file systems
 * (e.g. s3a) in tests: it accepts any scheme by stripping it from the source path
 * before delegating the copy to the local file system.
 */
class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
  override def copyToLocalFile(src: Path, dst: Path): Unit = {
    // Ignore the scheme for testing.
    val schemelessSrc = new Path(src.toUri.getPath)
    super.copyToLocalFile(schemelessSrc, dst)
  }
}
0 commit comments