Commit a422a7f
[SPARK-24012][SQL] Union of map and other compatible column
1 parent: cce4694

4 files changed: +60 −16 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala

Lines changed: 23 additions & 8 deletions
@@ -267,7 +267,11 @@ object TypeCoercion {
       case s: Union if s.childrenResolved &&
           s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved =>
         val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children)
-        s.makeCopy(Array(newChildren))
+        if (newChildren != s.children) {
+          s.makeCopy(Array(newChildren))
+        } else {
+          s
+        }
     }

     /** Build new children with the widest types for each attribute among all the children */
@@ -279,7 +283,7 @@
       val targetTypes: Seq[DataType] =
         getWidestTypes(children, attrIndex = 0, mutable.Queue[DataType]())

-      if (targetTypes.nonEmpty) {
+      if (!targetTypes.forall(null == _)) {
         // Add an extra Project if the targetTypes are different from the original types.
         children.map(widenTypes(_, targetTypes))
       } else {
@@ -296,24 +300,35 @@
       // Return the result after the widen data types have been found for all the children
       if (attrIndex >= children.head.output.length) return castedTypes.toSeq

+      val types = children.map(_.output(attrIndex).dataType)
       // For the attrIndex-th attribute, find the widest type
-      findWiderCommonType(children.map(_.output(attrIndex).dataType)) match {
+      findWiderCommonType(types) match {
         // If unable to find an appropriate widen type for this column, return an empty Seq
-        case None => Seq.empty[DataType]
+        case None =>
+          castedTypes.enqueue(null)
         // Otherwise, record the result in the queue and find the type for the next column
-        case Some(widenType) =>
+        case Some(widenType) if types.exists(_ != widenType) =>
           castedTypes.enqueue(widenType)
-          getWidestTypes(children, attrIndex + 1, castedTypes)
+        case _ =>
+          castedTypes.enqueue(null)
       }
+      getWidestTypes(children, attrIndex + 1, castedTypes)
     }

     /** Given a plan, add an extra project on top to widen some columns' data types. */
     private def widenTypes(plan: LogicalPlan, targetTypes: Seq[DataType]): LogicalPlan = {
+      var changed = false
       val casted = plan.output.zip(targetTypes).map {
-        case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+        case (e, dt) if null != dt && e.dataType != dt =>
+          changed = true
+          Alias(Cast(e, dt), e.name)()
         case (e, _) => e
       }
-      Project(casted, plan)
+      if (changed) {
+        Project(casted, plan)
+      } else {
+        plan
+      }
     }
   }
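Note: the net effect of this change is that getWidestTypes now records a null placeholder for a column instead of abandoning the whole widening pass, both when the column's types have no wider common type and when no cast is needed, and widenTypes and the Union rule then leave anything that would not actually change untouched. The sketch below is a minimal, self-contained illustration of that per-column bookkeeping, not the Catalyst code itself; the simplified DataType hierarchy and the findWiderCommonType stub are stand-ins invented for the example.

// Toy model of the per-column widening with null markers described above.
object WidenSketch {
  // Simplified stand-ins for Catalyst's DataType and findWiderCommonType.
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType
  case class MapType(key: DataType, value: DataType) extends DataType

  def findWiderCommonType(types: Seq[DataType]): Option[DataType] = types match {
    case ts if ts.forall(_ == ts.head) => Some(ts.head)                            // identical types
    case ts if ts.forall(t => t == IntType || t == StringType) => Some(StringType) // int widens to string
    case _ => None                                                                 // e.g. incompatible maps
  }

  // For each column (its types across all Union children), the widened type,
  // or null when the column should be left as-is.
  def widestTypes(columns: Seq[Seq[DataType]]): Seq[DataType] =
    columns.map { types =>
      findWiderCommonType(types) match {
        case Some(wider) if types.exists(_ != wider) => wider // a real cast is needed
        case _ => null                                        // no common type, or nothing to cast
      }
    }

  def main(args: Array[String]): Unit = {
    // Column types of the two children in the new test query:
    // child 1: (map<int,int>, string), child 2: (map<int,int>, int).
    val mapCol = Seq(MapType(IntType, IntType), MapType(IntType, IntType))
    val otherCol = Seq(StringType, IntType)
    println(widestTypes(Seq(mapCol, otherCol))) // List(null, StringType)
  }
}

Running the sketch prints List(null, StringType): the map column is left alone while the int column is widened to string, which mirrors the schema recorded in the updated union.sql.out below.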

sql/core/src/test/resources/sql-tests/inputs/union.sql

Lines changed: 5 additions & 0 deletions
@@ -35,6 +35,11 @@ FROM (SELECT col AS col
 SELECT col
 FROM p3) T1) T2;

+-- SPARK-24012 Union of map and other compatible columns.
+SELECT map(1, 2), 'str'
+UNION ALL
+SELECT map(1, 2, 3, NULL), 1;
+
 -- Clean-up
 DROP VIEW IF EXISTS t1;
 DROP VIEW IF EXISTS t2;

sql/core/src/test/resources/sql-tests/results/union.sql.out

Lines changed: 19 additions & 8 deletions
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 14
+-- Number of queries: 15


 -- !query 0
@@ -105,40 +105,51 @@ struct<x:int,col:int>


 -- !query 9
-DROP VIEW IF EXISTS t1
+SELECT map(1, 2), 'str'
+UNION ALL
+SELECT map(1, 2, 3, NULL), 1
 -- !query 9 schema
-struct<>
+struct<map(1, 2):map<int,int>,str:string>
 -- !query 9 output
-
+{1:2,3:null}	1
+{1:2}	str


 -- !query 10
-DROP VIEW IF EXISTS t2
+DROP VIEW IF EXISTS t1
 -- !query 10 schema
 struct<>
 -- !query 10 output



 -- !query 11
-DROP VIEW IF EXISTS p1
+DROP VIEW IF EXISTS t2
 -- !query 11 schema
 struct<>
 -- !query 11 output



 -- !query 12
-DROP VIEW IF EXISTS p2
+DROP VIEW IF EXISTS p1
 -- !query 12 schema
 struct<>
 -- !query 12 output



 -- !query 13
-DROP VIEW IF EXISTS p3
+DROP VIEW IF EXISTS p2
 -- !query 13 schema
 struct<>
 -- !query 13 output

+
+
+-- !query 14
+DROP VIEW IF EXISTS p3
+-- !query 14 schema
+struct<>
+-- !query 14 output
+
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 13 additions & 0 deletions
@@ -896,6 +896,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("SPARK-24012 Union of map and other compatible columns") {
+    checkAnswer(
+      sql(
+        """
+          |SELECT map(1, 2), 'str'
+          |UNION ALL
+          |SELECT map(1, 2, 3, NULL), 1""".stripMargin),
+      Row.fromSeq(Seq(Map(1 -> 2), "str")) ::
+      Row.fromSeq(Seq(Map(1 -> 2, 3 -> null), "1")) ::
+      Nil
+    )
+  }
+
   test("EXCEPT") {
     checkAnswer(
       sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData"),
