
Commit 59fc4e9

introduce global SQL conf
1 parent 2190037 commit 59fc4e9

File tree

15 files changed (+98, -66 lines)

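In short: spark.sql.catalogImplementation moves out of core's config package, and a new GlobalSQLConf object in sql/core now holds the cross-session, immutable SQL configs (spark.sql.catalogImplementation and spark.sql.sources.schemaStringLengthThreshold). These can only be supplied when the SparkSession is first created and are read-only afterwards. A minimal usage sketch against the tree at this commit (the key/value pair mirrors the new SQLConfSuite test; master and appName are illustrative):

import org.apache.spark.sql.SparkSession

// Global SQL confs are picked up at session-creation time, e.g. through the
// builder (or spark-submit --conf); they stay fixed for the session's lifetime.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("GlobalSQLConfDemo")
  .config("spark.sql.sources.schemaStringLengthThreshold", "2000")
  .getOrCreate()

// Reading a global conf through the public conf API still works.
assert(spark.conf.get("spark.sql.sources.schemaStringLengthThreshold") == "2000")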

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 0 additions & 7 deletions
@@ -91,13 +91,6 @@ package object config {
     .toSequence
     .createWithDefault(Nil)

-  // Note: This is a SQL config but needs to be in core because the REPL depends on it
-  private[spark] val CATALOG_IMPLEMENTATION = ConfigBuilder("spark.sql.catalogImplementation")
-    .internal()
-    .stringConf
-    .checkValues(Set("hive", "in-memory"))
-    .createWithDefault("in-memory")
-
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
       .intConf

repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 1 addition & 1 deletion
@@ -22,9 +22,9 @@ import java.io.File
 import scala.tools.nsc.GenericRunnerSettings

 import org.apache.spark._
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.GlobalSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.Utils

 object Main extends Logging {

repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -24,8 +24,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.log4j.{Level, LogManager}
 import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.internal.config._
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.GlobalSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.Utils

 class ReplSuite extends SparkFunSuite {

sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala

Lines changed: 8 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql

 import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.sql.internal.GlobalSQLConf
 import org.apache.spark.sql.internal.SQLConf


@@ -36,6 +37,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
    * @since 2.0.0
    */
   def set(key: String, value: String): Unit = {
+    assertNotGlobalSQLConf(key)
     sqlConf.setConfString(key, value)
   }

@@ -122,6 +124,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
    * @since 2.0.0
    */
   def unset(key: String): Unit = {
+    assertNotGlobalSQLConf(key)
     sqlConf.unsetConf(key)
   }

@@ -132,4 +135,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
     sqlConf.contains(key)
   }

+  private def assertNotGlobalSQLConf(key: String): Unit = {
+    if (GlobalSQLConf.globalConfKeys.contains(key)) {
+      throw new AnalysisException(s"Can not set/unset a global SQL conf: $key")
+    }
+  }
 }
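With assertNotGlobalSQLConf in place, mutating a global SQL conf through the public RuntimeConfig API is rejected at runtime. A rough sketch of the expected behaviour (assuming an already-running SparkSession named spark; the session-local key is just an example):

import org.apache.spark.sql.AnalysisException

// Ordinary, session-local SQL confs can still be changed at runtime...
spark.conf.set("spark.sql.shuffle.partitions", "10")

// ...but global SQL confs now throw, per the check added above.
try {
  spark.conf.set("spark.sql.catalogImplementation", "hive")
} catch {
  case e: AnalysisException =>
    // "Can not set/unset a global SQL conf: spark.sql.catalogImplementation"
    println(e.getMessage)
}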

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 4 additions & 4 deletions
@@ -29,7 +29,6 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
@@ -41,6 +40,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.GlobalSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
@@ -791,7 +791,7 @@ object SparkSession {
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
-        options.foreach { case (k, v) => session.conf.set(k, v) }
+        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         if (options.nonEmpty) {
           logWarning("Use an existing SparkSession, some configuration may not take effect.")
         }
@@ -803,7 +803,7 @@ object SparkSession {
       // If the current thread does not have an active session, get it from the global session.
       session = defaultSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
-        options.foreach { case (k, v) => session.conf.set(k, v) }
+        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         if (options.nonEmpty) {
           logWarning("Use an existing SparkSession, some configuration may not take effect.")
         }
@@ -829,7 +829,7 @@ object SparkSession {
          sc
        }
        session = new SparkSession(sparkContext)
-       options.foreach { case (k, v) => session.conf.set(k, v) }
+       options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        defaultSession.set(session)

        // Register a successfully instantiated context to the singleton. This should be at the

sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -28,11 +28,11 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.r.SerDe
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.command.ShowTablesCommand
+import org.apache.spark.sql.internal.GlobalSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types._

 private[sql] object SQLUtils extends Logging {

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 34 additions & 17 deletions
@@ -41,7 +41,7 @@ object SQLConf {
   private val sqlConfEntries = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, ConfigEntry[_]]())

-  private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
+  private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
     require(!sqlConfEntries.containsKey(entry.key),
       s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
     sqlConfEntries.put(entry.key, entry)
@@ -326,18 +326,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  // This is used to control the when we will split a schema's JSON string to multiple pieces
-  // in order to fit the JSON string in metastore's table property (by default, the value has
-  // a length restriction of 4000 characters). We will split the JSON string of a schema
-  // to its length exceeds the threshold.
-  val SCHEMA_STRING_LENGTH_THRESHOLD =
-    SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
-      .doc("The maximum length allowed in a single cell when " +
-        "storing additional schema information in Hive's metastore.")
-      .internal()
-      .intConf
-      .createWithDefault(4000)
-
   val PARTITION_COLUMN_TYPE_INFERENCE =
     SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -729,10 +717,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

-  // Do not use a value larger than 4000 as the default value of this property.
-  // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
-  def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
-
   def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS)

   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
@@ -877,3 +861,36 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   }
 }

+/**
+ * Global SQL configuration is a cross-session, immutable Spark configuration. External users can
+ * see the global sql configs via `SparkSession.conf`, but can NOT set/unset them.
+ */
+object GlobalSQLConf {
+  val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
+
+  private def buildConf(key: String): ConfigBuilder = {
+    ConfigBuilder(key).onCreate { entry =>
+      globalConfKeys.add(entry.key)
+      SQLConf.register(entry)
+    }
+  }
+
+  val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
+    .internal()
+    .stringConf
+    .checkValues(Set("hive", "in-memory"))
+    .createWithDefault("in-memory")
+
+  // This is used to control the when we will split a schema's JSON string to multiple pieces
+  // in order to fit the JSON string in metastore's table property (by default, the value has
+  // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
+  // value of this property). We will split the JSON string of a schema to its length exceeds the
+  // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
+  // that's why this conf has to be a global SQL conf.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
+    .doc("The maximum length allowed in a single cell when " +
+      "storing additional schema information in Hive's metastore.")
+    .internal()
+    .intConf
+    .createWithDefault(4000)
+}
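The buildConf helper is what makes an entry "global": it records the key in globalConfKeys (so RuntimeConfig can refuse set/unset) and registers the entry with SQLConf (so it is a recognised SQL conf entry when read). A hypothetical sketch of how a future cross-session conf would be declared inside GlobalSQLConf; the key, doc text and default are invented for illustration and are not part of this commit:

  // Hypothetical entry, for illustration only.
  val SOME_GLOBAL_FLAG = buildConf("spark.sql.someGlobalFlag")
    .doc("Illustrative cross-session flag, fixed for the lifetime of the shared state.")
    .internal()
    .booleanConf
    .createWithDefault(false)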

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 1 addition & 1 deletion
@@ -23,12 +23,12 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
+import org.apache.spark.sql.internal.GlobalSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.{MutableURLClassLoader, Utils}

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -22,14 +22,14 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach

-import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.internal.GlobalSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

Lines changed: 20 additions & 1 deletion
@@ -19,11 +19,14 @@ package org.apache.spark.sql.internal

 import org.apache.hadoop.fs.Path

-import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.internal.GlobalSQLConf._
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}

 class SQLConfSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
   private val testKey = "test.key.0"
   private val testVal = "test.val.0"

@@ -250,4 +253,20 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("global SQL conf comes from SparkConf") {
+    val newSession = SparkSession.builder()
+      .config(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000")
+      .getOrCreate()
+
+    assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD.key) == "2000")
+    checkAnswer(
+      newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
+      Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
+  }
+
+  test("cannot set/unset global SQL conf") {
+    intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
+    intercept[AnalysisException](sql(s"UNSET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
+  }
 }