Skip to content

Commit 150efa2

Browse files
committed
address comments
1 parent 2f24c10 commit 150efa2

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable
2929
import org.apache.hadoop.mapred.{JobConf, Reporter}
3030
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
3131

32+
import org.apache.spark.internal.Logging
3233
import org.apache.spark.sql.SparkSession
3334
import org.apache.spark.sql.catalyst.InternalRow
3435
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
@@ -42,7 +43,7 @@ import org.apache.spark.util.SerializableJobConf
4243
*
4344
* TODO: implement the read logic.
4445
*/
45-
class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
46+
class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
4647
override def inferSchema(
4748
sparkSession: SparkSession,
4849
options: Map[String, String],
@@ -59,6 +60,19 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
5960
val tableDesc = fileSinkConf.getTableInfo
6061
conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
6162

63+
// When speculation is on and output committer class name contains "Direct", we should warn
64+
// users that they may lose data if they are using a direct output committer.
65+
val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
66+
val outputCommitterClass = conf.get("mapred.output.committer.class", "")
67+
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
68+
val warningMessage =
69+
s"$outputCommitterClass may be an output committer that writes data directly to " +
70+
"the final location. Because speculation is enabled, this output committer may " +
71+
"cause data loss (see the case in SPARK-10063). If possible, please use an output " +
72+
"committer that does not have this behavior (e.g. FileOutputCommitter)."
73+
logWarning(warningMessage)
74+
}
75+
6276
// Add table properties from storage handler to hadoopConf, so any custom storage
6377
// handler settings can be set to hadoopConf
6478
HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)

0 commit comments

Comments
 (0)