Commit 751ded0

Author: Salil Surendran (authored and committed)
[SPARK-18120][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods
QueryExecutionListener has two methods, onSuccess() and onFailure(), each of which takes a QueryExecution object as a parameter and is called when a query is executed. They are invoked for several Dataset methods such as take, head, first, and collect, but not for any of the DataFrameWriter methods such as saveAsTable and save. This commit fixes that by calling these two methods from the DataFrameWriter output methods. It also adds a new property, "spark.sql.queryExecutionListeners", which can be used to specify QueryExecutionListener instances that should be attached to the SparkSession when the Spark application starts. Testing was done with unit tests.
1 parent 640f942 commit 751ded0
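
For context, a listener along these lines would now also be notified for DataFrameWriter output methods. This is a minimal sketch, not part of the commit: the class name and println reporting are illustrative, while the four-argument callback signatures and OutputParams come from this change. A zero-argument constructor is required if the class is registered through the new property.

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

    // Hypothetical listener; only the callback signatures are defined by this commit.
    class WriteAuditListener extends QueryExecutionListener {
      override def onSuccess(
          funcName: String,
          qe: QueryExecution,
          durationNs: Long,
          outputParams: Option[OutputParams]): Unit = {
        // outputParams is populated for DataFrameWriter methods and None for read-style actions.
        val dest = outputParams.flatMap(_.destination).getOrElse("n/a")
        println(s"$funcName succeeded in ${durationNs / 1e6} ms (destination: $dest)")
      }

      override def onFailure(
          funcName: String,
          qe: QueryExecution,
          exception: Exception,
          outputParams: Option[OutputParams]): Unit = {
        println(s"$funcName failed: ${exception.getMessage}")
      }
    }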

File tree

8 files changed (+423, -30 lines)


docs/sql-programming-guide.md

Lines changed: 13 additions & 2 deletions
@@ -1256,8 +1256,9 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp
 
 ## Other Configuration Options
 
-The following options can also be used to tune the performance of query execution. It is possible
-that these options will be deprecated in future release as more optimizations are performed automatically.
+The following options can also be used to tune the performance of query execution and to attach
+query execution listeners. It is possible that these options will be deprecated in future release as
+more optimizations are performed automatically.
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -1304,6 +1305,16 @@ that these options will be deprecated in future release as more optimizations ar
     Configures the number of partitions to use when shuffling data for joins or aggregations.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.queryExecutionListeners</code></td>
+  <td></td>
+  <td>
+    A comma-separated list of classes that implement QueryExecutionListener. When creating a
+    SparkSession, instances of these listeners will be added to it. These classes need to have a
+    zero-argument constructor. If the specified class can't be found, or the specified class doesn't
+    have a valid constructor, the SparkSession creation will fail with an exception.
+  </td>
+</tr>
 </table>
 
 # Distributed SQL Engine
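
For reference, the new property would be set like any other Spark configuration before the session is created. A hedged sketch follows; the listener class name is a placeholder, and the class must have a zero-argument constructor.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Placeholder listener class; it must implement QueryExecutionListener and provide a
    // zero-argument constructor, otherwise SparkSession creation fails with an exception.
    val conf = new SparkConf()
      .setAppName("query-listener-demo")
      .set("spark.sql.queryExecutionListeners", "com.example.WriteAuditListener")

    val spark = SparkSession.builder().config(conf).getOrCreate()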

project/MimaExcludes.scala

Lines changed: 7 additions & 1 deletion
@@ -128,7 +128,13 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"),
+
+      // [SPARK-18120][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onSuccess"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onSuccess"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
     )
 }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 81 additions & 6 deletions
@@ -26,10 +26,12 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -189,6 +191,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
+  /**
+   * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener}
+   * methods.
+   *
+   * @param funcName an identifier for the method executing the query
+   * @param qe the [[QueryExecution]] object associated with the query
+   * @param outputParams the output parameters useful for query analysis
+   * @param action the function that executes the query, after which the listener methods get
+   *               called
+   */
+  private def executeAndCallQEListener(
+      funcName: String,
+      qe: QueryExecution,
+      outputParams: OutputParams)(action: => Unit) = {
+    try {
+      val start = System.nanoTime()
+      action
+      val end = System.nanoTime()
+      df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
+    } catch {
+      case e: Exception =>
+        df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
+        throw e
+    }
+  }
+
   /**
    * Saves the content of the `DataFrame` at the specified path.
    *
@@ -218,7 +247,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       bucketSpec = getBucketSpec,
       options = extraOptions.toMap)
 
-    dataSource.write(mode, df)
+    val destination = source match {
+      case "jdbc" => extraOptions.get("dbtable")
+      case _ => extraOptions.get("path")
+    }
+
+    executeAndCallQEListener(
+      "save",
+      df.queryExecution,
+      OutputParams(source, destination, extraOptions.toMap)) {
+      dataSource.write(mode, df)
+    }
   }
 
   /**
@@ -244,6 +283,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    *
    * Because it inserts data to an existing table, format or options will be ignored.
    *
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string passed to the
+   * [[DataFrameWriter#format]] method and whose destination is set to the name of the table
+   * into which the data is being inserted.
+   *
    * @since 1.4.0
    */
   def insertInto(tableName: String): Unit = {
@@ -261,13 +305,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       )
     }
 
-    df.sparkSession.sessionState.executePlan(
+    val qe = df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         child = df.logicalPlan,
         overwrite = mode == SaveMode.Overwrite,
-        ifNotExists = false)).toRdd
+        ifNotExists = false))
+    executeAndCallQEListener(
+      "insertInto",
+      qe,
+      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
+      qe.toRdd
+    }
   }
 
   private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
@@ -324,7 +374,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def assertNotPartitioned(operation: String): Unit = {
     if (partitioningColumns.isDefined) {
-      throw new AnalysisException( s"'$operation' does not support partitioning")
+      throw new AnalysisException(s"'$operation' does not support partitioning")
     }
   }
 
@@ -359,6 +409,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
    * specific format.
    *
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] object whose datasourceType is set to the string passed to
+   * [[DataFrameWriter#format]] and whose destination is set to the name of the table being
+   * written to.
    * @since 1.4.0
    */
   def saveAsTable(tableName: String): Unit = {
@@ -428,8 +482,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec
     )
-    df.sparkSession.sessionState.executePlan(
-      CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+    val qe = df.sparkSession.sessionState.executePlan(
+      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    executeAndCallQEListener(
+      "saveAsTable",
+      qe,
+      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
+      qe.toRdd
+    }
   }
 
   /**
@@ -493,6 +553,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * </ul>
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "json" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.4.0
    */
@@ -514,6 +577,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override
    * `spark.sql.parquet.compression.codec`.</li>
    * </ul>
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "parquet" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.4.0
    */
@@ -534,6 +600,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
    * This will override `orc.compress`.</li>
    * </ul>
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "orc" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.5.0
    * @note Currently, this method can only be used after enabling Hive support
@@ -560,6 +629,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
    * </ul>
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "text" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.6.0
    */
@@ -599,6 +671,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * </ul>
+   * Calls the callback methods of [[QueryExecutionListener]] after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "csv" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 2.0.0
    */
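
To illustrate the effect of the wrapper above, here is a rough usage sketch; the path and the listener class are illustrative and not part of the commit. The write below would now reach onSuccess with an OutputParams describing the destination.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("writer-listener-demo").getOrCreate()
    spark.listenerManager.register(new WriteAuditListener())  // listener sketched earlier

    val df = spark.range(0, 100).toDF("id")

    // save() now runs inside executeAndCallQEListener, so on success the listener receives
    // funcName = "save" and OutputParams("parquet", Some("/tmp/demo-output"), options).
    df.write.format("parquet").save("/tmp/demo-output")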

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 11 additions & 2 deletions
@@ -40,12 +40,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
-import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener}
 import org.apache.spark.util.Utils
 
 
@@ -876,6 +876,9 @@ object SparkSession {
         }
         session = new SparkSession(sparkContext)
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
+        for (qeListener <- createQueryExecutionListeners(session.sparkContext.getConf)) {
+          session.listenerManager.register(qeListener)
+        }
         defaultSession.set(session)
 
         // Register a successfully instantiated context to the singleton. This should be at the
@@ -893,6 +896,12 @@ object SparkSession {
       }
     }
 
+  private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = {
+    conf.get(SQLConf.QUERY_EXECUTION_LISTENERS)
+      .map(Utils.classForName(_))
+      .map(_.newInstance().asInstanceOf[QueryExecutionListener])
+  }
+
   /**
    * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].
    *

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -655,6 +655,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val QUERY_EXECUTION_LISTENERS =
+    ConfigBuilder("spark.sql.queryExecutionListeners")
+      .doc("QueryExecutionListeners to be attached to the SparkSession")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala

Lines changed: 40 additions & 10 deletions
@@ -44,27 +44,49 @@ trait QueryExecutionListener {
    * @param qe the QueryExecution object that carries detail information like logical plan,
    *           physical plan, etc.
    * @param durationNs the execution time for this query in nanoseconds.
-   *
-   * @note This can be invoked by multiple different threads.
+   * @param outputParams the output parameters in case the method is invoked as a result of a
+   *                     write operation; in the case of a read it will be [[None]]
    */
   @DeveloperApi
-  def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
-
+  def onSuccess(
+      funcName: String,
+      qe: QueryExecution,
+      durationNs: Long,
+      outputParams: Option[OutputParams]): Unit
   /**
    * A callback function that will be called when a query execution failed.
    *
    * @param funcName the name of the action that triggered this query.
    * @param qe the QueryExecution object that carries detail information like logical plan,
    *           physical plan, etc.
    * @param exception the exception that failed this query.
+   * @param outputParams the output parameters in case the method is invoked as a result of a
+   *                     write operation; in the case of a read it will be [[None]]
    *
    * @note This can be invoked by multiple different threads.
    */
   @DeveloperApi
-  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
+  def onFailure(
+      funcName: String,
+      qe: QueryExecution,
+      exception: Exception,
+      outputParams: Option[OutputParams]): Unit
 }
 
-
+/**
+ * Contains extra information useful for query analysis, passed on from the methods in
+ * [[org.apache.spark.sql.DataFrameWriter]] while writing to a datasource.
+ * @param datasourceType type of data source written to, like csv, parquet, json, hive, jdbc, etc.
+ * @param destination path or table name written to
+ * @param options the map containing the output options for the underlying datasource,
+ *                specified by using the [[org.apache.spark.sql.DataFrameWriter#option]] method
+ * @param writeParams will contain any extra information that the write method wants to provide
+ */
+case class OutputParams(
+    datasourceType: String,
+    destination: Option[String],
+    options: Map[String, String],
+    writeParams: Map[String, String] = Map.empty)
 /**
  * :: Experimental ::
  *
@@ -98,18 +120,26 @@ class ExecutionListenerManager private[sql] () extends Logging {
     listeners.clear()
   }
 
-  private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+  private[sql] def onSuccess(
+      funcName: String,
+      qe: QueryExecution,
+      duration: Long,
+      outputParams: Option[OutputParams] = None): Unit = {
     readLock {
       withErrorHandling { listener =>
-        listener.onSuccess(funcName, qe, duration)
+        listener.onSuccess(funcName, qe, duration, outputParams)
       }
     }
   }
 
-  private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+  private[sql] def onFailure(
+      funcName: String,
+      qe: QueryExecution,
+      exception: Exception,
+      outputParams: Option[OutputParams] = None): Unit = {
     readLock {
       withErrorHandling { listener =>
-        listener.onFailure(funcName, qe, exception)
+        listener.onFailure(funcName, qe, exception, outputParams)
      }
    }
  }
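
As a rough illustration of how a listener might consume the new parameter, a small helper follows; it is hypothetical, but the field positions match the OutputParams case class above.

    import org.apache.spark.sql.util.OutputParams

    // Summarize where a query wrote its output, if it wrote anything at all.
    def describeOutput(outputParams: Option[OutputParams]): String = outputParams match {
      case Some(OutputParams("jdbc", Some(table), _, _)) => s"JDBC table $table"
      case Some(OutputParams(format, Some(path), _, _))  => s"$format output at $path"
      case Some(OutputParams(format, None, _, _))        => s"$format output, destination unknown"
      case None                                          => "read-only query"
    }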
