Skip to content

Commit 618c1f6

Browse files
committed
Merge pull request alteryx#125 from velvia/2013-10/local-jar-uri
Add support for local:// URI scheme for addJars() This PR adds support for a new URI scheme for SparkContext.addJars(): `local://file/path`. The *local* scheme indicates that the `/file/path` exists on every worker node. The reason for its existence is for big library JARs, which would be really expensive to serve using the standard HTTP fileserver distribution method, especially for big clusters. Today the only inexpensive method (assuming such a file is on every host, via say NFS, rsync, etc.) of doing this is to add the JAR to the SPARK_CLASSPATH, but we want a method where the user does not need to modify the Spark configuration. I would add something to the docs, but it's not obvious where to add it. Oh, and it would be great if this could be merged in time for 0.8.1.
2 parents 745dc42 + de02855 commit 618c1f6

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ class SparkContext(
683683
/**
684684
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
685685
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
686-
* filesystems), or an HTTP, HTTPS or FTP URI.
686+
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
687687
*/
688688
def addJar(path: String) {
689689
if (path == null) {
@@ -696,6 +696,7 @@ class SparkContext(
696696
} else {
697697
val uri = new URI(path)
698698
key = uri.getScheme match {
699+
// A JAR file which exists only on the driver node
699700
case null | "file" =>
700701
if (env.hadoop.isYarnMode()) {
701702
// In order for this to work on yarn the user must specify the --addjars option to
@@ -713,6 +714,9 @@ class SparkContext(
713714
} else {
714715
env.httpFileServer.addJar(new File(uri.getPath))
715716
}
717+
// A JAR file which exists locally on every worker node
718+
case "local" =>
719+
"file:" + uri.getPath
716720
case _ =>
717721
path
718722
}

core/src/test/scala/org/apache/spark/FileServerSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
120120
}.collect()
121121
assert(result.toSet === Set((1,2), (2,7), (3,121)))
122122
}
123+
124+
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
  sc = new SparkContext("local-cluster[1,1,512]", "test")
  // Locate the bundled test JAR, then rewrite its "file:" URL into a "local:" URL,
  // which tells addJar that the JAR is already present on every worker node.
  val jarPath = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
  sc.addJar(jarPath.replace("file", "local"))
  val inputPairs = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 3), (3, 0))
  val reduced = sc.parallelize(inputPairs).reduceByKey { (left, right) =>
    // Reflectively load Maths.factorial from the context classloader; this only
    // succeeds if the dynamically added JAR is visible on the executors.
    val factorial = Thread.currentThread.getContextClassLoader()
      .loadClass("org.uncommons.maths.Maths")
      .getDeclaredMethod("factorial", classOf[Int])
    val leftFact = factorial.invoke(null, left.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
    val rightFact = factorial.invoke(null, right.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
    leftFact + rightFact
  }.collect()
  assert(reduced.toSet === Set((1, 2), (2, 7), (3, 121)))
}
123139
}

0 commit comments

Comments (0)