Skip to content

Commit 6ab9b6f

Browse files
committed
[SPARK-21044][SPARK-21041][SQL] Add RemoveInvalidRange optimizer
**BEFORE** ``` scala> spark.range(0,0,1).explain == Physical Plan == *Range (0, 0, step=1, splits=8) ``` **AFTER** ``` scala> spark.range(0,0,1).explain == Physical Plan == LocalTableScan <empty>, [id#0L] ```
1 parent 5716354 commit 6ab9b6f

File tree

3 files changed

+71
-0
lines changed

3 files changed

+71
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
110110
EliminateSerialization,
111111
RemoveRedundantAliases,
112112
RemoveRedundantProject,
113+
RemoveInvalidRange,
113114
SimplifyCreateStructOps,
114115
SimplifyCreateArrayOps,
115116
SimplifyCreateMapOps,
@@ -270,6 +271,16 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
270271
}
271272
}
272273

274+
/**
275+
* Replace invalid `range` with emtpy range.
276+
*/
277+
object RemoveInvalidRange extends Rule[LogicalPlan] {
278+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
279+
case p @ Range(start, end, step, a, b) if (start == end) || (start < end ^ 0 < step) =>
280+
LocalRelation(p.output, data = Seq.empty)
281+
}
282+
}
283+
273284
/**
274285
* Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
275286
*/
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.dsl.plans._
21+
import org.apache.spark.sql.catalyst.plans._
22+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
23+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
24+
25+
class RemoveInvalidRangeSuite extends PlanTest {
26+
object Optimize extends RuleExecutor[LogicalPlan] {
27+
val batches =
28+
Batch("RemoveInvalidRange", Once,
29+
RemoveInvalidRange) :: Nil
30+
}
31+
32+
test("preserve valid ranges") {
33+
Seq(Range(0, 10, 1, 1), Range(10, 0, -1, 1)).foreach { query =>
34+
val optimized = Optimize.execute(query.analyze)
35+
val correctAnswer = query
36+
37+
comparePlans(optimized, correctAnswer)
38+
}
39+
}
40+
41+
test("remove ranges with invalid combination of start/end/step") {
42+
Seq(Range(0, 0, 1, 1), Range(0, 0, -1, 1), Range(1, 10, -1, 1), Range(10, 1, 1, 1)).foreach {
43+
query =>
44+
val optimized = Optimize.execute(query.analyze)
45+
val correctAnswer = LocalRelation(query.output, data = Seq.empty)
46+
comparePlans(optimized, correctAnswer)
47+
}
48+
}
49+
}

sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
191191
checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
192192
}
193193
}
194+
195+
test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") {
196+
val start = java.lang.Long.MAX_VALUE - 3
197+
val end = java.lang.Long.MIN_VALUE + 2
198+
Seq("false", "true").foreach { value =>
199+
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
200+
assert(spark.sparkContext.range(start, end, 1).collect.length == 0)
201+
assert(spark.range(start, end, 1).collect.length == 0)
202+
}
203+
}
204+
}
194205
}
195206

196207
object DataFrameRangeSuite {

0 commit comments

Comments
 (0)