
Commit ecf9f34

Author: Salil Surendran (committed)
Commit message: Committing to fix code review issues.
1 parent 752125a · commit ecf9f34

File tree: 7 files changed (+53, -76 lines)


docs/sql-programming-guide.md

Lines changed: 20 additions & 13 deletions
@@ -1300,11 +1300,28 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp
 
 </table>
 
+## QueryExecutionListener Options
+Use this configuration option to attach query execution listeners
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.sql.queryExecutionListeners</code></td>
+  <td></td>
+  <td>
+    A comma-separated list of classes that implement QueryExecutionListener. When creating a SparkSession,
+    instances of these listeners will be added to it. These classes needs to have a zero-argument
+    constructor. If the specified class can't be found or the class specified doesn't have a valid
+    constructor the SparkSession creation will fail with an exception.
+  </td>
+</tr>
+</table>
+
 ## Other Configuration Options
 
-The following options can also be used to tune the performance of query execution and attaching
-query execution listeners. It is possible that these options will be deprecated in future release as
-more optimizations are performed automatically.
+The following options can also be used to tune the performance of query execution. It is possible
+that these options will be deprecated in future release as more optimizations are performed
+automatically.
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>

@@ -1351,16 +1368,6 @@ more optimizations are performed automatically.
     Configures the number of partitions to use when shuffling data for joins or aggregations.
   </td>
 </tr>
-<tr>
-  <td><code>spark.sql.queryExecutionListeners</code></td>
-  <td></td>
-  <td>
-    A comma-separated list of classes that implement QueryExecutionListener. When creating a SparkSession,
-    instances of these listeners will be added to it. These classes needs to have a zero-argument
-    constructor. If the specified class can't be found or the class specified doesn't have a valid
-    constructor the SparkSession creation will fail with an exception.
-  </td>
-</tr>
 </table>
 
 # Distributed SQL Engine
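To make the new documentation concrete, here is a minimal sketch of a listener that could be attached through `spark.sql.queryExecutionListeners`. The class and package names are hypothetical, and the `onSuccess`/`onFailure` signatures are assumed from the scaladoc in this patch (the extra `outputParams` argument is taken to be an `Option[OutputParams]`); the stock upstream trait does not have that parameter.

```scala
package com.example.listeners

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

// Must have a zero-argument constructor so SparkSession can instantiate it reflectively.
class AuditQueryExecutionListener extends QueryExecutionListener {

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit = {
    // outputParams is expected to be None for reads, so only writes are logged here.
    outputParams.foreach { p =>
      println(s"$funcName wrote ${p.datasourceType} to " +
        s"${p.destination.getOrElse("<unknown>")} in ${durationNs / 1e6} ms")
    }
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit = {
    println(s"$funcName failed: ${exception.getMessage}")
  }
}
```

Such a class would then be referenced by its fully qualified name, for example with `--conf spark.sql.queryExecutionListeners=com.example.listeners.AuditQueryExecutionListener` on spark-submit, or via `SparkSession.builder().config(...)` before the session is created.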

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 10 additions & 45 deletions
@@ -192,16 +192,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener}
-   * methods.
+   * Wrap a DataFrameWriter action to track the query execution and time cost, then report to the
+   * user-registered callback functions.
    *
    * @param funcName A identifier for the method executing the query
-   * @param qe the @see [[QueryExecution]] object associated with the query
+   * @param qe the @see `QueryExecution` object associated with the query
    * @param outputParams The output parameters useful for query analysis
    * @param action the function that executes the query after which the listener methods gets
   *                called.
   */
-  private def executeAndCallQEListener(
+  private def withAction(
      funcName: String,
      qe: QueryExecution,
      outputParams: OutputParams)(action: => Unit) = {

@@ -250,11 +250,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
      case "jdbc" => extraOptions.get("dbtable")
      case _ => extraOptions.get("path")
    }
-
-    executeAndCallQEListener(
-      "save",
-      df.queryExecution,
-      OutputParams(source, destination, extraOptions.toMap)) {
+    val outputParams = OutputParams(source, destination, extraOptions.toMap)
+    withAction("save", df.queryExecution, outputParams) {
      dataSource.write(mode, df)
    }
  }

@@ -282,11 +279,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   *
   * Because it inserts data to an existing table, format or options will be ignored.
   *
-   * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with
-   * @see[[OutputParams]] having datasourceType set as the string parameter passed to the
-   * @see[[DataFrameWriter#format]] method and destination set as the name of the table into which
-   * data is being inserted into.
-   *
   * @since 1.4.0
   */
  def insertInto(tableName: String): Unit = {

@@ -311,12 +303,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
        child = df.logicalPlan,
        overwrite = mode == SaveMode.Overwrite,
        ifNotExists = false))
-    executeAndCallQEListener(
-      "insertInto",
-      qe,
-      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
-      qe.toRdd
-    }
+    val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
+    withAction("insertInto", qe, outputParams)(qe.toRdd)
  }
 
  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>

@@ -408,10 +396,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
   * specific format.
   *
-   * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with a
-   * @see[[OutputParams]] object having datasourceType set as the string parameter passed to the
-   * @see[[DataFrameWriter#format]] and destination set as the name of the table being
-   * written to
   * @since 1.4.0
   */
  def saveAsTable(tableName: String): Unit = {

@@ -483,12 +467,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    )
    val qe = df.sparkSession.sessionState.executePlan(
      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
-    executeAndCallQEListener(
-      "saveAsTable",
-      qe,
-      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
-      qe.toRdd
-    }
+    val outputParams = new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
+    withAction("saveAsTable", qe, outputParams)(qe.toRdd)
  }
 
  /**

@@ -552,9 +532,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * </ul>
-   * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with
-   * @see[[OutputParams]] having datasourceType set as string constant "json" and
-   * destination set as the path to which the data is written
   *
   * @since 1.4.0
   */

@@ -576,9 +553,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override
   * `spark.sql.parquet.compression.codec`.</li>
   * </ul>
-   * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with
-   * @see[[OutputParams]] having datasourceType set as string constant "parquet" and
-   * destination set as the path to which the data is written
   *
   * @since 1.4.0
   */

@@ -599,9 +573,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
   * This will override `orc.compress`.</li>
   * </ul>
-   * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with
-   * @see[[OutputParams]] having datasourceType set as string constant "orc" and
-   * destination set as the path to which the data is written
   *
   * @since 1.5.0
   * @note Currently, this method can only be used after enabling Hive support

@@ -628,9 +599,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`). </li>
   * </ul>
-   * Calls the callback methods in e@see[[QueryExecutionListener]] methods after query execution
-   * with @see[[OutputParams]] having datasourceType set as string constant "text" and
-   * destination set as the path to which the data is written
   *
   * @since 1.6.0
   */

@@ -670,9 +638,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * </ul>
-   * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with
-   * @see[[OutputParams]] having datasourceType set as string constant "csv" and
-   * destination set as the path to which the data is written
   *
   * @since 2.0.0
   */
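The diff above shows only the signature of the renamed `withAction` helper, not its body. The following standalone sketch illustrates the pattern the new name describes: run the wrapped write action, measure its duration, and report success or failure to registered callbacks. The `Callbacks` trait and the `report*` method names are illustrative and are not Spark APIs.

```scala
// Standalone sketch of a withAction-style wrapper: time the action and notify callbacks.
object WithActionSketch {

  // Stand-in for whatever callback registry the caller provides (not a Spark API).
  trait Callbacks {
    def reportSuccess(funcName: String, durationNs: Long): Unit
    def reportFailure(funcName: String, error: Exception): Unit
  }

  def withAction[T](funcName: String, callbacks: Callbacks)(action: => T): T = {
    val start = System.nanoTime()
    try {
      val result = action                                   // execute the wrapped write action
      callbacks.reportSuccess(funcName, System.nanoTime() - start)
      result
    } catch {
      case e: Exception =>
        callbacks.reportFailure(funcName, e)                // surface the failure to listeners
        throw e                                             // then rethrow so the caller still sees it
    }
  }
}
```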

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 2 additions & 2 deletions
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._

@@ -897,7 +897,7 @@ object SparkSession {
   }
 
   private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = {
-    conf.get(SQLConf.QUERY_EXECUTION_LISTENERS)
+    conf.get(StaticSQLConf.QUERY_EXECUTION_LISTENERS)
      .map(Utils.classForName(_))
      .map(_.newInstance().asInstanceOf[QueryExecutionListener])
  }
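`createQueryExecutionListeners` resolves each configured class name and invokes its zero-argument constructor, which is why a missing class or constructor aborts SparkSession creation. Below is a self-contained sketch of that step using plain Java reflection; Spark's version goes through its internal `Utils.classForName` and the typed conf entry instead.

```scala
// Sketch only: instantiate each class named in a comma-separated property value.
object ListenerLoading {

  def instantiateListeners(commaSeparated: String): Seq[AnyRef] =
    commaSeparated.split(",").map(_.trim).filter(_.nonEmpty).toSeq.map { name =>
      val clazz = Class.forName(name)                 // throws ClassNotFoundException if the class is missing
      clazz.getDeclaredConstructor()                  // throws NoSuchMethodException without a zero-arg constructor
        .newInstance()
        .asInstanceOf[AnyRef]
    }
}
```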

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 9 deletions
@@ -660,21 +660,12 @@ object SQLConf {
    .booleanConf
    .createWithDefault(false)
 
-
-  val QUERY_EXECUTION_LISTENERS =
-    ConfigBuilder("spark.sql.queryExecutionListeners")
-      .doc("QueryExecutionListeners to be attached to the SparkSession")
-      .stringConf
-      .toSequence
-      .createWithDefault(Nil)
-
  val SESSION_LOCAL_TIMEZONE =
    SQLConfigBuilder("spark.sql.session.timeZone")
      .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
      .stringConf
      .createWithDefault(TimeZone.getDefault().getID())
 
-
  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }

@@ -1056,4 +1047,14 @@ object StaticSQLConf {
      "SQL configuration and the current database.")
    .booleanConf
    .createWithDefault(false)
+
+  val QUERY_EXECUTION_LISTENERS = buildConf("spark.sql.queryExecutionListeners")
+    .doc("A comma-separated list of classes that implement QueryExecutionListener. When creating " +
+      "a SparkSession, instances of these listeners will be added to it. These classes " +
+      "needs to have a zero-argument constructor. If the specified class can't be found or" +
+      " the class specified doesn't have a valid constructor the SparkSession creation " +
+      "will fail with an exception.")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
 }
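Because the entry now lives in `StaticSQLConf`, it is read once when the SparkSession is created, so the value has to be in place before `getOrCreate()`. Here is a sketch of how the option might be supplied from application code; the listener class name is hypothetical, and the same value can also be passed with `--conf` on spark-submit.

```scala
import org.apache.spark.sql.SparkSession

object ListenerConfigDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("listener-demo")
      // Hypothetical listener class; comma-separate multiple implementations.
      .config("spark.sql.queryExecutionListeners",
        "com.example.listeners.AuditQueryExecutionListener")
      .getOrCreate()

    // A write action, which is what triggers the listener callbacks in this patch.
    spark.range(5).write.mode("overwrite").parquet("/tmp/listener-demo")
    spark.stop()
  }
}
```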

sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala

Lines changed: 5 additions & 4 deletions
@@ -45,7 +45,7 @@ trait QueryExecutionListener {
   * physical plan, etc.
   * @param durationNs the execution time for this query in nanoseconds.
   * @param outputParams The output parameters in case the method is invoked as a result of a
-   *                     write operation. In case of a read will be @see[[None]]
+   *                     write operation. In case of a read will be @see `None`
   */
  @DeveloperApi
  def onSuccess(

@@ -61,7 +61,7 @@ trait QueryExecutionListener {
   * physical plan, etc.
   * @param exception the exception that failed this query.
   * @param outputParams The output parameters in case the method is invoked as a result of a
-   *                     write operation. In case of a read will be @see[[None]]
+   *                     write operation. In case of a read will be @see `None`
   *
   * @note This can be invoked by multiple different threads.
   */

@@ -75,13 +75,14 @@
 
 /**
  * Contains extra information useful for query analysis passed on from the methods in
- * @see[[org.apache.spark.sql.DataFrameWriter]] while writing to a datasource
+ * @see `org.apache.spark.sql.DataFrameWriter` while writing to a datasource
  * @param datasourceType type of data source written to like csv, parquet, json, hive, jdbc etc.
  * @param destination path or table name written to
 * @param options the map containing the output options for the underlying datasource
- *                specified by using the @see [[org.apache.spark.sql.DataFrameWriter#option]] method
+ *                specified by using the @see `org.apache.spark.sql.DataFrameWriter#option` method
 * @param writeParams will contain any extra information that the write method wants to provide
 */
+@DeveloperApi
 case class OutputParams(
    datasourceType: String,
    destination: Option[String],
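For reference, the `DataFrameWriter` call sites above construct `OutputParams` with three arguments, which suggests the `writeParams` field is defaulted. A sketch of what such a value might look like, pasteable in a spark-shell built with this patch; the field names follow the scaladoc and the exact signature is an assumption.

```scala
import org.apache.spark.sql.util.OutputParams

// Illustrative values only: a parquet write to a hypothetical path.
val params = OutputParams(
  datasourceType = "parquet",                    // e.g. "json", "csv", "jdbc", "hive"
  destination = Some("/data/events/2017-01"),    // path or table name written to
  options = Map("compression" -> "snappy"))      // options supplied via DataFrameWriter.option

params.destination.foreach(dest => println(s"wrote ${params.datasourceType} to $dest"))
```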

sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}
 
 /**
  * Test cases for the property 'spark.sql.queryExecutionListeners' that adds the
- * @see[[QueryExecutionListener]] to a @see[[SparkSession]]
+ * @see `QueryExecutionListener` to a @see `SparkSession`
  */
 class SparkSQLQueryExecutionListenerSuite
   extends SparkFunSuite

sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala

Lines changed: 5 additions & 2 deletions
@@ -174,7 +174,6 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
      Seq(1 -> 100).toDF("x", "y").write.saveAsTable("bar")
    }
    assert(onWriteSuccessCalled)
-    spark.listenerManager.clear()
  }
 
  private def callSave(source: String, callSaveFunction: (DataFrame, String) => Unit): Unit = {

@@ -184,7 +183,6 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
      callSaveFunction(Seq(1 -> 100).toDF("x", "y"), path.getAbsolutePath)
    }
    assert(testQueryExecutionListener.onWriteSuccessCalled)
-    spark.listenerManager.clear()
  }
 
  // TODO: Currently some LongSQLMetric use -1 as initial value, so if the accumulator is never

@@ -265,4 +263,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
    }
  }
 
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    spark.listenerManager.clear()
+  }
+
 }
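Moving the cleanup into `afterEach` guarantees that listeners registered against the shared session are removed even when an assertion fails, so later tests never see stale callbacks. The per-test lifecycle looks roughly like the sketch below, with `AuditQueryExecutionListener` standing in for whatever listener a test registers (a hypothetical class from the earlier sketch).

```scala
// Register a listener on the shared session, run a write that triggers the callbacks,
// and always clear the listener manager afterwards, mirroring the new afterEach().
val listener = new com.example.listeners.AuditQueryExecutionListener()  // hypothetical listener
spark.listenerManager.register(listener)
try {
  spark.range(10).write.mode("overwrite").parquet("/tmp/callback-test")  // exercises onSuccess
} finally {
  spark.listenerManager.clear()  // drop every registered listener, as afterEach() now does
}
```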
