@@ -192,16 +192,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
192192 }
193193
194194 /**
195- * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener}
196- * methods .
195+ * Wrap a DataFrameWriter action to track the query execution and time cost, then report to the
196+ * user-registered callback functions.
197197 *
198198 * @param funcName An identifier for the method executing the query
199- * @param qe the @see [[ QueryExecution ]] object associated with the query
199+ * @param qe the `QueryExecution` object associated with the query
200200 * @param outputParams The output parameters useful for query analysis
201201 * @param action the function that executes the query, after which the listener methods get
202202 * called.
203203 */
204- private def executeAndCallQEListener (
204+ private def withAction (
205205 funcName : String ,
206206 qe : QueryExecution ,
207207 outputParams : OutputParams )(action : => Unit ) = {
@@ -250,11 +250,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
250250 case " jdbc" => extraOptions.get(" dbtable" )
251251 case _ => extraOptions.get(" path" )
252252 }
253-
254- executeAndCallQEListener(
255- " save" ,
256- df.queryExecution,
257- OutputParams (source, destination, extraOptions.toMap)) {
253+ val outputParams = OutputParams (source, destination, extraOptions.toMap)
254+ withAction(" save" , df.queryExecution, outputParams) {
258255 dataSource.write(mode, df)
259256 }
260257 }
@@ -282,11 +279,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
282279 *
283280 * Because it inserts data to an existing table, format or options will be ignored.
284281 *
285- * Calls the callback methods of @see [[QueryExecutionListener ]] after query execution with
286- * @see [[OutputParams ]] having datasourceType set as the string parameter passed to the
287- * @see [[DataFrameWriter#format ]] method and destination set as the name of the table into which
288- * data is being inserted into.
289- *
290282 * @since 1.4.0
291283 */
292284 def insertInto (tableName : String ): Unit = {
@@ -311,12 +303,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
311303 child = df.logicalPlan,
312304 overwrite = mode == SaveMode .Overwrite ,
313305 ifNotExists = false ))
314- executeAndCallQEListener(
315- " insertInto" ,
316- qe,
317- new OutputParams (source, Some (tableIdent.unquotedString), extraOptions.toMap)) {
318- qe.toRdd
319- }
306+ val outputParams = OutputParams (source, Some (tableIdent.unquotedString), extraOptions.toMap)
307+ withAction(" insertInto" , qe, outputParams)(qe.toRdd)
320308 }
321309
322310 private def normalizedParCols : Option [Seq [String ]] = partitioningColumns.map { cols =>
@@ -408,10 +396,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
408396 * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
409397 * specific format.
410398 *
411- * Calls the callback methods of @see [[QueryExecutionListener ]] after query execution with a
412- * @see [[OutputParams ]] object having datasourceType set as the string parameter passed to the
413- * @see [[DataFrameWriter#format ]] and destination set as the name of the table being
414- * written to
415399 * @since 1.4.0
416400 */
417401 def saveAsTable (tableName : String ): Unit = {
@@ -483,12 +467,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
483467 )
484468 val qe = df.sparkSession.sessionState.executePlan(
485469 CreateTable (tableDesc, mode, Some (df.logicalPlan)))
486- executeAndCallQEListener(
487- " saveAsTable" ,
488- qe,
489- new OutputParams (source, Some (tableIdent.unquotedString), extraOptions.toMap)) {
490- qe.toRdd
491- }
470+ val outputParams = new OutputParams (source, Some (tableIdent.unquotedString), extraOptions.toMap)
471+ withAction(" saveAsTable" , qe, outputParams)(qe.toRdd)
492472 }
493473
494474 /**
@@ -552,9 +532,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
552532 * indicates a timestamp format. Custom date formats follow the formats at
553533 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
554534 * </ul>
555- * Calls the callback methods in @see [[QueryExecutionListener ]] methods after query execution with
556- * @see [[OutputParams ]] having datasourceType set as string constant "json" and
557- * destination set as the path to which the data is written
558535 *
559536 * @since 1.4.0
560537 */
@@ -576,9 +553,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
576553 * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override
577554 * `spark.sql.parquet.compression.codec`.</li>
578555 * </ul>
579- * Calls the callback methods in @see [[QueryExecutionListener ]] methods after query execution with
580- * @see [[OutputParams ]] having datasourceType set as string constant "parquet" and
581- * destination set as the path to which the data is written
582556 *
583557 * @since 1.4.0
584558 */
@@ -599,9 +573,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
599573 * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
600574 * This will override `orc.compress`.</li>
601575 * </ul>
602- * Calls the callback methods in @see [[QueryExecutionListener ]] methods after query execution with
603- * @see [[OutputParams ]] having datasourceType set as string constant "orc" and
604- * destination set as the path to which the data is written
605576 *
606577 * @since 1.5.0
607578 * @note Currently, this method can only be used after enabling Hive support
@@ -628,9 +599,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
628599 * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
629600 * `snappy` and `deflate`). </li>
630601 * </ul>
631- * Calls the callback methods in e@see [[QueryExecutionListener ]] methods after query execution
632- * with @see [[OutputParams ]] having datasourceType set as string constant "text" and
633- * destination set as the path to which the data is written
634602 *
635603 * @since 1.6.0
636604 */
@@ -670,9 +638,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
670638 * indicates a timestamp format. Custom date formats follow the formats at
671639 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
672640 * </ul>
673- * Calls the callback methods in @see [[QueryExecutionListener ]] methods after query execution with
674- * @see [[OutputParams ]] having datasourceType set as string constant "csv" and
675- * destination set as the path to which the data is written
676641 *
677642 * @since 2.0.0
678643 */
0 commit comments