
Commit 8f0511e

hvanhovell authored and cloud-fan committed
[SPARK-19650] Commands should not trigger a Spark job
Spark executes SQL commands eagerly. It does this by creating an RDD that contains the command's results. The downside is that any action on this RDD triggers a Spark job, which is expensive and unnecessary. This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it simply materializes the result and puts it in a `LocalRelation`.

A regression test was added to `SQLQuerySuite`.

Author: Herman van Hovell <[email protected]>

Closes #17027 from hvanhovell/no-job-command.
1 parent 4cb025a commit 8f0511e
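
As a quick illustration of the effect, here is a hedged spark-shell sketch (assuming a SparkSession named `spark`; it mirrors the regression test added to `SQLQuerySuite` below and is not part of the patch):

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Track whether any Spark job is started.
val jobStarted = new AtomicBoolean(false)
spark.sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobStarted.set(true)
})

// The command still executes eagerly, but its rows are kept in a LocalRelation,
// so acting on its result no longer schedules a Spark job.
spark.sql("SHOW DATABASES").head()

Thread.sleep(500)   // give the asynchronous listener bus time to deliver any events
assert(!jobStarted.get(), "a command should not have triggered a Spark job")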

File tree

8 files changed (+39 / -23 lines)


sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 7 additions & 13 deletions
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,13 @@ class Dataset[T] private[sql](
   }

   @transient private[sql] val logicalPlan: LogicalPlan = {
-    def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
-      case _: Command |
-           _: InsertIntoTable => true
-      case _ => false
-    }
-
+    // For various commands (like DDL) and queries with side effects, we force query execution
+    // to happen right away to let these side effects take place eagerly.
     queryExecution.analyzed match {
-      // For various commands (like DDL) and queries with side effects, we force query execution
-      // to happen right away to let these side effects take place eagerly.
-      case p if hasSideEffects(p) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
-      case Union(children) if children.forall(hasSideEffects) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+      case c: Command =>
+        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
       case _ =>
         queryExecution.analyzed
     }
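
In short, a command's rows are now collected once on the driver via `executeCollect()` and wrapped in a `LocalRelation`, instead of being re-exposed as an RDD behind a `LogicalRDD` node. A minimal spark-shell sketch of what this means for callers (assumes a SparkSession named `spark`; not part of the patch):

// SHOW TABLES runs eagerly while the Dataset is constructed; its rows are already
// on the driver, held by the LocalRelation produced by the new branch above.
val tables = spark.sql("SHOW TABLES")

// Reading those rows is served from the driver-side result without a Spark job.
// (Output columns in this version are database, tableName, isTemporary.)
val names = tables.collect().map(_.getString(1))

// A UNION of commands is handled the same way by the second branch above.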

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 0 additions & 2 deletions
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
     case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
       command.executeCollect().map(_.getString(1))
-    case command: ExecutedCommandExec =>
-      command.executeCollect().map(_.getString(0))
    case other =>
      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
      // We need the types so we can output struct field names
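
The removed case used to reduce every other command's output to its first column when results are rendered as strings. With it gone, commands fall through to the generic formatter, which prints all columns. A hedged illustration (uses `QueryExecution.hiveResultString`, the internal testing-only helper behind the SQL golden files; not part of the patch):

val qe = spark.sql("SET spark.sql.crossJoin.enabled=true").queryExecution
qe.hiveResultString()
// before this patch: Seq("spark.sql.crossJoin.enabled")
// with this patch:   Seq("spark.sql.crossJoin.enabled\ttrue")

This is why the *.sql.out golden files below now show both the key and the value for SET commands.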

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 2 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._

sql/core/src/test/resources/sql-tests/results/change-column.sql.out

Lines changed: 2 additions & 2 deletions
@@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false
 -- !query 19 schema
 struct<key:string,value:string>
 -- !query 19 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive	false


 -- !query 20
@@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true
 -- !query 21 schema
 struct<key:string,value:string>
 -- !query 21 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive	true


 -- !query 22

sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false
 -- !query 17 schema
 struct<key:string,value:string>
 -- !query 17 output
-spark.sql.groupByOrdinal
+spark.sql.groupByOrdinal	false


 -- !query 18

sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false
 -- !query 9 schema
 struct<key:string,value:string>
 -- !query 9 output
-spark.sql.orderByOrdinal
+spark.sql.orderByOrdinal	false


 -- !query 10

sql/core/src/test/resources/sql-tests/results/outer-join.sql.out

Lines changed: 2 additions & 2 deletions
@@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true
 -- !query 5 schema
 struct<key:string,value:string>
 -- !query 5 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled	true


 -- !query 6
@@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false
 -- !query 7 schema
 struct<key:string,value:string>
 -- !query 7 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled	false

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 25 additions & 0 deletions
@@ -20,8 +20,10 @@ package org.apache.spark.sql
 import java.io.File
 import java.math.MathContext
 import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean

 import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql(badQuery), Row(1) :: Nil)
   }

+  test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+    // Create a listener that checks if new jobs have started.
+    val jobStarted = new AtomicBoolean(false)
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.set(true)
+      }
+    }
+
+    // Make sure no spurious job starts are pending in the listener bus.
+    sparkContext.listenerBus.waitUntilEmpty(500)
+    sparkContext.addSparkListener(listener)
+    try {
+      // Execute the command.
+      sql("show databases").head()
+
+      // Make sure we have seen all events triggered by DataFrame.show()
+      sparkContext.listenerBus.waitUntilEmpty(500)
+    } finally {
+      sparkContext.removeSparkListener(listener)
+    }
+    assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+  }
 }
