Skip to content

Commit fd3ddb2

Browse files
committed
Address Erik's comments
1 parent fab9f9e commit fd3ddb2

File tree

3 files changed

+62
-16
lines changed

3 files changed

+62
-16
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,7 @@ private[spark] object SparkSubmitUtils {
12791279
ivyPath: Option[String]): IvySettings = {
12801280
val uri = new URI(settingsFile)
12811281
val file = Option(uri.getScheme).getOrElse("file") match {
1282-
case "file" => new File(settingsFile)
1282+
case "file" => new File(uri.getPath)
12831283
case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
12841284
"spark.jars.ivySettings")
12851285
}

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -526,19 +526,19 @@ private[spark] class Client(
526526
val uri = new URI(ivySettingsPath)
527527
Option(uri.getScheme).getOrElse("file") match {
528528
case "file" =>
529-
val ivySettingsFile = new File(ivySettingsPath)
529+
val ivySettingsFile = new File(uri.getPath)
530530
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
531531
require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a" +
532532
"normal file")
533533
// Generate a file name that can be used for the ivySettings file, that does not
534534
// conflict with any user file.
535535
val localizedFileName = Some(ivySettingsFile.getName() + "-" +
536536
UUID.randomUUID().toString)
537-
val (_, localizedPath) = distribute(ivySettings.get, destName = localizedFileName)
537+
val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName)
538538
require(localizedPath != null, "IvySettings file already distributed.")
539539
Some(localizedPath)
540540
case scheme =>
541-
throw new IllegalArgumentException(s"Scheme $scheme not supported in" +
541+
throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
542542
"spark.jars.ivySettings")
543543
}
544544
case _ => None

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.google.common.io.{ByteStreams, Files}
2929
import org.apache.hadoop.yarn.conf.YarnConfiguration
3030
import org.apache.hadoop.yarn.util.ConverterUtils
3131
import org.scalatest.concurrent.Eventually._
32+
import org.scalatest.exceptions.TestFailedException
3233
import org.scalatest.matchers.must.Matchers
3334
import org.scalatest.matchers.should.Matchers._
3435

@@ -375,17 +376,56 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
375376
emptyIvySettings
376377
}
377378

378-
test("SPARK-34472: non-local ivySettings file should be localized on driver in cluster mode") {
379+
test("SPARK-34472: ivySettings file with no scheme or file:// scheme should be " +
380+
"localized on driver in cluster mode") {
379381
val emptyIvySettings = createEmptyIvySettingsFile
382+
// For file:// URIs or URIs without scheme, make sure that ivySettings conf was changed
383+
// to the localized file. So the expected ivySettings path on the driver will start with
384+
// the file name and then some random UUID suffix
385+
testIvySettingsDistribution(clientMode = false, emptyIvySettings.getAbsolutePath,
386+
emptyIvySettings.getName, prefixMatch = true)
387+
testIvySettingsDistribution(clientMode = false, s"file://${emptyIvySettings.getAbsolutePath}",
388+
emptyIvySettings.getName, prefixMatch = true)
389+
}
390+
391+
test("SPARK-34472: ivySettings file with no scheme or file:// scheme should retain " +
392+
"user provided path in client mode") {
393+
val emptyIvySettings = createEmptyIvySettingsFile
394+
// In client mode, the file is present locally on the driver and so does not need to be
395+
// distributed. So the user provided path should be kept as is.
396+
testIvySettingsDistribution(clientMode = true, emptyIvySettings.getAbsolutePath,
397+
emptyIvySettings.getAbsolutePath)
398+
testIvySettingsDistribution(clientMode = true, s"file://${emptyIvySettings.getAbsolutePath}",
399+
s"file://${emptyIvySettings.getAbsolutePath}")
400+
}
401+
402+
test("SPARK-34472: ivySettings file with non-file:// schemes should throw an error") {
403+
val emptyIvySettings = createEmptyIvySettingsFile
404+
val e1 = intercept[TestFailedException] {
405+
testIvySettingsDistribution(clientMode = false,
406+
s"local://${emptyIvySettings.getAbsolutePath}", "")
407+
}
408+
assert(e1.getMessage.contains("IllegalArgumentException: " +
409+
"Scheme local not supported in spark.jars.ivySettings"))
410+
val e2 = intercept[TestFailedException] {
411+
testIvySettingsDistribution(clientMode = false,
412+
s"hdfs://${emptyIvySettings.getAbsolutePath}", "")
413+
}
414+
assert(e2.getMessage.contains("IllegalArgumentException: " +
415+
"Scheme hdfs not supported in spark.jars.ivySettings"))
416+
}
417+
418+
def testIvySettingsDistribution(clientMode: Boolean, ivySettingsPath: String,
419+
expectedIvySettingsPrefixOnDriver: String, prefixMatch: Boolean = false): Unit = {
380420
val result = File.createTempFile("result", null, tempDir)
381-
val finalState = runSpark(clientMode = false,
421+
val outFile = File.createTempFile("out", null, tempDir)
422+
val finalState = runSpark(clientMode = clientMode,
382423
mainClassName(YarnAddJarTest.getClass),
383-
// For non-local URIs, make sure that ivySettings conf was changed to the localized file
384-
// So the expected ivySettings path on the driver will start with the file name and then
385-
// some random UUID suffix
386-
appArgs = Seq(result.getAbsolutePath, emptyIvySettings.getName),
387-
extraConf = Map("spark.jars.ivySettings" -> emptyIvySettings.getAbsolutePath))
388-
checkResult(finalState, result)
424+
appArgs = Seq(result.getAbsolutePath, expectedIvySettingsPrefixOnDriver,
425+
prefixMatch.toString),
426+
extraConf = Map("spark.jars.ivySettings" -> ivySettingsPath),
427+
outFile = Option(outFile))
428+
checkResult(finalState, result, outFile = Option(outFile))
389429
}
390430
}
391431

@@ -604,26 +644,32 @@ private object YarnClasspathTest extends Logging {
604644

605645
private object YarnAddJarTest extends Logging {
606646
def main(args: Array[String]): Unit = {
607-
if (args.length != 2) {
647+
if (args.length != 3) {
608648
// scalastyle:off println
609649
System.err.println(
610650
s"""
611651
|Invalid command line: ${args.mkString(" ")}
612652
|
613-
|Usage: YarnAddJarTest [result file] [expected ivy settings prefix]
653+
|Usage: YarnAddJarTest [result file] [expected ivy settings path] [prefix match]
614654
""".stripMargin)
615655
// scalastyle:on println
616656
System.exit(1)
617657
}
618658

619659
val resultPath = args(0)
620-
val expectedIvySettingsPrefix = args(1)
660+
val expectedIvySettingsPath = args(1)
661+
val prefixMatch = args(2).toBoolean
621662
val sc = new SparkContext(new SparkConf())
622663

623664
var result = "failure"
624665
try {
625666
val settingsFile = sc.getConf.get("spark.jars.ivySettings")
626-
assert(settingsFile.startsWith(expectedIvySettingsPrefix))
667+
if (prefixMatch) {
668+
assert(settingsFile !== expectedIvySettingsPath)
669+
assert(settingsFile.startsWith(expectedIvySettingsPath))
670+
} else {
671+
assert(settingsFile === expectedIvySettingsPath)
672+
}
627673

628674
val caught = intercept[RuntimeException] {
629675
sc.addJar("ivy://org.fake-project.test:test:1.0.0")

0 commit comments

Comments
 (0)