
Commit 946a0d9

Resolve type conflicts between strings and timestamps in partition column
1 parent f180b65 commit 946a0d9

File tree

2 files changed: 18 additions & 5 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 6 additions & 5 deletions
@@ -139,7 +139,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
           "please load them separately and then union them.")

-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)

       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -318,7 +318,8 @@ object PartitioningUtils {
    *   }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -333,7 +334,7 @@ object PartitioningUtils {
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }

       // Fills resolved literals back to each partition
@@ -470,15 +471,15 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
       // Falls back to string if all values of this column are null or empty string
       if (topType == NullType) StringType else topType
     }

     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
     }
   }
 }
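
At a high level, partition discovery infers a type for each discovered partition value independently, and resolveTypeConflicts then up-casts every value of a column to the highest common type; a column that mixes timestamp-like and arbitrary strings is resolved to a string column, and with this change the timestamp literals are stringified by a Cast that carries the session time zone. The following is a minimal, self-contained sketch of that up-casting idea only, not Spark's internals; the helper names inferType and widen are hypothetical and are not Spark APIs.

import java.sql.Timestamp

// A toy model of the conflict resolution described above: just the idea, not
// Spark's implementation.
object TypeConflictSketch {
  sealed trait ColType
  case object TimestampCol extends ColType
  case object StringCol extends ColType

  // Each raw partition value is inferred in isolation: values that parse as a
  // timestamp become TimestampCol, everything else falls back to StringCol.
  def inferType(raw: String): ColType =
    try { Timestamp.valueOf(raw); TimestampCol }
    catch { case _: IllegalArgumentException => StringCol }

  // String is treated as the "higher" type: if any value of the column cannot
  // be a timestamp, the whole column is resolved as a string column.
  def widen(a: ColType, b: ColType): ColType =
    if (a == StringCol || b == StringCol) StringCol else TimestampCol

  def main(args: Array[String]): Unit = {
    // The same values the regression test below uses: two timestamp-shaped
    // strings plus "blah", so the resolved column type is StringCol.
    val values = Seq("2015-01-01 00:00:00", "2014-01-01 00:00:00", "blah")
    println(values.map(inferType).reduce(widen)) // prints StringCol
  }
}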

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 12 additions & 0 deletions
@@ -1055,4 +1055,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }
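
For reference, this is roughly what the new test pins down, written as user-level spark-shell style code; a sketch assuming a local SparkSession and a placeholder output path.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("spark-22109-demo").getOrCreate()
import spark.implicits._

// A column whose values are mostly timestamp-shaped strings, plus one value
// ("blah") that cannot be parsed as a timestamp.
val df = Seq(
  (1, "2015-01-01 00:00:00"),
  (2, "2014-01-01 00:00:00"),
  (3, "blah")).toDF("i", "str")

// Writing creates one directory per distinct value of `str` under the output path.
val out = "/tmp/spark-22109-demo" // placeholder path
df.write.format("parquet").partitionBy("str").save(out)

// Partition discovery infers conflicting types for `str` (timestamp vs. string)
// and resolves the column to string; the cast of the timestamp-typed partition
// values back to string now receives the session time zone.
spark.read.load(out).show()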
