Skip to content

Commit c7fccb5

Browse files
author
Marcelo Vanzin
committed
[SPARK-13478][YARN] Use real user when fetching delegation tokens.
The Hive client library is not smart enough to notice that the current user is a proxy user; so when using a proxy user, it fails to fetch delegation tokens from the metastore because of a missing kerberos TGT for the current user. To fix it, just run the code that fetches the delegation token as the real logged in user. Tested on a kerberos cluster both submitting normally and with a proxy user; Hive and HBase tokens are retrieved correctly in both cases. Author: Marcelo Vanzin <[email protected]> Closes apache#11358 from vanzin/SPARK-13478.
1 parent 4bd697d commit c7fccb5

File tree

3 files changed

+41
-12
lines changed

3 files changed

+41
-12
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -255,6 +255,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
255255
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
256256
}
257257
}
258+
259+
if (proxyUser != null && principal != null) {
260+
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
261+
}
258262
}
259263

260264
private def validateKillArguments(): Unit = {
@@ -517,6 +521,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
517521
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
518522
|
519523
| --proxy-user NAME User to impersonate when submitting the application.
524+
| This argument does not work with --principal / --keytab.
520525
|
521526
| --help, -h Show this help message and exit
522527
| --verbose, -v Print additional debug output

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 35 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,9 @@
1818
package org.apache.spark.deploy.yarn
1919

2020
import java.io.File
21+
import java.lang.reflect.UndeclaredThrowableException
2122
import java.nio.charset.StandardCharsets.UTF_8
23+
import java.security.PrivilegedExceptionAction
2224
import java.util.regex.Matcher
2325
import java.util.regex.Pattern
2426

@@ -194,7 +196,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
194196
*/
195197
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
196198
try {
197-
obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
199+
obtainTokenForHiveMetastoreInner(conf)
198200
} catch {
199201
case e: ClassNotFoundException =>
200202
logInfo(s"Hive class not found $e")
@@ -209,8 +211,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
209211
* @param username the username of the principal requesting the delegating token.
210212
* @return a delegation token
211213
*/
212-
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
213-
username: String): Option[Token[DelegationTokenIdentifier]] = {
214+
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
215+
Option[Token[DelegationTokenIdentifier]] = {
214216
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
215217

216218
// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -225,11 +227,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
225227

226228
// Check for local metastore
227229
if (metastoreUri.nonEmpty) {
228-
require(username.nonEmpty, "Username undefined")
229230
val principalKey = "hive.metastore.kerberos.principal"
230231
val principal = hiveConf.getTrimmed(principalKey, "")
231232
require(principal.nonEmpty, "Hive principal $principalKey undefined")
232-
logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
233+
val currentUser = UserGroupInformation.getCurrentUser()
234+
logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
235+
s"$principal at $metastoreUri")
233236
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
234237
val closeCurrent = hiveClass.getMethod("closeCurrent")
235238
try {
@@ -238,12 +241,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
238241
classOf[String], classOf[String])
239242
val getHive = hiveClass.getMethod("get", hiveConfClass)
240243

241-
// invoke
242-
val hive = getHive.invoke(null, hiveConf)
243-
val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
244-
val hive2Token = new Token[DelegationTokenIdentifier]()
245-
hive2Token.decodeFromUrlString(tokenStr)
246-
Some(hive2Token)
244+
doAsRealUser {
245+
val hive = getHive.invoke(null, hiveConf)
246+
val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
247+
.asInstanceOf[String]
248+
val hive2Token = new Token[DelegationTokenIdentifier]()
249+
hive2Token.decodeFromUrlString(tokenStr)
250+
Some(hive2Token)
251+
}
247252
} finally {
248253
Utils.tryLogNonFatalError {
249254
closeCurrent.invoke(null)
@@ -303,6 +308,25 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
303308
}
304309
}
305310

311+
/**
312+
* Run some code as the real logged in user (which may differ from the current user, for
313+
* example, when using proxying).
314+
*/
315+
private def doAsRealUser[T](fn: => T): T = {
316+
val currentUser = UserGroupInformation.getCurrentUser()
317+
val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
318+
319+
// For some reason the Scala-generated anonymous class ends up causing an
320+
// UndeclaredThrowableException, even if you annotate the method with @throws.
321+
try {
322+
realUser.doAs(new PrivilegedExceptionAction[T]() {
323+
override def run(): T = fn
324+
})
325+
} catch {
326+
case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
327+
}
328+
}
329+
306330
}
307331

308332
object YarnSparkHadoopUtil {

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -255,7 +255,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
255255
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
256256
val util = new YarnSparkHadoopUtil
257257
assertNestedHiveException(intercept[InvocationTargetException] {
258-
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
258+
util.obtainTokenForHiveMetastoreInner(hadoopConf)
259259
})
260260
assertNestedHiveException(intercept[InvocationTargetException] {
261261
util.obtainTokenForHiveMetastore(hadoopConf)

0 commit comments

Comments (0)