Skip to content

Commit 91148f4

Browse files
wangyumcloud-fan
authored andcommitted
[SPARK-28481][SQL] More expressions should extend NullIntolerant
### What changes were proposed in this pull request? 1. Make more expressions extend `NullIntolerant`. 2. Add a checker(in `ExpressionInfoSuite`) to identify whether the expression is `NullIntolerant`. ### Why are the changes needed? Avoid skew join if the join column has many null values and can improve query performance. For examples: ```sql CREATE TABLE t1(c1 string, c2 string) USING parquet; CREATE TABLE t2(c1 string, c2 string) USING parquet; EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON upper(t1.c1) = upper(t2.c1); ``` Before and after this PR: ```sql == Physical Plan == *(2) Project [c1#5, c2#6] +- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#41] : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[c1#5,c2#6] +- *(2) ColumnarToRow +- FileScan parquet default.t2[c1#7] == Physical Plan == *(2) Project [c1#5, c2#6] +- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildRight :- *(2) Project [c1#5, c2#6] : +- *(2) Filter isnotnull(c1#5) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[c1#5,c2#6] +- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#59] +- *(1) Project [c1#7] +- *(1) Filter isnotnull(c1#7) +- *(1) ColumnarToRow +- FileScan parquet default.t2[c1#7] ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #28626 from wangyum/SPARK-28481. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 37a1fb8 commit 91148f4

File tree

15 files changed

+180
-103
lines changed

15 files changed

+180
-103
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ object TimeWindow {
144144
case class PreciseTimestampConversion(
145145
child: Expression,
146146
fromType: DataType,
147-
toType: DataType) extends UnaryExpression with ExpectsInputTypes {
147+
toType: DataType) extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
148148
override def inputTypes: Seq[AbstractDataType] = Seq(fromType)
149149
override def dataType: DataType = toType
150150
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitwiseExpressions.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ case class BitwiseXor(left: Expression, right: Expression) extends BinaryArithme
127127
> SELECT _FUNC_ 0;
128128
-1
129129
""")
130-
case class BitwiseNot(child: Expression) extends UnaryExpression with ExpectsInputTypes {
130+
case class BitwiseNot(child: Expression)
131+
extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
131132

132133
override def inputTypes: Seq[AbstractDataType] = Seq(IntegralType)
133134

@@ -164,7 +165,8 @@ case class BitwiseNot(child: Expression) extends UnaryExpression with ExpectsInp
164165
0
165166
""",
166167
since = "3.0.0")
167-
case class BitwiseCount(child: Expression) extends UnaryExpression with ExpectsInputTypes {
168+
case class BitwiseCount(child: Expression)
169+
extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
168170

169171
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(IntegralType, BooleanType))
170172

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ object Size {
141141
""",
142142
group = "map_funcs")
143143
case class MapKeys(child: Expression)
144-
extends UnaryExpression with ExpectsInputTypes {
144+
extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
145145

146146
override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
147147

@@ -332,7 +332,7 @@ case class ArraysZip(children: Seq[Expression]) extends Expression with ExpectsI
332332
""",
333333
group = "map_funcs")
334334
case class MapValues(child: Expression)
335-
extends UnaryExpression with ExpectsInputTypes {
335+
extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
336336

337337
override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
338338

@@ -361,7 +361,8 @@ case class MapValues(child: Expression)
361361
""",
362362
group = "map_funcs",
363363
since = "3.0.0")
364-
case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInputTypes {
364+
case class MapEntries(child: Expression)
365+
extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
365366

366367
override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
367368

@@ -649,7 +650,7 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
649650
""",
650651
group = "map_funcs",
651652
since = "2.4.0")
652-
case class MapFromEntries(child: Expression) extends UnaryExpression {
653+
case class MapFromEntries(child: Expression) extends UnaryExpression with NullIntolerant {
653654

654655
@transient
655656
private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = child.dataType match {
@@ -873,7 +874,7 @@ object ArraySortLike {
873874
group = "array_funcs")
874875
// scalastyle:on line.size.limit
875876
case class SortArray(base: Expression, ascendingOrder: Expression)
876-
extends BinaryExpression with ArraySortLike {
877+
extends BinaryExpression with ArraySortLike with NullIntolerant {
877878

878879
def this(e: Expression) = this(e, Literal(true))
879880

@@ -1017,7 +1018,8 @@ case class Shuffle(child: Expression, randomSeed: Option[Long] = None)
10171018
Reverse logic for arrays is available since 2.4.0.
10181019
"""
10191020
)
1020-
case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
1021+
case class Reverse(child: Expression)
1022+
extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
10211023

10221024
// Input types are utilized by type coercion in ImplicitTypeCasts.
10231025
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(StringType, ArrayType))
@@ -1086,7 +1088,7 @@ case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastI
10861088
""",
10871089
group = "array_funcs")
10881090
case class ArrayContains(left: Expression, right: Expression)
1089-
extends BinaryExpression with ImplicitCastInputTypes {
1091+
extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {
10901092

10911093
override def dataType: DataType = BooleanType
10921094

@@ -1185,7 +1187,7 @@ case class ArrayContains(left: Expression, right: Expression)
11851187
since = "2.4.0")
11861188
// scalastyle:off line.size.limit
11871189
case class ArraysOverlap(left: Expression, right: Expression)
1188-
extends BinaryArrayExpressionWithImplicitCast {
1190+
extends BinaryArrayExpressionWithImplicitCast with NullIntolerant {
11891191

11901192
override def checkInputDataTypes(): TypeCheckResult = super.checkInputDataTypes() match {
11911193
case TypeCheckResult.TypeCheckSuccess =>
@@ -1410,7 +1412,7 @@ case class ArraysOverlap(left: Expression, right: Expression)
14101412
since = "2.4.0")
14111413
// scalastyle:on line.size.limit
14121414
case class Slice(x: Expression, start: Expression, length: Expression)
1413-
extends TernaryExpression with ImplicitCastInputTypes {
1415+
extends TernaryExpression with ImplicitCastInputTypes with NullIntolerant {
14141416

14151417
override def dataType: DataType = x.dataType
14161418

@@ -1688,7 +1690,8 @@ case class ArrayJoin(
16881690
""",
16891691
group = "array_funcs",
16901692
since = "2.4.0")
1691-
case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
1693+
case class ArrayMin(child: Expression)
1694+
extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
16921695

16931696
override def nullable: Boolean = true
16941697

@@ -1755,7 +1758,8 @@ case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCast
17551758
""",
17561759
group = "array_funcs",
17571760
since = "2.4.0")
1758-
case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
1761+
case class ArrayMax(child: Expression)
1762+
extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
17591763

17601764
override def nullable: Boolean = true
17611765

@@ -1831,7 +1835,7 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast
18311835
group = "array_funcs",
18321836
since = "2.4.0")
18331837
case class ArrayPosition(left: Expression, right: Expression)
1834-
extends BinaryExpression with ImplicitCastInputTypes {
1838+
extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {
18351839

18361840
@transient private lazy val ordering: Ordering[Any] =
18371841
TypeUtils.getInterpretedOrdering(right.dataType)
@@ -1909,7 +1913,7 @@ case class ArrayPosition(left: Expression, right: Expression)
19091913
""",
19101914
since = "2.4.0")
19111915
case class ElementAt(left: Expression, right: Expression)
1912-
extends GetMapValueUtil with GetArrayItemUtil {
1916+
extends GetMapValueUtil with GetArrayItemUtil with NullIntolerant {
19131917

19141918
@transient private lazy val mapKeyType = left.dataType.asInstanceOf[MapType].keyType
19151919

@@ -2245,7 +2249,7 @@ case class Concat(children: Seq[Expression]) extends ComplexTypeMergingExpressio
22452249
""",
22462250
group = "array_funcs",
22472251
since = "2.4.0")
2248-
case class Flatten(child: Expression) extends UnaryExpression {
2252+
case class Flatten(child: Expression) extends UnaryExpression with NullIntolerant {
22492253

22502254
private def childDataType: ArrayType = child.dataType.asInstanceOf[ArrayType]
22512255

@@ -2884,7 +2888,7 @@ case class ArrayRepeat(left: Expression, right: Expression)
28842888
group = "array_funcs",
28852889
since = "2.4.0")
28862890
case class ArrayRemove(left: Expression, right: Expression)
2887-
extends BinaryExpression with ImplicitCastInputTypes {
2891+
extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {
28882892

28892893
override def dataType: DataType = left.dataType
28902894

@@ -3081,7 +3085,7 @@ trait ArraySetLike {
30813085
group = "array_funcs",
30823086
since = "2.4.0")
30833087
case class ArrayDistinct(child: Expression)
3084-
extends UnaryExpression with ArraySetLike with ExpectsInputTypes {
3088+
extends UnaryExpression with ArraySetLike with ExpectsInputTypes with NullIntolerant {
30853089

30863090
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
30873091

@@ -3219,7 +3223,8 @@ case class ArrayDistinct(child: Expression)
32193223
/**
32203224
* Will become common base class for [[ArrayUnion]], [[ArrayIntersect]], and [[ArrayExcept]].
32213225
*/
3222-
trait ArrayBinaryLike extends BinaryArrayExpressionWithImplicitCast with ArraySetLike {
3226+
trait ArrayBinaryLike
3227+
extends BinaryArrayExpressionWithImplicitCast with ArraySetLike with NullIntolerant {
32233228
override protected def dt: DataType = dataType
32243229
override protected def et: DataType = elementType
32253230

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ object CreateMap {
255255
{1.0:"2",3.0:"4"}
256256
""", since = "2.4.0")
257257
case class MapFromArrays(left: Expression, right: Expression)
258-
extends BinaryExpression with ExpectsInputTypes {
258+
extends BinaryExpression with ExpectsInputTypes with NullIntolerant {
259259

260260
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType)
261261

@@ -476,7 +476,7 @@ case class CreateNamedStruct(children: Seq[Expression]) extends Expression {
476476
since = "2.0.1")
477477
// scalastyle:on line.size.limit
478478
case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: Expression)
479-
extends TernaryExpression with ExpectsInputTypes {
479+
extends TernaryExpression with ExpectsInputTypes with NullIntolerant {
480480

481481
def this(child: Expression, pairDelim: Expression) = {
482482
this(child, pairDelim, Literal(":"))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ case class StructsToCsv(
211211
options: Map[String, String],
212212
child: Expression,
213213
timeZoneId: Option[String] = None)
214-
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
214+
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
215+
with NullIntolerant {
215216
override def nullable: Boolean = true
216217

217218
def this(options: Map[String, String], child: Expression) = this(options, child, None)

0 commit comments

Comments
 (0)