@@ -217,14 +217,6 @@ class HadoopRDD[K, V](
217217 private val inputMetrics = context.taskMetrics().inputMetrics
218218 private val existingBytesRead = inputMetrics.bytesRead
219219
220- // Sets InputFileBlockHolder for the file block's information
221- split.inputSplit.value match {
222- case fs: FileSplit =>
223- InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
224- case _ =>
225- InputFileBlockHolder.unset()
226- }
227-
228220 // Find a function that will return the FileSystem bytes read by this thread. Do this before
229221 // creating RecordReader, because RecordReader's constructor might read some bytes
230222 private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
@@ -263,7 +255,23 @@ class HadoopRDD[K, V](
263255 private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
264256 private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
265257
258+ private var setInputFileBlockHolder: Boolean = false
259+
266260 override def getNext(): (K, V) = {
261+ if (!setInputFileBlockHolder) {
262+ // Sets InputFileBlockHolder for the file block's information
263+ // We can't set it before consuming this iterator, otherwise some expressions which
264+ // use thread local variables will fail when working with Python UDF. That is because
265+ // the batch of Python UDF is running in individual thread.
266+ split.inputSplit.value match {
267+ case fs: FileSplit =>
268+ InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
269+ case _ =>
270+ InputFileBlockHolder.unset()
271+ }
272+ setInputFileBlockHolder = true
273+ }
274+
267275 try {
268276 finished = !reader.next(key, value)
269277 } catch {
0 commit comments