
Commit 71c9dea

read Hive orc table with varchar column
1 parent d449988 commit 71c9dea

File tree

4 files changed: +34 -5 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 4 additions & 1 deletion
@@ -51,9 +51,12 @@ private[spark] object HiveUtils extends Logging {
     sc
   }
 
-  /** The version of hive used internally by Spark SQL. */
+  // The version of hive used internally by Spark SQL.
   val hiveExecutionVersion: String = "1.2.1"
 
+  // The property key that is used to store the raw hive type string in the metadata of StructField.
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")

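For context on why a side channel is needed at all: Catalyst has no char/varchar types, so parsing Hive's type string is lossy, and the lost length cannot be recovered from the DataType alone. A quick demonstration (a sketch for the Spark version this commit targets, where the parser collapses these types; later Spark versions model char/varchar differently, and LossyTypeDemo is an illustrative name, not part of the commit):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

object LossyTypeDemo {
  def main(args: Array[String]): Unit = {
    // Hive's parameterized character types collapse to a plain string type,
    // so round-tripping through catalogString alone would hand "string" back
    // to Hive and drop the declared length.
    println(CatalystSqlParser.parseDataType("varchar(10)").catalogString) // string
    println(CatalystSqlParser.parseDataType("char(5)").catalogString)     // string
  }
}

Stashing the original type string under HIVE_TYPE_STRING lets Spark report the exact declared type back to Hive even though its own type system cannot express it.
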
sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala

Lines changed: 6 additions & 1 deletion
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable

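This method and HiveClientImpl#toHiveColumn now repeat the same fallback, which is what the TODO above alludes to. A self-contained sketch of the shared pattern (ToHiveColumnSketch is an illustrative name; the metadata key is inlined because the HiveUtils object is private[spark]):

import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.types.StructField

object ToHiveColumnSketch {
  // Inlined stand-in for HiveUtils.hiveTypeString.
  private val hiveTypeString = "HIVE_TYPE_STRING"

  def toHiveColumn(c: StructField): FieldSchema = {
    // Prefer the raw Hive type captured when the schema was read from the
    // metastore; fields that never came from Hive fall back to catalogString.
    val typeString =
      if (c.metadata.contains(hiveTypeString)) c.metadata.getString(hiveTypeString)
      else c.dataType.catalogString
    new FieldSchema(c.name, typeString, c.getComment.orNull)
  }
}
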
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 12 additions & 3 deletions
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }

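One reason stashing the raw type in StructField metadata works well: metadata travels with the schema through Spark's own serialization, so the type string survives wherever the schema does. A small sketch of that round trip (MetadataRoundTrip is an illustrative name; only the spark-sql types package is assumed):

import org.apache.spark.sql.types._

object MetadataRoundTrip {
  def main(args: Array[String]): Unit = {
    val metadata = new MetadataBuilder().putString("HIVE_TYPE_STRING", "varchar(10)").build()
    val schema = StructType(Seq(StructField("a", StringType, nullable = true, metadata)))

    // StructField metadata is embedded in the schema's JSON form...
    val restored = DataType.fromJson(schema.json).asInstanceOf[StructType]

    // ...so the raw Hive type is intact after a serialize/deserialize cycle.
    assert(restored("a").metadata.getString("HIVE_TYPE_STRING") == "varchar(10)")
  }
}
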
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {

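Note the test drives DDL through runSqlHive so the varchar table is created by Hive itself, which is the scenario that used to fail. Beyond the answer check, the fix can also be observed directly: the column's Catalyst type is still StringType while the raw Hive type sits in the field metadata. A rough spark-shell snippet (assumes a Hive-enabled session and the orc_varchar table from the test above):

val field = spark.table("orc_varchar").schema("a")
println(field.dataType)                               // StringType
println(field.metadata.getString("HIVE_TYPE_STRING")) // varchar(10)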