File tree: 2 files changed (+32 -11 lines changed)
main/scala/org/apache/spark/sql/execution
test/scala/org/apache/spark/sql/execution
2 files changed (+32 -11 lines changed)
Original file line number | Diff line number | Diff line change
@@ -31,8 +31,10 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
3131
3232 protected def hasAlias : Boolean = aliasMap.nonEmpty
3333
34- protected def replaceAlias (attr : AttributeReference ): Option [Attribute ] = {
35- aliasMap.get(attr)
34+ protected def normalizeExpression (exp : Expression ): Expression = {
35+ exp.transform {
36+ case attr : AttributeReference => aliasMap.get(attr).getOrElse(attr)
37+ }
3638 }
3739}
3840
@@ -45,10 +47,7 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
4547 if (hasAlias) {
4648 child.outputPartitioning match {
4749 case e : Expression =>
48- val normalizedExpr = e.transform {
49- case attr : AttributeReference => replaceAlias(attr).getOrElse(attr)
50- }
51- normalizedExpr.asInstanceOf [Partitioning ]
50+ normalizeExpression(e).asInstanceOf [Partitioning ]
5251 case other => other
5352 }
5453 } else {
@@ -66,11 +65,8 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
6665
6766 final override def outputOrdering : Seq [SortOrder ] = {
6867 if (hasAlias) {
69- orderingExpressions.map { s =>
70- s.child match {
71- case a : AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a))
72- case _ => s
73- }
68+ orderingExpressions.map { sortOrder =>
69+ normalizeExpression(sortOrder).asInstanceOf [SortOrder ]
7470 }
7571 } else {
7672 orderingExpressions
Original file line number | Diff line number | Diff line change
@@ -953,6 +953,31 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
953953 }
954954 }
955955
956+ test(" SPARK-33399: aliased should be handled properly " +
957+ " for partitioning and sortorder involving complex expressions" ) {
958+ withSQLConf(SQLConf .AUTO_BROADCASTJOIN_THRESHOLD .key -> " -1" ) {
959+ withTempView(" t1" , " t2" , " t3" ) {
960+ spark.range(10 ).createTempView(" t1" )
961+ spark.range(20 ).createTempView(" t2" )
962+ spark.range(30 ).createTempView(" t3" )
963+ val planned = sql(
964+ """
965+ |SELECT t3.id as t3id
966+ |FROM (
967+ | SELECT t1.id as t1id
968+ | FROM t1, t2
969+ | WHERE t1.id % 10 = t2.id % 10
970+ |) t12, t3
971+ |WHERE t1id % 10 = t3.id % 10
972+ """ .stripMargin).queryExecution.executedPlan
973+ val sortNodes = planned.collect { case s : SortExec => s }
974+ assert(sortNodes.size == 3 )
975+ val exchangeNodes = planned.collect { case e : ShuffleExchangeExec => e }
976+ assert(exchangeNodes.size == 3 )
977+ }
978+ }
979+ }
980+
956981 test(" SPARK-33399: alias handling should happen properly for SinglePartition" ) {
957982 withSQLConf(SQLConf .AUTO_BROADCASTJOIN_THRESHOLD .key -> " -1" ) {
958983 val df = spark.range(1 , 100 , 1 , 1 )
You can’t perform that action at this time.
0 commit comments