
Commit 5fd215f (1 parent: ad0dada)

Fix InputFileBlock for HadoopRDD.

2 files changed, 32 insertions(+), 16 deletions(-)

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 16 additions & 8 deletions
@@ -217,14 +217,6 @@ class HadoopRDD[K, V](
       private val inputMetrics = context.taskMetrics().inputMetrics
       private val existingBytesRead = inputMetrics.bytesRead

-      // Sets InputFileBlockHolder for the file block's information
-      split.inputSplit.value match {
-        case fs: FileSplit =>
-          InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
-        case _ =>
-          InputFileBlockHolder.unset()
-      }
-
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
@@ -263,7 +255,23 @@ class HadoopRDD[K, V](
       private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
       private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

+      private var setInputFileBlockHolder: Boolean = false
+
       override def getNext(): (K, V) = {
+        if (!setInputFileBlockHolder) {
+          // Sets InputFileBlockHolder for the file block's information.
+          // We can't set it before consuming this iterator; otherwise some expressions that
+          // use thread-local variables will fail when working with Python UDFs, because each
+          // batch of a Python UDF runs in its own thread.
+          split.inputSplit.value match {
+            case fs: FileSplit =>
+              InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
+            case _ =>
+              InputFileBlockHolder.unset()
+          }
+          setInputFileBlockHolder = true
+        }
         try {
           finished = !reader.next(key, value)
         } catch {
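The change is the same in both files: instead of publishing the split's file-block information when the iterator is constructed, the commit publishes it lazily on the first call to getNext()/next(), so the thread-local value lands on the thread that actually consumes the records. Below is a minimal, self-contained sketch of that defer-until-first-next() pattern; FileBlockInfo, blockInfo, and LazyBlockIterator are hypothetical stand-ins, not Spark APIs.

```scala
// Hypothetical stand-ins for InputFileBlockHolder and the record iterator;
// only the defer-until-first-next() structure mirrors the commit.
object LazySetSketch {
  case class FileBlockInfo(path: String, start: Long, length: Long)
  // Stand-in for InputFileBlockHolder: a plain ThreadLocal.
  val blockInfo = new ThreadLocal[FileBlockInfo]

  class LazyBlockIterator(records: Iterator[String], path: String, length: Long)
      extends Iterator[String] {
    private var holderSet = false // mirrors setInputFileBlockHolder

    override def hasNext: Boolean = records.hasNext

    override def next(): String = {
      if (!holderSet) {
        // Runs on the thread that consumes the iterator, which may differ
        // from the thread that constructed it.
        blockInfo.set(FileBlockInfo(path, 0L, length))
        holderSet = true
      }
      records.next()
    }
  }

  def main(args: Array[String]): Unit = {
    val it = new LazyBlockIterator(Iterator("a", "b"), "/data/part-00000", 128L)
    assert(blockInfo.get == null) // nothing published before the first next()
    it.next()
    println(blockInfo.get)        // FileBlockInfo(/data/part-00000,0,128)
  }
}
```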

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 16 additions & 8 deletions
@@ -139,14 +139,6 @@ class NewHadoopRDD[K, V](
       private val inputMetrics = context.taskMetrics().inputMetrics
       private val existingBytesRead = inputMetrics.bytesRead

-      // Sets InputFileBlockHolder for the file block's information
-      split.serializableHadoopSplit.value match {
-        case fs: FileSplit =>
-          InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
-        case _ =>
-          InputFileBlockHolder.unset()
-      }
-
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       private val getBytesReadCallback: Option[() => Long] =
@@ -217,7 +209,23 @@ class NewHadoopRDD[K, V](
         !finished
       }

+      private var setInputFileBlockHolder: Boolean = false
+
       override def next(): (K, V) = {
+        if (!setInputFileBlockHolder) {
+          // Sets InputFileBlockHolder for the file block's information.
+          // We can't set it before consuming this iterator; otherwise some expressions that
+          // use thread-local variables will fail when working with Python UDFs, because each
+          // batch of a Python UDF runs in its own thread.
+          split.serializableHadoopSplit.value match {
+            case fs: FileSplit =>
+              InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
+            case _ =>
+              InputFileBlockHolder.unset()
+          }
+          setInputFileBlockHolder = true
+        }
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
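For context on why the eager version failed: per the comment added in the diff, a value written to a thread-local on one thread is not visible to a different thread that reads it later, such as the separate thread running a Python UDF batch. A simplified illustration of that general hazard, not Spark code, under the assumption of a plain ThreadLocal:

```scala
// A simplified illustration (not Spark code) of the failure mode the
// comment describes: a ThreadLocal set on one thread is not visible to
// another thread that reads it later.
object ThreadLocalVisibility {
  val currentFile = new ThreadLocal[String]

  def main(args: Array[String]): Unit = {
    currentFile.set("/data/part-00000") // eager set on the main thread
    println(s"main thread sees:   ${currentFile.get}")

    val worker = new Thread(new Runnable {
      override def run(): Unit =
        // The worker thread has its own slot, so the eager value is lost here.
        // Deferring the set into next()/getNext() places it on whichever
        // thread actually pulls records, avoiding this mismatch.
        println(s"worker thread sees: ${currentFile.get}") // prints null
    })
    worker.start()
    worker.join()
  }
}
```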
