Skip to content

Commit 773754b

Browse files
viirya authored and cloud-fan committed
[SPARK-20356][SQL] Pruned InMemoryTableScanExec should have correct output partitioning and ordering
## What changes were proposed in this pull request?

The output of `InMemoryTableScanExec` can be pruned, so it may mismatch the output of `InMemoryRelation` and its child plan. This causes wrong output partitioning and ordering.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <[email protected]>

Closes #17679 from viirya/SPARK-20356.
1 parent 608bf30 commit 773754b

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,9 @@ case class InMemoryTableScanExec(
4242
override def output: Seq[Attribute] = attributes
4343

4444
private def updateAttribute(expr: Expression): Expression = {
45-
val attrMap = AttributeMap(relation.child.output.zip(output))
45+
// attributes can be pruned so using relation's output.
46+
// E.g., relation.output is [id, item] but this scan's output can be [item] only.
47+
val attrMap = AttributeMap(relation.child.output.zip(relation.output))
4648
expr.transform {
4749
case attr: Attribute => attrMap.getOrElse(attr, attr)
4850
}

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -414,4 +414,19 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
414414
assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet))
415415
}
416416
}
417+
418+
test("SPARK-20356: pruned InMemoryTableScanExec should have correct ordering and partitioning") {
419+
withSQLConf("spark.sql.shuffle.partitions" -> "200") {
420+
val df1 = Seq(("a", 1), ("b", 1), ("c", 2)).toDF("item", "group")
421+
val df2 = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("item", "id")
422+
val df3 = df1.join(df2, Seq("item")).select($"id", $"group".as("item")).distinct()
423+
424+
df3.unpersist()
425+
val agg_without_cache = df3.groupBy($"item").count()
426+
427+
df3.cache()
428+
val agg_with_cache = df3.groupBy($"item").count()
429+
checkAnswer(agg_without_cache, agg_with_cache)
430+
}
431+
}
417432
}

0 commit comments

Comments (0)