@@ -69,13 +69,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val table = synchronized {
           client.getTable(in.database, in.name)
         }
-        val schemaString = table.getProperty("spark.sql.sources.schema")
-        val userSpecifiedSchema =
-          if (schemaString == null) {
-            None
-          } else {
-            Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+        val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
+          .orElse {
+            // If spark.sql.sources.schema is not defined, we either split the schema into
+            // multiple parts or the schema was never defined. To determine which case we are
+            // in, we check spark.sql.sources.schema.numOfParts.
+            Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
+              case Some(numOfParts) =>
+                val parts = (0 until numOfParts.toInt).map { index =>
+                  Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
+                    .getOrElse(sys.error("Could not read schema from the metastore because it is corrupted."))
+                }
+                // Stick all parts back together into a single schema string in its JSON representation.
+                Some(parts.mkString)
+              case None => None // The schema was not defined.
+            }
           }
+
+        val userSpecifiedSchema =
+          schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
         val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
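
The first hunk's read path reassembles a schema that may have been split across several table properties. Below is a minimal, self-contained sketch of that reassembly logic, assuming a plain Map[String, String] in place of the Hive table's property list; readSchemaJson, readSchema, and props are illustrative names, not part of the patch.

import org.apache.spark.sql.types.{DataType, StructType}

// Recover the schema's JSON string from table properties, if a schema was stored.
def readSchemaJson(props: Map[String, String]): Option[String] =
  // Prefer the legacy single-property layout when it is present.
  props.get("spark.sql.sources.schema").orElse {
    // Otherwise fall back to the split layout: numOfParts records how many
    // spark.sql.sources.schema.part.N properties were written.
    props.get("spark.sql.sources.schema.numOfParts").map { numOfParts =>
      (0 until numOfParts.toInt).map { index =>
        props.getOrElse(
          s"spark.sql.sources.schema.part.$index",
          sys.error(s"Schema is corrupted: part $index is missing."))
      }.mkString // Concatenating the parts restores the original JSON string.
    }
  }

// Parse the reassembled JSON back into a StructType, as the patched code does.
def readSchema(props: Map[String, String]): Option[StructType] =
  readSchemaJson(props).map(DataType.fromJson(_).asInstanceOf[StructType])
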
@@ -119,7 +131,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     tbl.setProperty("spark.sql.sources.provider", provider)
     if (userSpecifiedSchema.isDefined) {
-      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+      val threshold = hive.conf.schemaStringLengthThreshold
+      val schemaJsonString = userSpecifiedSchema.get.json
+      // Check if the size of the JSON string of the schema exceeds the threshold.
+      if (schemaJsonString.length > threshold) {
+        // Need to split the string.
+        val parts = schemaJsonString.grouped(threshold).toSeq
+        // First, record the total number of parts.
+        tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
+        // Second, write each part to a separate table property.
+        parts.zipWithIndex.foreach {
+          case (part, index) =>
+            tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+        }
+      } else {
+        // The length is less than the threshold; just put it in a single table property.
+        tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
+        // We use spark.sql.sources.schema instead of spark.sql.sources.schema.part.0
+        // because users may have already created data source tables in the metastore.
+        tbl.setProperty("spark.sql.sources.schema", schemaJsonString)
+      }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
 
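
The write path is the mirror image: a schema JSON string longer than the configured threshold is chopped into fixed-size chunks, while shorter strings keep the old single-property layout. A sketch of the same splitting logic, again with a mutable map standing in for the table's properties; writeSchemaJson and its parameter names are illustrative.

import scala.collection.mutable

// Store a schema's JSON string in table properties, splitting it when it is
// longer than `threshold` characters.
def writeSchemaJson(
    props: mutable.Map[String, String],
    schemaJson: String,
    threshold: Int): Unit = {
  if (schemaJson.length > threshold) {
    // grouped(n) yields successive substrings of at most n characters.
    val parts = schemaJson.grouped(threshold).toSeq
    props("spark.sql.sources.schema.numOfParts") = parts.size.toString
    parts.zipWithIndex.foreach { case (part, index) =>
      props(s"spark.sql.sources.schema.part.$index") = part
    }
  } else {
    // Keep the single-property layout so tables created by older versions,
    // and readers that only know spark.sql.sources.schema, keep working.
    props("spark.sql.sources.schema.numOfParts") = "1"
    props("spark.sql.sources.schema") = schemaJson
  }
}

Because mkString undoes grouped, a round trip through writeSchemaJson and readSchemaJson returns the original JSON string for any threshold of at least 1.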