@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
@@ -42,7 +43,7 @@ import org.apache.spark.util.SerializableJobConf
  *
  * TODO: implement the read logic.
  */
-class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
@@ -59,6 +60,19 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
     val tableDesc = fileSinkConf.getTableInfo
     conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
 
+    // When speculation is on and the output committer class name contains "Direct", warn
+    // users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = conf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
     // Add table properties from the storage handler to hadoopConf, so any custom storage
     // handler settings can be applied to hadoopConf
     HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
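
For context: a direct output committer skips the usual staging/rename step and writes task output straight to the final location. With speculation enabled, duplicate attempts of the same task can race on those final files, and aborting the losing attempt can clobber data the winning attempt already wrote, which is the failure mode tracked in SPARK-10063. Below is a minimal sketch (not part of this commit) of a job configuration that would trip the new warning; `com.example.DirectOutputCommitter` is a made-up placeholder class name, and the `spark.hadoop.*` prefix is used because the check reads `spark.speculation` from the Spark conf but the committer class from the Hadoop conf:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only: speculation is on, and the Hadoop output committer
// is set (via the spark.hadoop.* passthrough) to a class whose name contains
// "Direct". The class name is a placeholder; an actual write would fail unless
// such a committer is really on the classpath.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("direct-committer-warning-demo")
  .config("spark.speculation", "true")
  .config("spark.hadoop.mapred.output.committer.class",
    "com.example.DirectOutputCommitter")
  .enableHiveSupport()
  .getOrCreate()

// Any Hive-format write would now log the warning before output begins, e.g.:
// spark.range(10).write.saveAsTable("demo_table")
```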