
Commit 3f03c90

cloud-fan authored and rxin committed
[SPARK-18220][SQL] read Hive orc table with varchar column should not fail
## What changes were proposed in this pull request?

Spark SQL only has `StringType`, so when reading a Hive table with a varchar column we read that column as `StringType`. However, we still need to use the varchar `ObjectInspector` to read a varchar column in a Hive table, which means we need to know the actual column type on the Hive side. In Spark 2.1, after #14363, we parse the Hive type string to a Catalyst type, which means the actual column type on the Hive side is erased. Then we may use the string `ObjectInspector` to read a varchar column and fail.

This PR keeps the original Hive column type string in the metadata of `StructField`, and uses it when converting the field back to a Hive column.

## How was this patch tested?

A newly added regression test.

Author: Wenchen Fan <[email protected]>

Closes #16060 from cloud-fan/varchar.
1 parent c24076d commit 3f03c90
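
The mechanism is small enough to sketch in isolation. The snippet below is a minimal, standalone illustration of the idea, not the patched Spark code itself: the object `VarcharMetadataSketch` and its two helpers are hypothetical, while the metadata key `HIVE_TYPE_STRING` and the `MetadataBuilder`/`StructField` calls mirror what the diff adds. The raw Hive type string travels with the field as metadata when the schema is read, and is preferred over the Catalyst `catalogString` when converting back to a Hive `FieldSchema`.

```scala
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StringType, StructField}

object VarcharMetadataSketch {
  // Same key as the HiveUtils.hiveTypeString constant introduced by this commit.
  val hiveTypeString: String = "HIVE_TYPE_STRING"

  // Reading from Hive: varchar(10) maps to StringType in Catalyst, but the raw
  // Hive type string is preserved in the field's metadata.
  def fromHive(name: String, rawHiveType: String): StructField = {
    val metadata: Metadata =
      new MetadataBuilder().putString(hiveTypeString, rawHiveType).build()
    StructField(name, StringType, nullable = true, metadata = metadata)
  }

  // Converting back to Hive: prefer the preserved raw type so Hive selects the
  // varchar ObjectInspector rather than the string one.
  def toHiveTypeString(field: StructField): String = {
    if (field.metadata.contains(hiveTypeString)) field.metadata.getString(hiveTypeString)
    else field.dataType.catalogString
  }

  def main(args: Array[String]): Unit = {
    val field = fromHive("a", "varchar(10)")
    println(toHiveTypeString(field)) // prints "varchar(10)" instead of "string"
  }
}
```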

File tree

5 files changed: +40 -6 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 8 additions & 0 deletions
```diff
@@ -54,6 +54,14 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"

+  /**
+   * The property key that is used to store the raw hive type string in the metadata of StructField.
+   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
+   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil

   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }

   // TODO: merge this with HiveClientImpl#toHiveTable
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 12 additions & 3 deletions
```diff
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}

 /**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]

   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }

   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@ private[hive] class HiveClientImpl(
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -205,7 +205,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   test("make sure we can read table created by old version of Spark") {
     for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
       val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))

       if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
@@ -235,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
       sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")

       val readBack = getTableMetadata(newName)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))

       // trim the URI prefix
       val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }

 class OrcSourceSuite extends OrcSuite {
```
