
Commit 4e3bd55

Enable inserting arrays into Hive tables saved as Parquet using the data source API.
1 parent 651a1c0 commit 4e3bd55

5 files changed: +58 −5 lines

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 9 additions & 2 deletions
@@ -28,7 +28,7 @@ import parquet.io.api._
 import parquet.schema.MessageType
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.types._
 
 /**
@@ -381,7 +381,14 @@ private[parquet] object RowWriteSupport {
   }
 
   def setSchema(schema: Seq[Attribute], configuration: Configuration) {
-    val encoded = ParquetTypesConverter.convertToString(schema)
+    val updatedSchema = schema.map {
+      case a if a.dataType.isInstanceOf[ArrayType] =>
+        val newArray = ArrayType(a.dataType.asInstanceOf[ArrayType].elementType)
+        val newAttr = AttributeReference(a.name, newArray, a.nullable, a.metadata)()
+        newAttr
+      case other => other
+    }
+    val encoded = ParquetTypesConverter.convertToString(updatedSchema)
     configuration.set(SPARK_ROW_SCHEMA, encoded)
     configuration.set(
       ParquetOutputFormat.WRITER_VERSION,
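
Note on the change above: rebuilding each array attribute through the single-argument ArrayType constructor normalizes the containsNull flag coming in on the Hive write path before the schema string is serialized into the Hadoop Configuration, presumably so the schema encoded by RowWriteSupport agrees with what the Parquet data source expects. A minimal sketch of the two shapes involved, using only Catalyst types (the LongType element is an arbitrary choice for illustration):

import org.apache.spark.sql.types._

// Two descriptions of the same Hive array column that may differ only in the
// containsNull flag; setSchema above rewrites the attribute with the
// single-argument constructor so the flag is reset to its default before the
// schema is written into the configuration.
val fromMetastore = ArrayType(LongType, containsNull = false)
val normalized    = ArrayType(fromMetastore.elementType)

assert(normalized.elementType == fromMetastore.elementType)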

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 2 additions & 1 deletion
@@ -263,9 +263,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   override protected[sql] lazy val analyzer =
     new Analyzer(catalog, functionRegistry, caseSensitive = false) {
       override val extendedResolutionRules =
+        catalog.PreInsertionCasts ::
         catalog.ParquetConversions ::
         catalog.CreateTables ::
-        catalog.PreInsertionCasts ::
         ExtractPythonUdfs ::
         ResolveUdtfsAlias ::
         sources.PreWriteCheck(catalog) ::
@@ -344,6 +344,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
       DataSourceStrategy,
+      HiveDataSourceStrategy,
      HiveCommandStrategy(self),
      HiveDDLStrategy,
      DDLStrategy,
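
Two changes here: catalog.PreInsertionCasts moves ahead of catalog.ParquetConversions in the extended resolution rules, and the new HiveDataSourceStrategy (added in HiveStrategies.scala below) is registered with the planner. Each entry in extendedResolutionRules is a Catalyst Rule[LogicalPlan], and rules in the resolution batch are tried in the order listed, which is what makes the reordering meaningful. A minimal sketch of the shape such a rule must have (a hypothetical no-op, not part of this commit):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Anything placed in extendedResolutionRules has this shape; its position in
// the list determines when it runs relative to the other extended rules.
object NoOpResolutionRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}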

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 4 additions & 1 deletion
@@ -424,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     // Collects all `MetastoreRelation`s which should be replaced
     val toBeReplaced = plan.collect {
       // Write path
-      case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+      case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
         // Inserting into partitioned table is not supported in Parquet data source (yet).
         if !relation.hiveQlTable.isPartitioned &&
           hive.convertMetastoreParquet &&
@@ -458,6 +458,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
         withAlias
       }
+      case InsertIntoHiveTable(r: MetastoreRelation, p, c, o) if relationMap.contains(r) =>
+        val parquetRelation = relationMap(r)
+        InsertIntoHiveTable(parquetRelation, p, c, o)
       case other => other.transformExpressions {
         case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
       }
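
The write-path case now matches InsertIntoHiveTable instead of the generic InsertIntoTable, and the new case rewrites an insert whose target MetastoreRelation was converted so that it points at the Parquet relation from relationMap. A simplified sketch of the substitution idea, assuming the `plan` and `relationMap` values built inside this rule (not the actual ParquetConversions code, which also rewrites attribute references):

// Rewrite bottom-up so that both plain scans and the relations nested inside
// insert nodes end up pointing at their converted Parquet replacements.
val substituted = plan transformUp {
  case r: MetastoreRelation if relationMap.contains(r) => relationMap(r)
}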

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 10 additions & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing}
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing, LogicalRelation, InsertIntoDataSource, InsertableRelation}
 import org.apache.spark.sql.types.StringType
 
 
@@ -254,4 +254,13 @@ private[hive] trait HiveStrategies {
       case _ => Nil
     }
   }
+
+  object HiveDataSourceStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case i @ InsertIntoHiveTable(
+          l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty =>
+        ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
+      case _ => Nil
+    }
+  }
 }
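
HiveDataSourceStrategy is the planning half of the change: an InsertIntoHiveTable whose target is a LogicalRelation backed by an InsertableRelation, and which carries no partition spec, is planned as the existing InsertIntoDataSource command instead of going through the Hive write path. A toy example of what an InsertableRelation provides (hypothetical, not part of this commit, assuming the Spark 1.3-era sql.sources API):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.StructType

// A relation that only logs inserts; any relation mixing in InsertableRelation
// can be the target of the InsertIntoDataSource command produced above.
class LoggingRelation(
    override val sqlContext: SQLContext,
    override val schema: StructType)
  extends BaseRelation with InsertableRelation {

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // A real implementation would write `data` out here; `overwrite` says
    // whether existing contents should be replaced first.
    println(s"insert called: overwrite=$overwrite, columns=${data.schema.fieldNames.mkString(", ")}")
  }
}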

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 33 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
@@ -299,6 +301,37 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
     super.afterAll()
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("insert array into parquet hive table using data source api") {
+    val data1 = """{ "timestamp": 1422435598, "data_array": [ { "field0": null, "field1": 1, "field2": 2} ] }"""
+    val data2 = """{ "timestamp": 1422435599, "data_array": [ { "field0": 0, "field1": null, "field2": 3} ] }"""
+
+    val json = sparkContext.makeRDD(data1 :: data2 :: Nil)
+    val rdd = jsonRDD(json)
+    rdd.registerTempTable("tmp_table")
+
+    val partitionedTableDir = File.createTempFile("persisted_table", "sparksql")
+    partitionedTableDir.delete()
+    partitionedTableDir.mkdir()
+
+    sql(
+      s"""
+        |create external table persisted_table
+        |(
+        |  data_array ARRAY <STRUCT<field0: BIGINT, field1: BIGINT, field2: BIGINT>>,
+        |  timestamp BIGINT
+        |)
+        |STORED AS PARQUET Location '${partitionedTableDir.getCanonicalPath}'
+      """.stripMargin)
+
+    sql("insert into table persisted_table select * from tmp_table").collect()
+
+    checkAnswer(
+      sql("select data_array.field0, data_array.field1, data_array.field2 from persisted_table"),
+      Row(ArrayBuffer(null), ArrayBuffer(1), ArrayBuffer(2)) ::
+        Row(ArrayBuffer(0), ArrayBuffer(null), ArrayBuffer(3)) :: Nil
+    )
+  }
 }
 
 
 class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
