Skip to content

Commit 2c2e118

Browse files
committed
[SPARK-1522]: YARN ClientBase throws a NPE if there is no YARN Application specific CP
The current implementation of ClientBase.getDefaultYarnApplicationClasspath inspects the MRJobConfig class for the field DEFAULT_YARN_APPLICATION_CLASSPATH when it should be really looking into YarnConfiguration. If the Application Configuration has no yarn.application.classpath defined a NPE exception will be thrown. Public API Changes =========================== * YARN ClientBase getDefault*ApplicationClasspath returns Option[Seq[String]] This commit depicts how the `ClientBase` API could change the `getDefaultYarnApplicationClasspath` and `getDefaultMRApplicationClasspath` to return a `Option[Seq[String]]` while recovering from `NoSuchFieldException`. Both methods that return the default application's *classpath*, for both *YARN* as well as *Map Reduce (MR)*, use reflection and per the Java API documentation they can throw the following exceptions: * Class:getField(String name): * NoSuchFieldException - if a field with the specified name is not found. * NullPointerException - if name is null * SecurityException - If a security manager, s, is present and any of the following conditions is met: 1. Invocation of s.checkMemberAccess(this, Member.PUBLIC) denies access to the field. 2. The caller's class loader is not the same as or an ancestor of the class loader for the current class and invocation of `s.checkPackageAccess()` denies access to the package of this class. * Field:Object get(Object obj): * IllegalAccessException - if this Field object is enforcing Java language access control and the underlying field is inaccessible. * IllegalArgumentException - if the specified object is not an instance of the class or interface declaring the underlying field (or a subclass or implementor thereof). * NullPointerException - if the specified object is null and the field is an instance field. * ExceptionInInitializerError - if the initialization provoked by this method fails. **NOTE**: The above is based on the *Java API for JDK 1.7* An interesting thing to notice is that the official JDK doesn't mention the occurrence of the `NoSuchFieldError`. This is completely acceptable per the JDK spec. The reason is that it is an *Error* and as described by the Java Language Specification and depicted in the *Error Class* documentation. An `Error` "indicates serious problems that a reasonable application should not try to catch." While An `Exception` "indicates conditions that a reasonable application might want to catch." If we actually dig deeper according to the *JVM SE7 Specification* "While Loading, Linking, and Initializing, if an error occurs during resolution of a symbolic reference, then an instance of IncompatibleClassChangeError (or a subclass) must be thrown..." "If an attempt by the Java Virtual Machine to resolve a symbolic reference fails because an error is thrown that is an instance of LinkageError (or a subclass), then subsequent attempts to resolve the reference always fail with the same error that was thrown as a result of the initial resolution attempt." Now `NoSuchFieldError` extends `LinkageError` which in turn is a `IncompatibleClassChangeError` and according to its documentation, the *LinkageError Class*, "indicates that a class has some dependency on another class; however, the latter class has incompatibly changed after the compilation of the former class." Why all these is important and how it relates with a couple of lines of code? Well, the original approach catches the two most probable problems you might encounter if you access, using reflection, a field that you are almost sure that if it exist it will be of _public_ access but you are not sure it will always be there. Interesting enough the original implementation addresses one of the _exceptions_ as well as a potential _linkage error_ but as mentioned neglects a documented _security exception_, probably due its unlikeliness to occur. Fact is that if an error _bubbles_ up the Spark YARN Client doesn't handle it will terminate, in a probably obscure fashion. The current call stack is as follows. Client >> run >> runApp >> ClientBase.setupLaunchEnv >> populateClasspath In my opinion it is questionable to let an exception escape of this context, the _ClientBase Object_. In my opinion such _ClientBase Object_ should fail gracefully by handling the potential _exceptions_ and _linkage error_ while providing enough logging to let a user know and identify what happened. Yet again, in my opinion the implementation in this commit handles it in a better, more resilient, manner than the previous implementation while adding logging that will help clarify the issues in case of an _exception_. Additional Changes include: =========================== * Test Suite for ClientBase added * Coding Style: * [Spark Style Guidelines](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) * [Scala Official Style Guidelines](http://docs.scala-lang.org/style/) * [Scalariform](https://github.com/mdr/scalariform) * Code refactoring and cleanup per review by andrewor14 Ref. "JVM SE7 Specification" http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4 "Java API for JDK 1.7" http://docs.oracle.com/javase/7/docs/api/ [ticket: SPARK-1522] : https://issues.apache.org/jira/browse/SPARK-1522 Author : berngp Reviewer : andrewor14, tgravescs Testing : ?
1 parent 8d85359 commit 2c2e118

File tree

2 files changed

+167
-34
lines changed

2 files changed

+167
-34
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
2323

2424
import scala.collection.JavaConversions._
2525
import scala.collection.mutable.{HashMap, ListBuffer, Map}
26+
import scala.util.{Try, Success, Failure}
2627

2728
import org.apache.hadoop.conf.Configuration
2829
import org.apache.hadoop.fs._
@@ -378,7 +379,7 @@ trait ClientBase extends Logging {
378379
}
379380
}
380381

381-
object ClientBase {
382+
object ClientBase extends Logging {
382383
val SPARK_JAR: String = "__spark__.jar"
383384
val APP_JAR: String = "__app__.jar"
384385
val LOG4J_PROP: String = "log4j.properties"
@@ -388,58 +389,78 @@ object ClientBase {
388389

389390
def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
390391

391-
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
392-
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
393-
val classpathEntries = Option(conf.getStrings(
394-
YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse(
395-
getDefaultYarnApplicationClasspath())
396-
if (classpathEntries != null) {
397-
for (c <- classpathEntries) {
398-
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
399-
File.pathSeparator)
400-
}
392+
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
393+
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
394+
for (c <- classPathElementsToAdd.flatten) {
395+
YarnSparkHadoopUtil.addToEnvironment(
396+
env,
397+
Environment.CLASSPATH.name,
398+
c.trim,
399+
File.pathSeparator)
401400
}
401+
classPathElementsToAdd
402+
}
402403

403-
val mrClasspathEntries = Option(conf.getStrings(
404-
"mapreduce.application.classpath")).getOrElse(
405-
getDefaultMRApplicationClasspath())
406-
if (mrClasspathEntries != null) {
407-
for (c <- mrClasspathEntries) {
408-
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
409-
File.pathSeparator)
410-
}
411-
}
404+
private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
405+
Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
406+
case Some(s) => Some(s.toSeq)
407+
case None => getDefaultYarnApplicationClasspath
412408
}
413409

414-
def getDefaultYarnApplicationClasspath(): Array[String] = {
415-
try {
416-
val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
417-
field.get(null).asInstanceOf[Array[String]]
418-
} catch {
419-
case err: NoSuchFieldError => null
420-
case err: NoSuchFieldException => null
410+
private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
411+
Option(conf.getStrings("mapreduce.application.classpath")) match {
412+
case Some(s) => Some(s.toSeq)
413+
case None => getDefaultMRApplicationClasspath
414+
}
415+
416+
def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
417+
val triedDefault = Try[Seq[String]] {
418+
val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
419+
val value = field.get(null).asInstanceOf[Array[String]]
420+
value.toSeq
421+
} recoverWith {
422+
case e: NoSuchFieldException => Success(Seq.empty[String])
421423
}
424+
425+
triedDefault match {
426+
case f: Failure[_] =>
427+
logError("Unable to obtain the default YARN Application classpath.", f.exception)
428+
case s: Success[_] =>
429+
logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
430+
}
431+
432+
triedDefault.toOption
422433
}
423434

424435
/**
425436
* In Hadoop 0.23, the MR application classpath comes with the YARN application
426437
* classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
427438
* So we need to use reflection to retrieve it.
428439
*/
429-
def getDefaultMRApplicationClasspath(): Array[String] = {
430-
try {
440+
def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
441+
val triedDefault = Try[Seq[String]] {
431442
val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
432-
if (field.getType == classOf[String]) {
433-
StringUtils.getStrings(field.get(null).asInstanceOf[String])
443+
val value = if (field.getType == classOf[String]) {
444+
StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray
434445
} else {
435446
field.get(null).asInstanceOf[Array[String]]
436447
}
437-
} catch {
438-
case err: NoSuchFieldError => null
439-
case err: NoSuchFieldException => null
448+
value.toSeq
449+
} recoverWith {
450+
case e: NoSuchFieldException => Success(Seq.empty[String])
440451
}
452+
453+
triedDefault match {
454+
case f: Failure[_] =>
455+
logError("Unable to obtain the default MR Application classpath.", f.exception)
456+
case s: Success[_] =>
457+
logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
458+
}
459+
460+
triedDefault.toOption
441461
}
442462

463+
443464
/**
444465
* Returns the java command line argument for setting up log4j. If there is a log4j.properties
445466
* in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.yarn
19+
20+
import java.net.URI
21+
22+
import org.apache.hadoop.conf.Configuration
23+
import org.apache.hadoop.mapreduce.MRJobConfig
24+
import org.apache.hadoop.yarn.conf.YarnConfiguration
25+
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
26+
27+
import org.scalatest.FunSuite
28+
import org.scalatest.matchers.ShouldMatchers._
29+
30+
import scala.collection.JavaConversions._
31+
import scala.collection.mutable.{ HashMap => MutableHashMap }
32+
import scala.util.Try
33+
34+
35+
class ClientBaseSuite extends FunSuite {
36+
37+
test("default Yarn application classpath") {
38+
ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
39+
}
40+
41+
test("default MR application classpath") {
42+
ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
43+
}
44+
45+
test("resultant classpath for an application that defines a classpath for YARN") {
46+
withAppConf(Fixtures.mapYARNAppConf) { conf =>
47+
val env = newEnv
48+
ClientBase.populateHadoopClasspath(conf, env)
49+
classpath(env) should be(
50+
flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath))
51+
}
52+
}
53+
54+
test("resultant classpath for an application that defines a classpath for MR") {
55+
withAppConf(Fixtures.mapMRAppConf) { conf =>
56+
val env = newEnv
57+
ClientBase.populateHadoopClasspath(conf, env)
58+
classpath(env) should be(
59+
flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
60+
}
61+
}
62+
63+
test("resultant classpath for an application that defines both classpaths, YARN and MR") {
64+
withAppConf(Fixtures.mapAppConf) { conf =>
65+
val env = newEnv
66+
ClientBase.populateHadoopClasspath(conf, env)
67+
classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
68+
}
69+
}
70+
71+
object Fixtures {
72+
73+
val knownDefYarnAppCP: Seq[String] =
74+
getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
75+
"DEFAULT_YARN_APPLICATION_CLASSPATH",
76+
Seq[String]())(a => a.toSeq)
77+
78+
79+
val knownDefMRAppCP: Seq[String] =
80+
getFieldValue[String, Seq[String]](classOf[MRJobConfig],
81+
"DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
82+
Seq[String]())(a => a.split(","))
83+
84+
val knownYARNAppCP = Some(Seq("/known/yarn/path"))
85+
86+
val knownMRAppCP = Some(Seq("/known/mr/path"))
87+
88+
val mapMRAppConf =
89+
Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
90+
91+
val mapYARNAppConf =
92+
Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
93+
94+
val mapAppConf = mapYARNAppConf ++ mapMRAppConf
95+
}
96+
97+
def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
98+
val conf = new Configuration
99+
m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") }
100+
testCode(conf)
101+
}
102+
103+
def newEnv = MutableHashMap[String, String]()
104+
105+
def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
106+
107+
def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
108+
109+
def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B =
110+
Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)
111+
112+
}

0 commit comments

Comments
 (0)