Skip to content

Commit 371a067

Browse files
committed
Support reading Parquet data through a compatible (widened) schema when the requested Catalyst data types do not exactly match the Parquet file's types
1 parent 1fad559 commit 371a067

File tree

3 files changed

+142
-0
lines changed

3 files changed

+142
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@ private[parquet] class ParquetRowConverter(
214214
updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
215215

216216
catalystType match {
217+
case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
218+
if ParquetSchemaCompatibility.isCompatible(catalystType, parquetType) =>
219+
ParquetSchemaCompatibility.newCompatibleConverter(parquetType, catalystType, updater)
220+
217221
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
218222
new ParquetPrimitiveConverter(updater)
219223

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.io.api.Converter
import org.apache.parquet.schema.Type

import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.types._

/**
 * Helpers for reading a Parquet column whose physical type differs from, but can be safely
 * widened to, the numeric Catalyst type requested by the user-supplied schema (e.g. reading
 * a Parquet INT32 column as `LongType`).
 */
private[parquet] object ParquetSchemaCompatibility {

  private val schemaConverter = new ParquetSchemaConverter(writeLegacyParquetFormat = false)

  // The logic for setting and adding a value in `ParquetPrimitiveConverter` is separated
  // into `NumericValueUpdater` and `NumericCompatibleConverter` so that a value can be
  // converted to the desired type before it reaches the parent container.
  //
  // `NumericValueUpdater` pushes a boxed `Number` into a `ParentContainerUpdater`,
  // narrowing/widening it to the value type that updater expects.
  private type NumericValueUpdater = Number => Unit

  // Wrapper for `NumericValueUpdater`: given an updater, it returns the primitive converter
  // that feeds each decoded Parquet value through that updater.
  private type NumericCompatibleConverter = NumericValueUpdater => ParquetPrimitiveConverter

  /**
   * Builds a factory for converters that accept any numeric Parquet primitive value and
   * forward it (as a boxed `java.lang.Number`) to the supplied `NumericValueUpdater`.
   *
   * Throws `RuntimeException` when `parquetType` does not map to a numeric Catalyst type.
   */
  private def createCompatiblePrimitiveConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): NumericCompatibleConverter = {

    val catalystTypeFromParquet = schemaConverter.convertField(parquetType)

    catalystTypeFromParquet match {
      case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType =>
        (valueUpdater: NumericValueUpdater) =>
          new ParquetPrimitiveConverter(updater) {
            // Bytes and shorts are physically stored as INT32 in Parquet, so `addInt`
            // also covers ByteType and ShortType source columns.
            override def addInt(value: Int): Unit = valueUpdater(value)
            override def addLong(value: Long): Unit = valueUpdater(value)
            override def addFloat(value: Float): Unit = valueUpdater(value)
            override def addDouble(value: Double): Unit = valueUpdater(value)
          }

      case _ =>
        throw new RuntimeException(
          s"Unable to create Parquet converter for data type $catalystType " +
            s"whose Parquet type is $parquetType. They are not compatible.")
    }
  }

  /**
   * Returns true when the Parquet column's type can be *widened* to `catalystType`, i.e.
   * `catalystType` is the tightest common numeric type of the two and the types are not
   * already identical.
   *
   * Exactly-matching types are deliberately excluded: they should keep using the plain
   * `ParquetPrimitiveConverter` fast path rather than boxing every value through `Number`.
   */
  def isCompatible(catalystType: DataType, parquetType: Type): Boolean = {
    // Find a compatible type between both numeric types.
    val catalystTypeFromParquet = schemaConverter.convertField(parquetType)
    val compatibleCatalystType =
      TypeCoercion.findTightestCommonTypeOfTwo(catalystType, catalystTypeFromParquet).orNull
    // Require the types to differ so identical types fall through to the default converter.
    catalystType != catalystTypeFromParquet && catalystType == compatibleCatalystType
  }

  /**
   * Creates a converter that reads the numeric `parquetType` column and up-casts each value
   * to `catalystType` before handing it to `updater`.
   *
   * Throws `RuntimeException` when `catalystType` is not one of the supported numeric types.
   */
  def newCompatibleConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {

    val newCompatiblePrimitiveConverter =
      createCompatiblePrimitiveConverter(parquetType, catalystType, updater)

    catalystType match {
      case ByteType =>
        val valueUpdater: NumericValueUpdater = (v: Number) => updater.setByte(v.byteValue())
        newCompatiblePrimitiveConverter(valueUpdater)

      case ShortType =>
        val valueUpdater: NumericValueUpdater = (v: Number) => updater.setShort(v.shortValue())
        newCompatiblePrimitiveConverter(valueUpdater)

      case IntegerType =>
        val valueUpdater: NumericValueUpdater = (v: Number) => updater.setInt(v.intValue())
        newCompatiblePrimitiveConverter(valueUpdater)

      case LongType =>
        val valueUpdater: NumericValueUpdater = (v: Number) => updater.setLong(v.longValue())
        newCompatiblePrimitiveConverter(valueUpdater)

      case FloatType =>
        val valueUpdater: NumericValueUpdater = (v: Number) => updater.setFloat(v.floatValue())
        newCompatiblePrimitiveConverter(valueUpdater)

      case DoubleType =>
        val valueUpdater: NumericValueUpdater = (v: Number) => updater.setDouble(v.doubleValue())
        newCompatiblePrimitiveConverter(valueUpdater)

      case _ =>
        throw new RuntimeException(
          s"Unable to create Parquet converter for data type $catalystType " +
            s"whose Parquet type is $parquetType. They are not compatible.")
    }
  }
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
736736
}
737737
}
738738
}
739+
740+
// Writes columns as (byte, short, int, long, float) and reads them back through a schema
// that requests each column one numeric step wider, exercising the compatible-schema
// conversion path.
test("SPARK-16544 Support Parquet schema compatibility with numeric types") {
  // NOTE(review): vectorized reading is disabled — presumably because the vectorized
  // reader does not go through ParquetRowConverter's widening path; confirm.
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
    withTempPath { file =>
      val data = (1 to 4).map(i => (i.toByte, i.toShort, i, i.toLong, i.toFloat))

      spark.createDataFrame(data).toDF("a", "b", "c", "d", "e")
        .write.parquet(file.getCanonicalPath)

      // Each requested type is strictly wider than the written type.
      val schema = StructType(
        StructField("a", ShortType, true) ::
        StructField("b", IntegerType, true) ::
        StructField("c", LongType, true) ::
        StructField("d", FloatType, true) ::
        StructField("e", DoubleType, true) :: Nil)

      // Use the same canonical path as the write above (was getAbsolutePath, which may
      // differ from the canonical form on some filesystems).
      val df = spark.read.schema(schema).parquet(file.getCanonicalPath)

      val expectedDf = data.map { case (a, b, c, d, e) =>
        (a.toShort, b.toInt, c.toLong, d.toFloat, e.toDouble)
      }.toDF("a", "b", "c", "d", "e")

      checkAnswer(df, expectedDf)
    }
  }
}
739765
}
740766

741767
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

0 commit comments

Comments (0)