
Commit a7a3935

maryannxue authored and gatorsmile committed
[SPARK-11150][SQL] Dynamic Partition Pruning
### What changes were proposed in this pull request?

This patch implements dynamic partition pruning by adding a dynamic-partition-pruning filter if there is a partitioned table and a filter on the dimension table. The filter is then planned using a heuristic approach:

1. As a broadcast relation if it is a broadcast hash join. The broadcast relation will then be transformed into a reused broadcast exchange by the `ReuseExchange` rule; or
2. As a subquery duplicate if the estimated benefit of the partition table scan being saved is greater than the estimated cost of the extra scan of the duplicated subquery; otherwise
3. As a bypassed condition (`true`).

### Why are the changes needed?

This is an important performance feature.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added unit tests:
- Testing DPP by enabling / disabling the reuse broadcast results feature and / or the subquery duplication feature.
- Testing DPP with reused broadcast results.
- Testing the key iterators on different HashedRelation types.
- Testing the packing and unpacking of the broadcast keys in a LongType.

Closes #25600 from maryannxue/dpp.

Authored-by: maryannxue <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
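To make the target query shape concrete, here is a hedged sketch (not part of this commit; the tables, columns, and session setup are illustrative assumptions): a star-schema join where the fact table is partitioned by the join key, so the filter on the dimension table can prune fact partitions at runtime.

import org.apache.spark.sql.SparkSession

// Hypothetical setup: `fact` is a table partitioned by `store_id`; `dim` is a
// small dimension table. With DPP enabled, the filter on `dim` is converted
// into a pruning predicate on `fact.store_id`, so only matching partitions of
// `fact` are scanned.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sql("""
  SELECT f.units, d.country
  FROM fact f JOIN dim d ON f.store_id = d.store_id
  WHERE d.country = 'US'
""").explain()  // the fact-side scan should show a dynamicpruning#... subquery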
1 parent f96486b commit a7a3935

File tree

20 files changed, +2447 -26 lines changed


core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 7 additions & 0 deletions

@@ -414,6 +414,13 @@ public MapIterator iterator() {
     return new MapIterator(numValues, loc, false);
   }

+  /**
+   * Returns a thread-safe iterator that iterates over the entries of this map.
+   */
+  public MapIterator safeIterator() {
+    return new MapIterator(numValues, new Location(), false);
+  }
+
   /**
    * Returns a destructive iterator for iterating over the entries of this map. It frees each page
    * as it moves onto next one. Notice: it is illegal to call any method on the map after
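A hedged usage sketch (an illustration, not code from this commit): unlike iterator(), which hands out the map's shared cursor, each safeIterator() call gets a fresh Location, so independent readers can scan the same map concurrently.

import org.apache.spark.unsafe.map.BytesToBytesMap

// Count the entries of an already-built map via the new thread-safe iterator.
// (Constructing `map` is omitted; it requires a TaskMemoryManager and page size.)
def countEntries(map: BytesToBytesMap): Long = {
  val it = map.safeIterator()  // fresh Location, safe to use from another thread
  var n = 0L
  while (it.hasNext) {
    it.next()                  // returns a BytesToBytesMap.Location for the entry
    n += 1
  }
  n
}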
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala

Lines changed: 95 additions & 0 deletions

@@ -0,0 +1,95 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait DynamicPruning extends Predicate

/**
 * The DynamicPruningSubquery expression is only used in join operations to prune one side of the
 * join with a filter from the other side of the join. It is inserted in cases where partition
 * pruning can be applied.
 *
 * @param pruningKey the filtering key of the plan to be pruned.
 * @param buildQuery the build side of the join.
 * @param buildKeys the join keys corresponding to the build side of the join.
 * @param broadcastKeyIndex the index of the filtering key collected from the broadcast.
 * @param onlyInBroadcast when true, the pruning filter is applied only if it can reuse the
 *   results of the broadcast through ReuseExchange; when false, the filter is likely to be
 *   beneficial on its own and is executed even if it cannot reuse the broadcast results.
 */
case class DynamicPruningSubquery(
    pruningKey: Expression,
    buildQuery: LogicalPlan,
    buildKeys: Seq[Expression],
    broadcastKeyIndex: Int,
    onlyInBroadcast: Boolean,
    exprId: ExprId = NamedExpression.newExprId)
  extends SubqueryExpression(buildQuery, Seq(pruningKey), exprId)
  with DynamicPruning
  with Unevaluable {

  override def children: Seq[Expression] = Seq(pruningKey)

  override def plan: LogicalPlan = buildQuery

  override def nullable: Boolean = false

  override def withNewPlan(plan: LogicalPlan): DynamicPruningSubquery = copy(buildQuery = plan)

  override lazy val resolved: Boolean = {
    pruningKey.resolved &&
      buildQuery.resolved &&
      buildKeys.nonEmpty &&
      buildKeys.forall(_.resolved) &&
      broadcastKeyIndex >= 0 &&
      broadcastKeyIndex < buildKeys.size &&
      buildKeys.forall(_.references.subsetOf(buildQuery.outputSet)) &&
      pruningKey.dataType == buildKeys(broadcastKeyIndex).dataType
  }

  override def toString: String = s"dynamicpruning#${exprId.id} $conditionString"

  override lazy val canonicalized: DynamicPruning = {
    copy(
      pruningKey = pruningKey.canonicalized,
      buildQuery = buildQuery.canonicalized,
      buildKeys = buildKeys.map(_.canonicalized),
      exprId = ExprId(0))
  }
}

/**
 * Marker for a planned [[DynamicPruning]] expression.
 * The expression is created during planning, and it defers to its child for evaluation.
 *
 * @param child underlying predicate.
 */
case class DynamicPruningExpression(child: Expression)
  extends UnaryExpression
  with DynamicPruning {
  override def eval(input: InternalRow): Any = child.eval(input)

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    child.genCode(ctx)
  }
}
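For orientation, a hedged construction sketch (the wrapper function and its plan parameters are illustrative assumptions, not code from this commit) of how a planning rule can attach this expression to the pruned side of a join:

import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningSubquery, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Wrap the fact-table side with a pruning filter driven by the dimension side.
def insertPruningFilter(
    factPlan: LogicalPlan,       // scan side to be pruned (partitioned table)
    partitionCol: Attribute,     // fact-side partition column used as a join key
    buildPlan: LogicalPlan,      // filtered dimension (build) side of the join
    buildKey: Expression): LogicalPlan = {
  Filter(
    DynamicPruningSubquery(
      pruningKey = partitionCol,
      buildQuery = buildPlan,
      buildKeys = Seq(buildKey),
      broadcastKeyIndex = 0,     // which build key's values prune the partitions
      onlyInBroadcast = true),   // keep the filter only if broadcast reuse applies
    factPlan)
}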

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 38 additions & 1 deletion
@@ -21,9 +21,10 @@ import scala.collection.immutable.TreeSet

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, GenerateSafeProjection, GenerateUnsafeProjection, Predicate => BasePredicate}
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -65,6 +66,42 @@ trait PredicateHelper {
   }
 }

+  /**
+   * Find the origin of where the input references of expression exp were scanned in the tree of
+   * plan, and whether they all originate from a single leaf node.
+   * Returns an optional tuple with the expression, undoing any projections and aliasing applied
+   * along the way from plan to origin, and the origin LeafNode from which all the references of
+   * exp originate.
+   */
+  def findExpressionAndTrackLineageDown(
+      exp: Expression,
+      plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
+
+    plan match {
+      case Project(projectList, child) =>
+        val aliases = AttributeMap(projectList.collect {
+          case a @ Alias(child, _) => (a.toAttribute, child)
+        })
+        findExpressionAndTrackLineageDown(replaceAlias(exp, aliases), child)
+      // we can unwrap only if there are row projections, and no aggregation operation
+      case Aggregate(_, aggregateExpressions, child) =>
+        val aliasMap = AttributeMap(aggregateExpressions.collect {
+          case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
+            (a.toAttribute, a.child)
+        })
+        findExpressionAndTrackLineageDown(replaceAlias(exp, aliasMap), child)
+      case l: LeafNode if exp.references.subsetOf(l.outputSet) =>
+        Some((exp, l))
+      case other =>
+        other.children.flatMap {
+          child => if (exp.references.subsetOf(child.outputSet)) {
+            findExpressionAndTrackLineageDown(exp, child)
+          } else {
+            None
+          }
+        }.headOption
+    }
+  }
+
   protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
     condition match {
       case Or(cond1, cond2) =>
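A hedged, test-style sketch of what the new helper returns (names and the Catalyst DSL setup are illustrative assumptions): lineage tracking undoes the alias `x` back to the underlying attribute `a` and reports the leaf it came from.

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object LineageExample extends PredicateHelper {
  val a = 'a.int                     // an attribute of the leaf relation
  val rel = LocalRelation(a)
  val plan = rel.select(a.as("x"))   // Project(a AS x, rel)
  val x = plan.output.head           // the aliased output attribute
  // Expected result: Some((a, rel)); projections and aliasing are unwound.
  val origin = findExpressionAndTrackLineageDown(x, plan)
}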

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 1 deletion
@@ -48,7 +48,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
   }

   override protected val blacklistedOnceBatches: Set[String] =
-    Set("Extract Python UDFs")
+    Set(
+      "PartitionPruning",
+      "Extract Python UDFs")

   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 43 additions & 0 deletions
@@ -216,6 +216,39 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val DYNAMIC_PARTITION_PRUNING_ENABLED =
+    buildConf("spark.sql.optimizer.dynamicPartitionPruning.enabled")
+      .doc("When true, we will generate a predicate for the partition column when it's used " +
+        "as a join key.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val DYNAMIC_PARTITION_PRUNING_USE_STATS =
+    buildConf("spark.sql.optimizer.dynamicPartitionPruning.useStats")
+      .internal()
+      .doc("When true, distinct count statistics will be used for computing the data size of " +
+        "the partitioned table after dynamic partition pruning, in order to evaluate if it is " +
+        "worth adding an extra subquery as the pruning filter if broadcast reuse is not " +
+        "applicable.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO = buildConf(
+    "spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio")
+    .internal()
+    .doc("When statistics are not available or configured not to be used, this config will be " +
+      "used as the fallback filter ratio for computing the data size of the partitioned table " +
+      "after dynamic partition pruning, in order to evaluate if it is worth adding an extra " +
+      "subquery as the pruning filter if broadcast reuse is not applicable.")
+    .doubleConf
+    .createWithDefault(0.5)
+
+  val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST =
+    buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast")
+      .internal()
+      .doc("When true, dynamic partition pruning will seek to reuse the broadcast results " +
+        "from a broadcast hash join operation.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression codec for each " +
       "column based on statistics of the data.")

@@ -1970,6 +2003,16 @@ class SQLConf extends Serializable with Logging {

   def optimizerPlanChangeBatches: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_BATCHES)

+  def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)
+
+  def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS)
+
+  def dynamicPartitionPruningFallbackFilterRatio: Double =
+    getConf(DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO)
+
+  def dynamicPartitionPruningReuseBroadcast: Boolean =
+    getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)

   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
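A hedged usage sketch of the new knobs (the session setup is an assumption; the values shown are simply the defaults defined above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Public switch for the feature as a whole.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
// Internal tuning knobs: stats-based cost estimation, the fallback ratio used
// when stats are unavailable, and the broadcast-reuse preference.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", "0.5")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast", "true")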
sql/core/src/main/scala/org/apache/spark/sql/dynamicpruning/CleanupDynamicPruningFilters.scala

Lines changed: 51 additions & 0 deletions

@@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.dynamicpruning

import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf

/**
 * Removes the filter nodes with dynamic pruning that were not pushed down to the scan.
 * These nodes will not be pushed through projects and aggregates with non-deterministic
 * expressions.
 */
object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelper {

  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (!SQLConf.get.dynamicPartitionPruningEnabled) {
      return plan
    }

    plan.transform {
      // pass through anything that is pushed down into PhysicalOperation
      case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) => p
      // remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.
      case f @ Filter(condition, _) =>
        val newCondition = condition.transform {
          case _: DynamicPruning => TrueLiteral
        }
        f.copy(condition = newCondition)
    }
  }
}
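The heart of the rule is the condition transform; below is a minimal standalone sketch of just that rewrite (the helper name is an assumption):

import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral

// Any leftover DynamicPruning predicate degenerates to `true`, so an unpushed
// filter condition like `dynamicpruning#1 AND (id > 5)` becomes
// `true AND (id > 5)`, preserving the ordinary predicates.
def dropDynamicPruning(condition: Expression): Expression =
  condition.transform { case _: DynamicPruning => TrueLiteral }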
