Commit 3715203

Add a flag to ignore corrupt files
1 parent d3890de commit 3715203

File tree

3 files changed (+74, −4 lines)
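Before the per-file diffs, a minimal usage sketch (not part of this commit) showing how a job would opt out of the new behavior. The config key spark.files.ignoreCorruptFiles and its default of true come from the diffs below; the master, app name, and input path are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

// Default after this commit is "true": a corrupt (e.g. truncated gzip) input is
// treated as end-of-stream and the rest of the file is skipped. Setting it to
// "false" makes the read fail with a SparkException caused by an EOFException,
// as the new test in FileSuite asserts.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("ignore-corrupt-files-demo")       // hypothetical app name
  .set("spark.files.ignoreCorruptFiles", "false")
val sc = new SparkContext(conf)

val lines = sc.textFile("/data/maybe-corrupt/*.gz")  // hypothetical path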

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 4 additions & 2 deletions
@@ -139,6 +139,9 @@ class HadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles =
+    sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -245,8 +248,7 @@ class HadoopRDD[K, V](
       try {
         finished = !reader.next(key, value)
       } catch {
-        case eof: EOFException =>
-          finished = true
+        case _: EOFException if ignoreCorruptFiles => finished = true
       }
       if (!finished) {
         inputMetrics.incRecordsRead(1)
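The behavioral change in the second hunk is the guard on the catch clause. Below is a standalone sketch of how a guarded case behaves; readAll, readNext, and ignoreCorruptFiles are placeholder names, not Spark APIs.

import java.io.EOFException

// When the guard is false, no case matches and the EOFException propagates to
// the caller; that is how setting the flag to false makes a truncated file fail
// the task instead of being silently treated as end-of-input.
def readAll(ignoreCorruptFiles: Boolean)(readNext: () => Boolean): Unit = {
  var finished = false
  while (!finished) {
    try {
      finished = !readNext()
    } catch {
      case _: EOFException if ignoreCorruptFiles =>
        // Treat a truncated stream as end-of-input rather than failing.
        finished = true
    }
  }
}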

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 9 additions & 1 deletion
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import java.io.EOFException
 import java.text.SimpleDateFormat
 import java.util.Date
 
@@ -84,6 +85,9 @@ class NewHadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles =
+    sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value
     if (shouldCloneJobConf) {
@@ -171,7 +175,11 @@ class NewHadoopRDD[K, V](
 
     override def hasNext: Boolean = {
       if (!finished && !havePair) {
-        finished = !reader.nextKeyValue
+        try {
+          finished = !reader.nextKeyValue
+        } catch {
+          case _: EOFException if ignoreCorruptFiles => finished = true
+        }
         if (finished) {
           // Close and release the reader here; close() will also be called when the task
           // completes, but for tasks that read from many files, it helps to release the
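The two patched classes back different public read paths, which is why the test below exercises both. A hedged sketch (not part of the diff); the path is hypothetical and the imports mirror the ones the test relies on.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))

// sc.textFile (and sc.hadoopFile) build a HadoopRDD on the old mapred API.
val viaHadoopRDD = sc.textFile("/data/logs/*.gz")

// sc.newAPIHadoopFile builds a NewHadoopRDD on the new mapreduce API.
val viaNewHadoopRDD = sc.newAPIHadoopFile(
  "/data/logs/*.gz",
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text])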

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 61 additions & 1 deletion
@@ -17,7 +17,8 @@
 
 package org.apache.spark
 
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.PortableDataStream
@@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }.collect()
     assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
   }
+
+  test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+
+      // Spark job should ignore corrupt files by default
+      sc = new SparkContext("local", "test")
+      // Test HadoopRDD
+      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+      // Test NewHadoopRDD
+      assert {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect().isEmpty
+      }
+      sc.stop()
+
+      // Reading a corrupt gzip file should throw EOFException
+      val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false")
+      sc = new SparkContext("local", "test", conf)
+      // Test HadoopRDD
+      var e = intercept[SparkException] {
+        sc.textFile(inputFile.toURI.toString).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      // Test NewHadoopRDD
+      e = intercept[SparkException] {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+    } finally {
+      inputFile.delete()
+    }
+  }
+
 }
