Commit 3c58f7b

support client deps

Author: Stavros Kontopoulos (committed)
Parent: bfb3ffe · Commit: 3c58f7b

File tree: 14 files changed, +545 / -75 lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 71 additions & 20 deletions

@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.{Properties, Try}
 
+import org.apache.commons.io.FilenameUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
-    val sparkConf = new SparkConf()
+    val sparkConf = args.toSparkConf()
     var childMainClass = ""
 
     // Set the cluster manager
@@ -313,6 +314,9 @@
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
     val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
     val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
+    val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
+    val isKubernetesClusterModeDriver = isKubernetesClient &&
+      sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
     val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
 
     if (!isMesosCluster && !isStandAloneCluster) {
@@ -323,9 +327,25 @@
         args.ivySettingsPath)
 
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-        if (args.isPython || isInternal(args.primaryResource)) {
-          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        // In K8s client mode, when in the driver, add resolved jars early as we might need
+        // them at the submit time for artifact downloading.
+        // For example we might use the dependencies for downloading
+        // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
+        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+        if (isKubernetesClusterModeDriver) {
+          val loader = getSubmitClassLoader(sparkConf)
+          for (jar <- resolvedMavenCoordinates.split(",")) {
+            addJarToClasspath(jar, loader)
+          }
+        } else if (isKubernetesCluster) {
+          // We need this in K8s cluster mode so that we can upload local deps
+          // via the k8s application, like in cluster mode driver
+          childClasspath ++= resolvedMavenCoordinates.split(",")
+        } else {
+          args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+          if (args.isPython || isInternal(args.primaryResource)) {
+            args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+          }
         }
       }
 
@@ -380,6 +400,17 @@
       localPyFiles = Option(args.pyFiles).map {
         downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
+
+      if (isKubernetesClusterModeDriver) {
+        // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
+        // Executors will get the jars from the Spark file server.
+        // Explicitly download the related files here
+        args.jars = renameResourcesToLocalFS(args.jars, localJars)
+        val localFiles = Option(args.files).map {
+          downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        }.orNull
+        args.files = renameResourcesToLocalFS(args.files, localFiles)
+      }
     }
 
     // When running in YARN, for some remote resources with scheme:
@@ -535,11 +566,13 @@
       OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
 
       // Propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
-      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
-        confKey = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
+        CLUSTER, confKey = "spark.jars.packages"),
+      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
+        CLUSTER, confKey = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
+        CLUSTER, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.excludes"),
 
       // Yarn only
@@ -777,6 +810,21 @@
     (childArgs, childClasspath, sparkConf, childMainClass)
   }
 
+  private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
+    if (resources != null && localResources != null) {
+      val localResourcesSeq = Utils.stringToSeq(localResources)
+      Utils.stringToSeq(resources).map { resource =>
+        val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
+        localResourcesSeq.find { localUri =>
+          val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
+          filenameRemote == filenameLocal
+        }.getOrElse(resource)
+      }.mkString(",")
+    } else {
+      resources
+    }
+  }
+
   // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
   // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
   // mode, we must trick it into thinking we're YARN.
@@ -787,6 +835,19 @@
     sparkConf.set(key, shortUserName)
   }
 
+  private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
+    val loader =
+      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
+        new ChildFirstURLClassLoader(new Array[URL](0),
+          Thread.currentThread.getContextClassLoader)
+      } else {
+        new MutableURLClassLoader(new Array[URL](0),
+          Thread.currentThread.getContextClassLoader)
+      }
+    Thread.currentThread.setContextClassLoader(loader)
+    loader
+  }
+
   /**
    * Run the main method of the child class using the submit arguments.
    *
@@ -814,17 +875,7 @@
       logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
       logInfo("\n")
     }
-
-    val loader =
-      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
-        new ChildFirstURLClassLoader(new Array[URL](0),
-          Thread.currentThread.getContextClassLoader)
-      } else {
-        new MutableURLClassLoader(new Array[URL](0),
-          Thread.currentThread.getContextClassLoader)
-      }
-    Thread.currentThread.setContextClassLoader(loader)
-
+    val loader = getSubmitClassLoader(sparkConf)
     for (jar <- childClasspath) {
       addJarToClasspath(jar, loader)
     }
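
For readers skimming the hunk above: the new `renameResourcesToLocalFS` helper matches remote URIs against downloaded copies purely by file name. Below is a minimal, standalone sketch of that matching rule; the `renameToLocal` name and the URIs are illustrative only and not part of the commit.

```scala
import java.net.URI
import org.apache.commons.io.FilenameUtils

// Re-statement of the matching rule used by renameResourcesToLocalFS (illustration only).
def renameToLocal(resources: Seq[String], localResources: Seq[String]): Seq[String] =
  resources.map { resource =>
    val remoteName = FilenameUtils.getName(new URI(resource).getPath)
    localResources
      .find(local => FilenameUtils.getName(new URI(local).getPath) == remoteName)
      .getOrElse(resource)
  }

// Hypothetical inputs: one jar was downloaded to the driver, one was not.
renameToLocal(
  Seq("s3a://bucket/deps/lib-a.jar", "https://repo.example.com/lib-b.jar"),
  Seq("file:/tmp/spark-1234/lib-a.jar"))
// => Seq("file:/tmp/spark-1234/lib-a.jar", "https://repo.example.com/lib-b.jar")
```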

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 6 additions & 6 deletions

@@ -1325,12 +1325,12 @@ class SparkSubmitSuite
       "--class", "Foo",
       "app.jar")
     val conf = new SparkSubmitArguments(clArgs).toSparkConf()
-      Seq(
-        testConf,
-        masterConf
-      ).foreach { case (k, v) =>
-        conf.get(k) should be (v)
-      }
+    Seq(
+      testConf,
+      masterConf
+    ).foreach { case (k, v) =>
+      conf.get(k) should be (v)
+    }
   }
 }

docs/running-on-kubernetes.md

Lines changed: 34 additions & 3 deletions

@@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS
 by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
 Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
 `SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
-dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
-client's local file system is currently not yet supported.
+dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission
+client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be
+a Hadoop compatible filesystem. A typical example of this is passing the following options when using S3:
+
+```
+...
+--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
+--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
+--conf spark.hadoop.fs.s3a.access.key=...
+--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+--conf spark.hadoop.fs.s3a.fast.upload=true
+--conf spark.hadoop.fs.s3a.secret.key=....
+--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
+file:///full/path/to/app.jar
+```
+The app jar file will be uploaded to S3, and then, when the driver is launched, it will be downloaded
+to the driver pod and added to its classpath. Spark generates a subdirectory under the upload path with a random name
+to avoid conflicts with Spark apps running in parallel. Users can manage the created subdirectories according to their needs.
+
+The client scheme is supported for the application jar and for dependencies specified by the properties `spark.jars` and `spark.files`.
+
+Important: all client-side dependencies will be uploaded to the given path with a flat directory structure, so
+file names must be unique, otherwise files will be overwritten. Also make sure that the default ivy directory in the derived
+k8s image has the required access rights, or modify the settings as above. The latter is also important if you use
+`--packages` in cluster mode.
 
 ## Secret Management
 Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
@@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked o
 Some of these include:
 
 * Dynamic Resource Allocation and External Shuffle Service
-* Local File Dependency Management
 * Job Queues and Resource Management
 
 # Configuration
@@ -1069,6 +1091,15 @@ See the [configuration page](configuration.html) for information on Spark config
     Specify the grace period in seconds when deleting a Spark application using spark-submit.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.file.upload.path</code></td>
+  <td>(none)</td>
+  <td>
+    Path to store files at the spark submit side in cluster mode. For example:
+    <code>spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path</code>
+    The file should be specified as <code>file://path/to/file</code> or as an absolute path.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties
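
As a rough sketch of the layout the new documentation describes: the bucket and jar names below are hypothetical, and the random subdirectory name follows the `spark-upload-<UUID>` pattern used by the upload code added in this commit.

```scala
import java.util.UUID

// Hypothetical values mirroring the documentation example above.
val uploadPath = "s3a://my-bucket/path"          // spark.kubernetes.file.upload.path
val localJar   = "file:///full/path/to/app.jar"  // passed on the spark-submit command line
val targetUri  = s"$uploadPath/spark-upload-${UUID.randomUUID()}/${localJar.split('/').last}"
// e.g. s3a://my-bucket/path/spark-upload-3f1c0c9e-.../app.jar
```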

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 7 additions & 0 deletions

@@ -332,6 +332,13 @@ private[spark] object Config extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  val KUBERNETES_FILE_UPLOAD_PATH =
+    ConfigBuilder("spark.kubernetes.file.upload.path")
+      .doc("Hadoop compatible file system path where files from the local file system " +
+        "will be uploaded to in cluster mode.")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
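
Since the new entry is created with `.createOptional`, call sites read it as an `Option[String]`. A hedged sketch of that (only valid from Spark-internal code, since both `Config` and `SparkConf.get(ConfigEntry)` are `private[spark]`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH

val conf = new SparkConf().set("spark.kubernetes.file.upload.path", "s3a://my-bucket/path")
// Optional entry: None when the property is unset, Some(path) otherwise.
val uploadPath: Option[String] = conf.get(KUBERNETES_FILE_UPLOAD_PATH)
```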

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 81 additions & 1 deletion

@@ -16,18 +16,25 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.io.File
+import java.io.{File, IOException}
+import java.net.URI
 import java.security.SecureRandom
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.Utils.getHadoopFileSystem
 
 private[spark] object KubernetesUtils extends Logging {
 
@@ -209,4 +216,77 @@
     Hex.encodeHexString(random) + time
   }
 
+  /**
+   * Upload files and modify their uris
+   */
+  def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
+    : Iterable[String] = {
+    fileUris.map { uri =>
+      uploadFileUri(uri, conf)
+    }
+  }
+
+  private def isLocalDependency(uri: URI): Boolean = {
+    uri.getScheme match {
+      case null | "file" => true
+      case _ => false
+    }
+  }
+
+  def isLocalAndResolvable(resource: String): Boolean = {
+    resource != SparkLauncher.NO_RESOURCE &&
+      isLocalDependency(Utils.resolveURI(resource))
+  }
+
+  def renameMainAppResource(resource: String, conf: SparkConf): String = {
+    if (isLocalAndResolvable(resource)) {
+      SparkLauncher.NO_RESOURCE
+    } else {
+      resource
+    }
+  }
+
+  def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
+    conf match {
+      case Some(sConf) =>
+        if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
+          val fileUri = Utils.resolveURI(uri)
+          try {
+            val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
+            val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
+            val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
+            val randomDirName = s"spark-upload-${UUID.randomUUID()}"
+            fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
+            val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
+            log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
+            uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
+            targetUri
+          } catch {
+            case e: Exception =>
+              throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
+          }
+        } else {
+          throw new SparkException("Please specify " +
+            "spark.kubernetes.file.upload.path property.")
+        }
+      case _ => throw new SparkException("Spark configuration is missing...")
+    }
+  }
+
+  /**
+   * Upload a file to a Hadoop-compatible filesystem.
+   */
+  private def uploadFileToHadoopCompatibleFS(
+      src: Path,
+      dest: Path,
+      fs: FileSystem,
+      delSrc: Boolean = false,
+      overwrite: Boolean = true): Unit = {
+    try {
+      fs.copyFromLocalFile(false, true, src, dest)
+    } catch {
+      case e: IOException =>
+        throw new SparkException(s"Error uploading file ${src.getName}", e)
    }
+  }
 }
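
A hedged usage sketch of the new `uploadFileUri` helper. The paths below are hypothetical; it needs a reachable Hadoop-compatible filesystem and, like the rest of this object, is only callable from Spark-internal code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

val conf = new SparkConf().set("spark.kubernetes.file.upload.path", "s3a://my-bucket/path")
// Copies the local jar into a spark-upload-<UUID> subdirectory of the upload path and
// returns the rewritten URI, e.g. s3a://my-bucket/path/spark-upload-<uuid>/app.jar.
val target: String = KubernetesUtils.uploadFileUri("file:///full/path/to/app.jar", Some(conf))
```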

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 9 additions & 1 deletion

@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.UI._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
@@ -153,6 +152,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+    // try upload local, resolvable files to a hadoop compatible file system
+    Seq(JARS, FILES).foreach { key =>
+      val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
+      val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
+      if (resolved.nonEmpty) {
+        additionalProps.put(key.key, resolved.mkString(","))
+      }
+    }
     additionalProps.toMap
   }
 }
+
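
The filter in the hunk above only considers entries without a scheme or with the `file` scheme (and skips the internal no-resource marker). A small illustrative sketch of that predicate applied to typical `spark.jars` entries, callable only from Spark-internal code since `KubernetesUtils` is `private[spark]`:

```scala
import org.apache.spark.deploy.k8s.KubernetesUtils

// Hypothetical spark.jars entries: only the first two would be uploaded by the new step.
Seq("/opt/deps/a.jar", "file:///tmp/b.jar", "https://repo.example.com/c.jar", "local:///opt/d.jar")
  .map(KubernetesUtils.isLocalAndResolvable)
// => Seq(true, true, false, false)
```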
