Commit d354161

Provides Spark and Hive version in HiveThriftServer2 for branch-1.1
1 parent: 0c2a244 · commit: d354161

File tree

7 files changed: +182 −150 lines


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 9 deletions
@@ -20,10 +20,8 @@ package org.apache.spark.util
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
-import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
-
-import org.apache.log4j.PropertyConfigurator
+import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
+import java.util.{Locale, Properties, Random, UUID}
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
@@ -37,9 +35,10 @@ import com.google.common.io.{ByteStreams, Files}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
-import tachyon.client.{TachyonFile,TachyonFS}
+import tachyon.client.{TachyonFS, TachyonFile}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -952,8 +951,8 @@ private[spark] object Utils extends Logging {
    */
   def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
     val trace = Thread.currentThread.getStackTrace()
-      .filterNot { ste:StackTraceElement =>
-        // When running under some profilers, the current stack trace might contain some bogus
+      .filterNot { ste:StackTraceElement =>
+        // When running under some profilers, the current stack trace might contain some bogus
         // frames. This is intended to ensure that we don't crash in these situations by
         // ignoring any frames that we can't examine.
         (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1354,7 +1353,7 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /**
+  /**
    * Execute the given block, logging and re-throwing any uncaught exception.
   * This is particularly useful for wrapping code that runs in a thread, to ensure
   * that exceptions are printed, and to avoid having to catch Throwable.
@@ -1551,7 +1550,6 @@ private[spark] object Utils extends Logging {
       "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
     PropertyConfigurator.configure(pro)
   }
-
 }
 
 /**

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 28 additions & 36 deletions
@@ -48,43 +48,35 @@ case class SetCommand(
   extends LeafNode with Command with Logging {
 
   override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
-    // Set value for key k.
-    case (Some(k), Some(v)) =>
-      if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
-        logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+    // Configures the deprecated "mapred.reduce.tasks" property.
+    case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), Some(v)) =>
+      logWarning(
+        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-        context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
-        Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
-      } else {
-        context.setConf(k, v)
-        Array(s"$k=$v")
-      }
-
-    // Query the value bound to key k.
-    case (Some(k), _) =>
-      // TODO (lian) This is just a workaround to make the Simba ODBC driver work.
-      // Should remove this once we get the ODBC driver updated.
-      if (k == "-v") {
-        val hiveJars = Seq(
-          "hive-exec-0.12.0.jar",
-          "hive-service-0.12.0.jar",
-          "hive-common-0.12.0.jar",
-          "hive-hwi-0.12.0.jar",
-          "hive-0.12.0.jar").mkString(":")
-
-        Array(
-          "system:java.class.path=" + hiveJars,
-          "system:sun.java.command=shark.SharkServer2")
-      }
-      else {
-        Array(s"$k=${context.getConf(k, "<undefined>")}")
-      }
-
-    // Query all key-value pairs that are set in the SQLConf of the context.
-    case (None, None) =>
-      context.getAllConfs.map { case (k, v) =>
-        s"$k=$v"
-      }.toSeq
+      context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
+      Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+
+    // Configures a single property.
+    case (Some(k), Some(v)) =>
+      context.setConf(k, v)
+      Seq(s"$k=$v")
+
+    // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
+    // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
+    // while "SET -v" returns all properties.)
+    case (Some("-v") | None, None) =>
+      context.getAllConfs.map { case (k, v) => s"$k=$v" }.toSeq
+
+    // Queries the deprecated "mapred.reduce.tasks" property.
+    case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), None) =>
+      logWarning(
+        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+          s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+      Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")
+
+    // Queries a single property.
+    case (Some(k), None) =>
+      Seq(s"$k=${context.getConf(k, "<undefined>")}")
 
     case _ =>
       throw new IllegalArgumentException()
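
Taken together, the new match arms give SET the behavior sketched below. This is a hedged illustration rather than part of the commit: it assumes a Spark 1.1 SQLContext instance named sqlContext, and the comments describe what each arm is expected to return.

// Hedged sketch (assumed names): exercising the new SetCommand arms from a Spark 1.1 SQLContext.
sqlContext.sql("SET spark.sql.shuffle.partitions=10")  // sets the property, returns "spark.sql.shuffle.partitions=10"
sqlContext.sql("SET mapred.reduce.tasks=10")           // warns, converted to spark.sql.shuffle.partitions
sqlContext.sql("SET spark.sql.shuffle.partitions")     // returns the single key-value pair
sqlContext.sql("SET mapred.reduce.tasks")              // warns, shows spark.sql.shuffle.partitions instead
sqlContext.sql("SET")                                  // returns every property set in this SQLConf
sqlContext.sql("SET -v")                               // treated as an alias of plain SET here, unlike Hive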

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala

Lines changed: 12 additions & 3 deletions
@@ -17,18 +17,18 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
-
 import java.io.IOException
 import java.util.{List => JList}
 import javax.security.auth.login.LoginException
 
+import scala.collection.JavaConversions._
+
 import org.apache.commons.logging.Log
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hive.service.Service.STATE
 import org.apache.hive.service.auth.HiveAuthFactory
-import org.apache.hive.service.cli.CLIService
+import org.apache.hive.service.cli._
 import org.apache.hive.service.{AbstractService, Service, ServiceException}
 
 import org.apache.spark.sql.hive.HiveContext
@@ -57,6 +57,15 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
 
     initCompositeService(hiveConf)
   }
+
+  override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
+    getInfoType match {
+      case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
+      case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL")
+      case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version)
+      case _ => super.getInfo(sessionHandle, getInfoType)
+    }
+  }
 }
 
 private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
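
The getInfo override is what lets clients discover what server they are talking to. Below is a hedged sketch of a JDBC client reading those values back; the connection URL, the empty credentials, and the assumption that the Hive 0.12 JDBC driver (which must be on the classpath) forwards these DatabaseMetaData calls as Thrift GetInfo requests are illustrative, not guarantees made by this commit.

import java.sql.DriverManager

// Hedged sketch: ask the Thrift server who it is. URL and credentials are placeholders.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
val meta = conn.getMetaData
println(meta.getDatabaseProductName)     // expected to reflect CLI_DBMS_NAME: "Spark SQL"
println(meta.getDatabaseProductVersion)  // expected to reflect CLI_DBMS_VER: the running Spark version
conn.close()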

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala

Lines changed: 6 additions & 3 deletions
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkConf, SparkContext}
-import scala.collection.JavaConversions._
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -31,8 +32,10 @@ private[hive] object SparkSQLEnv extends Logging {
 
   def init() {
     if (hiveContext == null) {
-      sparkContext = new SparkContext(new SparkConf()
-        .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
+      val sparkConf = new SparkConf()
+        .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+        .set("spark.sql.hive.version", "0.12.0")
+      sparkContext = new SparkContext(sparkConf)
 
       sparkContext.addSparkListener(new StatsReportListener())
       hiveContext = new HiveContext(sparkContext)
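
Pinning spark.sql.hive.version in the SparkConf means a connected client can read the advertised Hive compatibility version back with an ordinary SET query. A minimal sketch, assuming code running inside the org.apache.spark.sql.hive package (SparkSQLEnv is package-private) on a branch-1.1 build:

// Hedged sketch (assumed usage): reading the advertised Hive version back through SparkSQLEnv.
SparkSQLEnv.init()
SparkSQLEnv.hiveContext.sql("SET spark.sql.hive.version").collect().foreach(println)
// expected to yield a single row containing "spark.sql.hive.version=0.12.0"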
