@@ -26,10 +26,12 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -189,6 +191,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
+  /**
+   * Executes the query and then calls the
+   * [[org.apache.spark.sql.util.QueryExecutionListener]] callback methods.
+   *
+   * @param funcName an identifier for the method executing the query
+   * @param qe the [[QueryExecution]] object associated with the query that is
+   *           being executed
+   * @param outputParams the output parameters useful for query analysis
+   * @param action the function that executes the query, after which the listener
+   *               methods get called
+   */
+  private def executeAndCallQEListener(
+      funcName: String,
+      qe: QueryExecution,
+      outputParams: OutputParams)(action: => Unit) = {
+    try {
+      val start = System.nanoTime()
+      action
+      val end = System.nanoTime()
+      df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
+    } catch {
+      case e: Exception =>
+        df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
+        throw e
+    }
+  }
+
   /**
    * Saves the content of the `DataFrame` at the specified path.
    *
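For context, a minimal sketch of a listener that consumes the new parameter follows. It assumes that `QueryExecutionListener.onSuccess` and `onFailure` accept the trailing `Option[OutputParams]` argument implied by the `listenerManager` calls above, and that `OutputParams` exposes `datasourceType` and `destination` fields as described in the Scaladoc additions; the listener class itself is purely illustrative.

// Illustrative sketch only: the extended listener signatures and the OutputParams
// fields are assumptions inferred from this diff, not a confirmed public API.
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

class WriteAuditListener extends QueryExecutionListener {
  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit = {
    // Log what was written, where, and through which data source.
    outputParams.foreach { params =>
      println(s"$funcName wrote to ${params.destination} via ${params.datasourceType} " +
        s"in ${durationNs / 1000000} ms")
    }
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit = {
    println(s"$funcName failed for destination ${outputParams.flatMap(_.destination)}: " +
      exception.getMessage)
  }
}

// Registration would look like: spark.listenerManager.register(new WriteAuditListener)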
@@ -218,7 +247,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       bucketSpec = getBucketSpec,
       options = extraOptions.toMap)
 
-    dataSource.write(mode, df)
+    val destination = source match {
+      case "jdbc" => extraOptions.get("dbtable")
+      case _ => extraOptions.get("path")
+    }
+
+    executeAndCallQEListener(
+      "save",
+      df.queryExecution,
+      OutputParams(source, destination, extraOptions.toMap)) {
+      dataSource.write(mode, df)
+    }
   }
 
   /**
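As the `source match` block above shows, a write through the "jdbc" source reports the "dbtable" option as the destination, while other sources report the "path" option. A rough usage sketch, assuming df is an existing DataFrame and a listener is registered; the paths, table name, and connection URL are placeholders:

// File-based write: the listener receives datasourceType = "parquet" and
// destination = Some("/tmp/output/events") (the "path" option set by save(path)).
df.write.format("parquet").save("/tmp/output/events")

// JDBC write: the listener receives datasourceType = "jdbc" and
// destination = Some("analytics.events") (taken from the "dbtable" option).
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/analytics")  // placeholder URL
  .option("dbtable", "analytics.events")                   // placeholder table
  .save()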
@@ -244,6 +283,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    *
    * Because it inserts data to an existing table, format or options will be ignored.
    *
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string passed to
+   * [[DataFrameWriter#format]] and whose destination is set to the name of the table into
+   * which the data is being inserted.
+   *
    * @since 1.4.0
    */
   def insertInto(tableName: String): Unit = {
@@ -261,13 +305,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       )
     }
 
-    df.sparkSession.sessionState.executePlan(
+    val qe = df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         child = df.logicalPlan,
         overwrite = mode == SaveMode.Overwrite,
-        ifNotExists = false)).toRdd
+        ifNotExists = false))
+    executeAndCallQEListener(
+      "insertInto",
+      qe,
+      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
+      qe.toRdd
+    }
   }
 
   private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
@@ -324,7 +374,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def assertNotPartitioned(operation: String): Unit = {
     if (partitioningColumns.isDefined) {
-      throw new AnalysisException( s"'$operation' does not support partitioning")
+      throw new AnalysisException(s"'$operation' does not support partitioning")
     }
   }
 
@@ -359,6 +409,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
    * specific format.
    *
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string passed to
+   * [[DataFrameWriter#format]] and whose destination is set to the name of the table
+   * being written to.
    * @since 1.4.0
    */
   def saveAsTable(tableName: String): Unit = {
@@ -428,8 +482,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec
     )
-    df.sparkSession.sessionState.executePlan(
-      CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+    val qe = df.sparkSession.sessionState.executePlan(
+      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    executeAndCallQEListener(
+      "saveAsTable",
+      qe,
+      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
+      qe.toRdd
+    }
   }
 
   /**
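Both insertInto and saveAsTable above pass tableIdent.unquotedString as the destination, so table-targeted writes report a table name rather than a path. A brief sketch, with "db.events" as a placeholder table name and a listener assumed to be registered:

// Both calls report destination = Some("db.events") to the QueryExecutionListener;
// datasourceType is whatever string was passed to format() (here "parquet").
df.write.format("parquet").saveAsTable("db.events")
df.write.insertInto("db.events")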
@@ -493,6 +553,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * </ul>
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "json" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.4.0
    */
@@ -514,6 +577,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override
    * `spark.sql.parquet.compression.codec`.</li>
    * </ul>
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "parquet" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.4.0
    */
@@ -534,6 +600,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
    * This will override `orc.compress`.</li>
    * </ul>
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "orc" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.5.0
    * @note Currently, this method can only be used after enabling Hive support
@@ -560,6 +629,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
    * </ul>
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "text" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 1.6.0
    */
@@ -599,6 +671,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * </ul>
+   * Calls the [[QueryExecutionListener]] callback methods after query execution, with an
+   * [[OutputParams]] whose datasourceType is set to the string constant "csv" and whose
+   * destination is set to the path to which the data is written.
    *
    * @since 2.0.0
    */
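Taken together, the shorthand writers (json, parquet, orc, text, csv) each fix datasourceType to their format name and report the output path as the destination, since they delegate to format(...).save(path). A short sketch with placeholder paths, assuming df is an existing DataFrame and a listener is registered:

// Each call reports the fixed datasourceType and the given path as destination.
df.write.json("/tmp/out/events-json")       // datasourceType = "json"
df.write.csv("/tmp/out/events-csv")         // datasourceType = "csv"
df.write.parquet("/tmp/out/events-parquet") // datasourceType = "parquet"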