Skip to content

Commit 045865d

Browse files
committed
[SPARK-14994][SQL] Remove execution hive from HiveSessionState
1 parent 4450f61 commit 045865d

File tree

7 files changed

+44
-49
lines changed

7 files changed

+44
-49
lines changed

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.spark.sql.SQLContext
2222
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
2323
import org.apache.spark.sql.execution.CacheManager
2424
import org.apache.spark.sql.execution.ui.SQLListener
25+
import org.apache.spark.util.MutableURLClassLoader
2526

2627

2728
/**
@@ -44,4 +45,21 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
4445
*/
4546
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
4647

48+
/**
49+
 * A classloader used to load all user-added jars.
50+
*/
51+
val jarClassLoader = new NonClosableMutableURLClassLoader(
52+
org.apache.spark.util.Utils.getContextOrSparkClassLoader)
53+
54+
}
55+
56+
57+
/**
58+
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
59+
* This class loader cannot be closed (its `close` method is a no-op).
60+
*/
61+
private[sql] class NonClosableMutableURLClassLoader(parent: ClassLoader)
62+
extends MutableURLClassLoader(Array.empty, parent) {
63+
64+
override def close(): Unit = {}
4765
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ object HiveThriftServer2 extends Logging {
5858
@DeveloperApi
5959
def startWithContext(sqlContext: SQLContext): Unit = {
6060
val server = new HiveThriftServer2(sqlContext)
61-
server.init(SparkSQLEnv.sqlContext.sharedState.asInstanceOf[HiveSharedState].executionHive.conf)
61+
62+
val executionHive = HiveUtils.newClientForExecution(
63+
sqlContext.sparkContext.conf,
64+
sqlContext.sessionState.newHadoopConf())
65+
66+
server.init(executionHive.conf)
6267
server.start()
6368
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
6469
sqlContext.sparkContext.addSparkListener(listener)

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
3535
import org.apache.spark.internal.Logging
3636
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
3737
import org.apache.spark.sql.execution.command.SetCommand
38-
import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
38+
import org.apache.spark.sql.hive.HiveUtils
3939
import org.apache.spark.sql.internal.SQLConf
4040
import org.apache.spark.sql.types._
4141
import org.apache.spark.util.{Utils => SparkUtils}
@@ -195,9 +195,8 @@ private[hive] class SparkExecuteStatementOperation(
195195
statementId = UUID.randomUUID().toString
196196
logInfo(s"Running query '$statement' with $statementId")
197197
setState(OperationState.RUNNING)
198-
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
199198
// Always use the latest class loader provided by executionHive's state.
200-
val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader
199+
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
201200
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
202201

203202
HiveThriftServer2.listener.onStatementStart(

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,14 @@
1717

1818
package org.apache.spark.sql.hive
1919

20-
import java.util.regex.Pattern
21-
2220
import org.apache.hadoop.hive.conf.HiveConf
2321
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
2422

2523
import org.apache.spark.sql._
2624
import org.apache.spark.sql.catalyst.analysis.Analyzer
2725
import org.apache.spark.sql.execution.SparkPlanner
2826
import org.apache.spark.sql.execution.datasources._
29-
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
27+
import org.apache.spark.sql.hive.client.HiveClient
3028
import org.apache.spark.sql.internal.SessionState
3129

3230

@@ -42,11 +40,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
4240
sparkSession.sharedState.asInstanceOf[HiveSharedState]
4341
}
4442

45-
/**
46-
* A Hive client used for execution.
47-
*/
48-
lazy val executionHive: HiveClientImpl = sharedState.executionHive.newSession()
49-
5043
/**
5144
* A Hive client used for interacting with the metastore.
5245
*/
@@ -61,9 +54,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
6154
* set in the SQLConf *as well as* in the HiveConf.
6255
*/
6356
lazy val hiveconf: HiveConf = {
64-
val c = executionHive.conf
65-
conf.setConf(c.getAllProperties)
66-
c
57+
val initialConf = new HiveConf(
58+
sparkSession.sparkContext.hadoopConfiguration,
59+
classOf[org.apache.hadoop.hive.ql.session.SessionState])
60+
61+
// HiveConf is a Hadoop Configuration, which has a field of classLoader and
62+
// the initial value will be the current thread's context class loader
63+
 // (i.e. the initial class loader here).
64+
 // We call initialConf.setClassLoader(sharedState.jarClassLoader) here to make
65+
// this action explicit.
66+
initialConf.setClassLoader(sparkSession.sharedState.jarClassLoader)
67+
sparkSession.sparkContext.conf.getAll.foreach { case (k, v) =>
68+
initialConf.set(k, v)
69+
}
70+
initialConf
6771
}
6872

6973
setDefaultOverrideConfs()
@@ -140,33 +144,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
140144

141145
override def setConf(key: String, value: String): Unit = {
142146
super.setConf(key, value)
143-
executionHive.runSqlHive(s"SET $key=$value")
144147
metadataHive.runSqlHive(s"SET $key=$value")
145148
hiveconf.set(key, value)
146149
}
147150

148151
override def addJar(path: String): Unit = {
149-
super.addJar(path)
150-
executionHive.addJar(path)
151152
metadataHive.addJar(path)
152-
Thread.currentThread().setContextClassLoader(executionHive.clientLoader.classLoader)
153+
super.addJar(path)
153154
}
154155

155156
/**
156157
* Execute a SQL statement by passing the query text directly to Hive.
157158
*/
158159
override def runNativeSql(sql: String): Seq[String] = {
159-
val command = sql.trim.toLowerCase
160-
val functionOrMacroDDLPattern = Pattern.compile(
161-
".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL)
162-
if (functionOrMacroDDLPattern.matcher(command).matches()) {
163-
executionHive.runSqlHive(sql)
164-
} else if (command.startsWith("set")) {
165-
metadataHive.runSqlHive(sql)
166-
executionHive.runSqlHive(sql)
167-
} else {
168-
metadataHive.runSqlHive(sql)
169-
}
160+
metadataHive.runSqlHive(sql)
170161
}
171162

172163
/**

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
3131

3232
// TODO: just share the IsolatedClientLoader instead of the client instances themselves
3333

34-
/**
35-
* A Hive client used for execution.
36-
*/
37-
val executionHive: HiveClientImpl = {
38-
HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
39-
}
40-
4134
/**
4235
* A Hive client used to interact with the metastore.
4336
*/

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.spark.deploy.SparkSubmitUtils
3333
import org.apache.spark.internal.Logging
3434
import org.apache.spark.sql.catalyst.util.quietly
3535
import org.apache.spark.sql.hive.HiveUtils
36+
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
3637
import org.apache.spark.util.{MutableURLClassLoader, Utils}
3738

3839
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -278,14 +279,3 @@ private[hive] class IsolatedClientLoader(
278279
*/
279280
private[hive] var cachedHive: Any = null
280281
}
281-
282-
/**
283-
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
284-
* This class loader cannot be closed (its `close` method is a no-op).
285-
*/
286-
private[sql] class NonClosableMutableURLClassLoader(
287-
parent: ClassLoader)
288-
extends MutableURLClassLoader(Array.empty, parent) {
289-
290-
override def close(): Unit = {}
291-
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,6 @@ private[hive] class TestHiveSparkSession(
426426
sessionState.hiveconf.set("fs.default.name", new File(".").toURI.toString)
427427
// It is important that we RESET first as broken hooks that might have been set could break
428428
// other sql exec here.
429-
sessionState.executionHive.runSqlHive("RESET")
430429
sessionState.metadataHive.runSqlHive("RESET")
431430
// For some reason, RESET does not reset the following variables...
432431
// https://issues.apache.org/jira/browse/HIVE-9004

0 commit comments

Comments
 (0)