
Commit 12bacae

If the JSON string of a schema is too large, split it before storing it in metastore.

1 parent e9b4f70

File tree: 3 files changed, +50 -7 lines

  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
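The change relies on a simple invariant: a string chopped into fixed-size pieces with Scala's grouped and concatenated back in order with mkString reproduces the original exactly. A minimal, self-contained sketch of that invariant (the 4000-character threshold mirrors the default introduced in this commit; the JSON string below is only a stand-in for a real schema, and the object name is illustrative):

  object SchemaSplitRoundTrip {
    def main(args: Array[String]): Unit = {
      // Stand-in for a schema's JSON string that exceeds the metastore's 4000-character limit.
      val schemaJson = """{"type":"struct","fields":[""" +
        (1 to 500).map(i => s"""{"name":"column_$i","type":"string","nullable":true}""").mkString(",") +
        "]}"
      val threshold = 4000

      // Write path: split the string into fixed-size pieces.
      val parts = schemaJson.grouped(threshold).toSeq
      // Read path: concatenate the pieces back in order.
      val rejoined = parts.mkString

      assert(rejoined == schemaJson, "grouped + mkString must reproduce the original string")
      println(s"length = ${schemaJson.length}, parts = ${parts.size}")
    }
  }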

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -51,6 +51,11 @@ private[spark] object SQLConf {
 
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
+  // This is used to control when we will split a schema's JSON string into multiple pieces
+  // in order to fit the JSON string in the metastore's table property (by default, the value
+  // has a length restriction of 4000 characters). We will split the JSON string of a schema
+  // if its length exceeds the threshold.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
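The getter added in the next hunk resolves this key with a 4000-character default (getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt). A standalone sketch of that resolution, assuming only a plain Map[String, String] in place of SQLConf's settings (the object and method names here are illustrative, not Spark APIs):

  object ThresholdLookupSketch {
    val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"

    // Mirror of SQLConf's getConf(key, default) pattern for this setting.
    def schemaStringLengthThreshold(settings: Map[String, String]): Int =
      settings.getOrElse(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt

    def main(args: Array[String]): Unit = {
      println(schemaStringLengthThreshold(Map.empty))                                     // 4000 (default)
      println(schemaStringLengthThreshold(Map(SCHEMA_STRING_LENGTH_THRESHOLD -> "2000"))) // 2000
    }
  }

In a live session the key can be set like any other SQL configuration, for example via sqlContext.setConf("spark.sql.sources.schemaStringLengthThreshold", "2000"); the comment in the next hunk only cautions against raising the default itself above 4000.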
@@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
 
+  // Do not use a value larger than 4000 as the default value of this property.
+  // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
+  private[spark] def schemaStringLengthThreshold: Int =
+    getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+
   private[spark] def dataFrameEagerAnalysis: Boolean =
     getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 38 additions & 7 deletions
@@ -69,13 +69,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = table.getProperty("spark.sql.sources.schema")
-      val userSpecifiedSchema =
-        if (schemaString == null) {
-          None
-        } else {
-          Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+      val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
+        .orElse {
+          // If spark.sql.sources.schema is not defined, either we split the schema into multiple
+          // parts or the schema was never defined. To determine which case applies,
+          // we check spark.sql.sources.schema.numOfParts.
+          Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
+            case Some(numOfParts) =>
+              val parts = (0 until numOfParts.toInt).map { index =>
+                Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
+                  .getOrElse("Could not read schema from the metastore because it is corrupted.")
+              }
+              // Stitch all parts back into a single schema string in the JSON representation.
+              Some(parts.mkString)
+            case None => None // The schema was not defined.
+          }
         }
+
+      val userSpecifiedSchema =
+        schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
       val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
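The same reassembly logic can be exercised outside of Hive. A minimal sketch, assuming a plain Map[String, String] stands in for the table's properties (the property keys match the diff above; the object name, function name, and demo values are illustrative):

  object SchemaReadSketch {
    // Rebuild the schema JSON string from table properties, mirroring the read path above.
    def readSchemaString(props: Map[String, String]): Option[String] =
      props.get("spark.sql.sources.schema").orElse {
        props.get("spark.sql.sources.schema.numOfParts").map { numOfParts =>
          (0 until numOfParts.toInt).map { index =>
            props.getOrElse(
              s"spark.sql.sources.schema.part.$index",
              "Could not read schema from the metastore because it is corrupted.")
          }.mkString // stitch the pieces back into one JSON string
        }
      }

    def main(args: Array[String]): Unit = {
      val props = Map(
        "spark.sql.sources.schema.numOfParts" -> "2",
        "spark.sql.sources.schema.part.0" -> """{"type":"struct","fie""",
        "spark.sql.sources.schema.part.1" -> """lds":[]}""")
      println(readSchemaString(props)) // Some({"type":"struct","fields":[]})
    }
  }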
@@ -119,7 +131,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     tbl.setProperty("spark.sql.sources.provider", provider)
     if (userSpecifiedSchema.isDefined) {
-      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+      val threshold = hive.conf.schemaStringLengthThreshold
+      val schemaJsonString = userSpecifiedSchema.get.json
+      // Check if the size of the JSON string of the schema exceeds the threshold.
+      if (schemaJsonString.size > threshold) {
+        // Need to split the string.
+        val parts = schemaJsonString.grouped(threshold).toSeq
+        // First, record the total number of parts we have.
+        tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
+        // Second, write every part to a table property.
+        parts.zipWithIndex.foreach {
+          case (part, index) =>
+            tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+        }
+      } else {
+        // The length is less than the threshold; just put it in a single table property.
+        tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
+        // We use spark.sql.sources.schema instead of spark.sql.sources.schema.part.0
+        // because users may have already created data source tables in the metastore.
+        tbl.setProperty("spark.sql.sources.schema", schemaJsonString)
+      }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
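And the corresponding write path, sketched against a mutable map standing in for the table's properties (only the property keys and the splitting logic come from the diff above; the object and function names are illustrative):

  import scala.collection.mutable

  object SchemaWriteSketch {
    // Store a schema JSON string as one or more table properties, mirroring the write path above.
    def writeSchemaString(
        props: mutable.Map[String, String],
        schemaJsonString: String,
        threshold: Int): Unit = {
      if (schemaJsonString.length > threshold) {
        val parts = schemaJsonString.grouped(threshold).toSeq
        props("spark.sql.sources.schema.numOfParts") = parts.size.toString
        parts.zipWithIndex.foreach { case (part, index) =>
          props(s"spark.sql.sources.schema.part.$index") = part
        }
      } else {
        props("spark.sql.sources.schema.numOfParts") = "1"
        props("spark.sql.sources.schema") = schemaJsonString
      }
    }

    def main(args: Array[String]): Unit = {
      val props = mutable.Map.empty[String, String]
      writeSchemaString(props, "x" * 9000, threshold = 4000)
      println(props("spark.sql.sources.schema.numOfParts")) // 3
    }
  }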

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 2 additions & 0 deletions
@@ -606,6 +606,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       options = Map("path" -> "just a dummy path"),
       isExternal = false)
 
+    invalidateTable("wide_schema")
+
     val actualSchema = table("wide_schema").schema
     assert(schema === actualSchema)
   }
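For context, the "wide_schema" table presumably takes its name from a schema whose JSON form exceeds the 4000-character default, forcing the multi-part storage path; invalidateTable then drops the cached relation so the schema is re-read (and reassembled) from the metastore rather than served from cache. A sketch of building such a schema, assuming the org.apache.spark.sql.types package (field names and count are illustrative, not taken from the suite):

  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  // Wide enough that schema.json comfortably exceeds the 4000-character threshold.
  val schema = StructType((1 to 1000).map(i => StructField(s"c$i", StringType, nullable = true)))
  assert(schema.json.length > 4000)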
