Skip to content

Commit 701f1c3

Browse files
committed
resolve the comments
1 parent 4ccd4b8 commit 701f1c3

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -91,15 +91,10 @@ case class AdaptiveSparkPlanExec(
9191
DisableUnnecessaryBucketedScan
9292
) ++ context.session.sessionState.queryStagePrepRules
9393

94-
@transient private val initialPlan = context.session.withActive {
95-
applyPhysicalRules(
96-
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
97-
}
98-
9994
// A list of physical optimizer rules to be applied to a new stage before its execution. These
10095
// optimizations should be stage-independent.
10196
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
102-
PlanAdaptiveDynamicPruningFilters(initialPlan),
97+
PlanAdaptiveDynamicPruningFilters(this),
10398
ReuseAdaptiveSubquery(context.subqueryCache),
10499
CoalesceShufflePartitions(context.session),
105100
// The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -134,6 +129,11 @@ case class AdaptiveSparkPlanExec(
134129

135130
@transient private val costEvaluator = SimpleCostEvaluator
136131

132+
@transient private val initialPlan = context.session.withActive {
133+
applyPhysicalRules(
134+
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
135+
}
136+
137137
@volatile private var currentPhysicalPlan = initialPlan
138138

139139
private var isFinalPlan = false

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,8 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
2727
/**
2828
* A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
2929
*/
30-
case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
30+
case class PlanAdaptiveDynamicPruningFilters(
31+
rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
3132
def apply(plan: SparkPlan): SparkPlan = {
3233
if (!conf.dynamicPartitionPruningEnabled) {
3334
return plan
@@ -44,7 +45,7 @@ case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[S
4445
val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
4546

4647
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
47-
rootPlan.find {
48+
find(rootPlan) {
4849
case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
4950
left.sameResult(exchange)
5051
case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>

sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1492,12 +1492,11 @@ abstract class DynamicPartitionPruningSuiteBase
14921492
// +- Exchange
14931493
// +- HashAggregate
14941494
// +- Filter
1495-
// +- FileScan [PartitionFilters: [isnotnull(store_id#3367),
1496-
// dynamicpruningexpression(store_id#3367 IN dynamicpruning#3385)]]
1495+
// +- FileScan [PartitionFilters: dynamicpruning#3385]
14971496
// +- SubqueryBroadcast dynamicpruning#3385
1498-
// +- AdaptiveSparkPlan
1499-
// +- BroadcastQueryStage
1500-
// +- BroadcastExchange
1497+
// +- AdaptiveSparkPlan
1498+
// +- BroadcastQueryStage
1499+
// +- BroadcastExchange
15011500
//
15021501
// +- BroadcastQueryStage
15031502
// +- ReusedExchange

0 commit comments

Comments (0)