Skip to content

Commit 4af3781

Browse files
loneknightpygatorsmile
authored and committed
[SPARK-10643][CORE] Make spark-submit download remote files to local in client mode
## What changes were proposed in this pull request?

This PR makes the spark-submit script download remote files to the local file system for local/standalone client mode.

## How was this patch tested?

- Unit tests
- Manual tests by adding the s3a jar and testing against a file on S3.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Yu Peng <[email protected]>

Closes #18078 from loneknightpy/download-jar-in-spark-submit.
1 parent c491e2e commit 4af3781

File tree

2 files changed

+140
-3
lines changed

2 files changed

+140
-3
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
2020
import java.io.{File, IOException}
2121
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
2222
import java.net.URL
23+
import java.nio.file.Files
2324
import java.security.PrivilegedExceptionAction
2425
import java.text.ParseException
2526

@@ -28,7 +29,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
2829
import scala.util.Properties
2930

3031
import org.apache.commons.lang3.StringUtils
31-
import org.apache.hadoop.fs.Path
32+
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
33+
import org.apache.hadoop.fs.{FileSystem, Path}
3234
import org.apache.hadoop.security.UserGroupInformation
3335
import org.apache.ivy.Ivy
3436
import org.apache.ivy.core.LogOptions
@@ -308,6 +310,15 @@ object SparkSubmit extends CommandLineUtils {
308310
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
309311
}
310312

313+
// In client mode, download remote files.
314+
if (deployMode == CLIENT) {
315+
val hadoopConf = new HadoopConfiguration()
316+
args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
317+
args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
318+
args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
319+
args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
320+
}
321+
311322
// Require all python files to be local, so we can add them to the PYTHONPATH
312323
// In YARN cluster mode, python files are distributed as regular files, which can be non-local.
313324
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
@@ -825,6 +836,41 @@ object SparkSubmit extends CommandLineUtils {
825836
.mkString(",")
826837
if (merged == "") null else merged
827838
}
839+
840+
/**
 * Resolve a comma-separated list of file paths, fetching each remote entry into a
 * temporary local file. Entries that are already local are passed through untouched.
 *
 * @param fileList Comma-separated paths, possibly mixing local and remote URIs.
 * @param hadoopConf Hadoop configuration used to resolve remote file systems.
 * @return Comma-separated list of local paths, in the same order as the input.
 */
private[deploy] def downloadFileList(
    fileList: String,
    hadoopConf: HadoopConfiguration): String = {
  require(fileList != null, "fileList cannot be null.")
  val localized = fileList.split(",").map(entry => downloadFile(entry, hadoopConf))
  localized.mkString(",")
}
852+
853+
/**
 * Download a file from a remote file system to a local temporary directory. If the input
 * path already refers to the local file system (`file:` or `local:` scheme), it is returned
 * unchanged and nothing is copied.
 *
 * @param path A local path, or a remote URI (e.g. hdfs:, s3a:) resolvable by Hadoop.
 * @param hadoopConf Hadoop configuration used to obtain the remote file system.
 * @return The original path for local files, otherwise a `file:` URI of the local copy.
 */
private[deploy] def downloadFile(path: String, hadoopConf: HadoopConfiguration): String = {
  require(path != null, "path cannot be null.")
  val uri = Utils.resolveURI(path)
  uri.getScheme match {
    case "file" | "local" =>
      // Already on the local file system; no copy needed.
      path

    case _ =>
      val fs = FileSystem.get(uri, hadoopConf)
      // Keep the remote path (and thus the file name) under a fresh temp directory so
      // downstream consumers see the original file name.
      val tmpDir = Files.createTempDirectory("tmp").toFile
      // Best-effort cleanup: without this, every spark-submit invocation leaks a temp
      // directory. Registered dir-first so the file (registered later) is deleted first.
      tmpDir.deleteOnExit()
      val tmpFile = new File(tmpDir, uri.getPath)
      tmpFile.deleteOnExit()
      // scalastyle:off println
      printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
      // scalastyle:on println
      fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
      Utils.resolveURI(tmpFile.getAbsolutePath).toString
  }
}
828874
}
829875

830876
/** Provides utility functions to be used inside SparkSubmit. */

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
package org.apache.spark.deploy
1919

2020
import java.io._
21+
import java.net.URI
2122
import java.nio.charset.StandardCharsets
2223

2324
import scala.collection.mutable.ArrayBuffer
2425
import scala.io.Source
2526

2627
import com.google.common.io.ByteStreams
28+
import org.apache.commons.io.{FilenameUtils, FileUtils}
29+
import org.apache.hadoop.conf.Configuration
2730
import org.apache.hadoop.fs.Path
2831
import org.scalatest.{BeforeAndAfterEach, Matchers}
2932
import org.scalatest.concurrent.Timeouts
@@ -535,7 +538,7 @@ class SparkSubmitSuite
535538

536539
test("resolves command line argument paths correctly") {
537540
val jars = "/jar1,/jar2" // --jars
538-
val files = "hdfs:/file1,file2" // --files
541+
val files = "local:/file1,file2" // --files
539542
val archives = "file:/archive1,archive2" // --archives
540543
val pyFiles = "py-file1,py-file2" // --py-files
541544

@@ -587,7 +590,7 @@ class SparkSubmitSuite
587590

588591
test("resolves config paths correctly") {
589592
val jars = "/jar1,/jar2" // spark.jars
590-
val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
593+
val files = "local:/file1,file2" // spark.files / spark.yarn.dist.files
591594
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
592595
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
593596

@@ -705,6 +708,87 @@ class SparkSubmitSuite
705708
}
706709
// scalastyle:on println
707710

711+
/**
 * Verify that `outputPath` is a faithful local copy of `sourcePath`: `file` scheme,
 * same trailing path/file name, and identical contents. A no-op when the two paths are
 * equal (i.e. the source was already local and no download happened).
 */
private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = {
  if (sourcePath != outputPath) {
    val sourceUri = new URI(sourcePath)
    val outputUri = new URI(outputPath)
    assert(outputUri.getScheme === "file")

    // The downloaded copy must preserve the original path and file name.
    assert(outputUri.getPath.endsWith(sourceUri.getPath))
    val expected = FileUtils.readFileToString(new File(sourceUri.getPath))
    val actual = FileUtils.readFileToString(new File(outputUri.getPath))
    assert(actual === expected)
  }
}
725+
726+
/** Remove the local file referred to by `outputPath` (a URI string), if it exists. */
private def deleteTempOutputFile(outputPath: String): Unit = {
  val candidate = new File(new URI(outputPath).getPath)
  if (candidate.exists) {
    candidate.delete()
  }
}
732+
733+
test("downloadFile - invalid url") {
  // "abc" is not a registered Hadoop file system scheme, so resolving the URI's
  // file system should fail with an IOException rather than attempt a download.
  intercept[IOException] {
    SparkSubmit.downloadFile("abc:/my/file", new Configuration())
  }
}
738+
739+
test("downloadFile - file doesn't exist") {
  // Route the s3a scheme to a local-file-system stub, with the FS instance cache
  // disabled so the stub is actually instantiated for this test.
  val conf = new Configuration()
  conf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
  conf.set("fs.s3a.impl.disable.cache", "true")
  intercept[FileNotFoundException] {
    SparkSubmit.downloadFile("s3a:/no/such/file", conf)
  }
}
749+
750+
test("downloadFile does not download local file") {
  // empty path is considered as local file.
  assert(SparkSubmit.downloadFile("", new Configuration()) === "")
  // A schemeless absolute path resolves to a file: URI and must pass through unchanged.
  assert(SparkSubmit.downloadFile("/local/file", new Configuration()) === "/local/file")
}
755+
756+
test("download one file to local") {
  // Create a source file with known contents to download.
  val content = "hello, world"
  val sourceFile = File.createTempFile("test", ".jar")
  sourceFile.deleteOnExit()
  FileUtils.write(sourceFile, content)

  // Route the s3a scheme to a local-file-system stub, with the FS instance cache
  // disabled so the stub is actually instantiated for this test.
  val conf = new Configuration()
  conf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
  conf.set("fs.s3a.impl.disable.cache", "true")

  val sourcePath = s"s3a://${sourceFile.getAbsolutePath}"
  val outputPath = SparkSubmit.downloadFile(sourcePath, conf)
  checkDownloadedFile(sourcePath, outputPath)
  deleteTempOutputFile(outputPath)
}
771+
772+
test("download list of files to local") {
  // Create a source file with known contents to download.
  val content = "hello, world"
  val remoteFile = File.createTempFile("test", ".jar")
  remoteFile.deleteOnExit()
  FileUtils.write(remoteFile, content)

  // Route the s3a scheme to a local-file-system stub, with the FS instance cache
  // disabled so the stub is actually instantiated for this test.
  val conf = new Configuration()
  conf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
  conf.set("fs.s3a.impl.disable.cache", "true")

  // Mix a local entry and a remote entry to cover the pass-through and download cases.
  val sourcePaths = Seq("/local/file", s"s3a://${remoteFile.getAbsolutePath}")
  val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), conf).split(",")

  // Each input entry must map, in order, to a verified local copy (or pass through).
  assert(outputPaths.length === sourcePaths.length)
  sourcePaths.zip(outputPaths).foreach { case (source, output) =>
    checkDownloadedFile(source, output)
    deleteTempOutputFile(output)
  }
}
791+
708792
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
709793
private def runSparkSubmit(args: Seq[String]): Unit = {
710794
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -807,3 +891,10 @@ object UserClasspathFirstTest {
807891
}
808892
}
809893
}
894+
895+
/**
 * A LocalFileSystem stand-in used by the tests above: it is registered under the s3a
 * scheme so "remote" downloads can be exercised against plain local files.
 */
class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
  override def copyToLocalFile(src: Path, dst: Path): Unit = {
    // Strip the (fake) scheme so the local file system can resolve the source path.
    val schemeless = new Path(src.toUri.getPath)
    super.copyToLocalFile(schemeless, dst)
  }
}

0 commit comments

Comments
 (0)