@@ -20,14 +20,15 @@ package org.apache.spark.input
 import scala.collection.JavaConversions._
 import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.hadoop.fs.{FSDataInputStream, Path}
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import java.io.DataInputStream
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream}


 /**
@@ -58,17 +59,50 @@ abstract class StreamFileInputFormat[T]
 /**
  * A class that allows DataStreams to be serialized and moved around by not creating them
  * until they need to be read
+ * @note TaskAttemptContext is not serializable, hence the confBytes construct
+ * @note CombineFileSplit is not serializable, hence the splitBytes construct
  */
-class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
   extends Serializable {
-  // transient forces file to be reopened after being moved (serialization)
+  // transient forces the file to be reopened after serialization
+  // it also marks fields whose types are not serializable
+
   @transient
   private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
   @transient
   private var isOpen = false
+
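+  // the Hadoop Configuration is only Writable, not java.io.Serializable,
+  // so capture it eagerly as bytes while the TaskAttemptContext is at hand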
+  private val confBytes = {
+    val baos = new ByteArrayOutputStream()
+    context.getConfiguration.write(new DataOutputStream(baos))
+    baos.toByteArray
+  }
+
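+  // CombineFileSplit is likewise only Writable; keep its serialized form too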
+  private val splitBytes = {
+    val baos = new ByteArrayOutputStream()
+    isplit.write(new DataOutputStream(baos))
+    baos.toByteArray
+  }
+
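+  // rebuild the split lazily from splitBytes after deserialization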
+  @transient
+  private lazy val split = {
+    val bais = new ByteArrayInputStream(splitBytes)
+    val nsplit = new CombineFileSplit()
+    nsplit.readFields(new DataInputStream(bais))
+    nsplit
+  }
+
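+  // rebuild the Hadoop Configuration lazily from confBytes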
+  @transient
+  private lazy val conf = {
+    val bais = new ByteArrayInputStream(confBytes)
+    val nconf = new Configuration()
+    nconf.readFields(new DataInputStream(bais))
+    nconf
+  }
   /**
    * Calculate the path name independently of opening the file
    */
+  @transient
   private lazy val path = {
     val pathp = split.getPath(index)
     pathp.toString
@@ -80,7 +114,7 @@ class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, i
   def open(): FSDataInputStream = {
     if (!isOpen) {
       val pathp = split.getPath(index)
-      val fs = pathp.getFileSystem(context.getConfiguration)
+      val fs = pathp.getFileSystem(conf)
       fileIn = fs.open(pathp)
       isOpen = true
     }
@@ -207,4 +241,4 @@ abstract class BinaryRecordReader[T](
     parseByteArray(innerBuffer)
   }
   def parseByteArray(inArray: Array[Byte]): T
-}
+}
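For reference, the confBytes/splitBytes fields above use the standard Hadoop Writable round-trip: write the object to a DataOutput backed by a byte array, then readFields from a DataInput on the receiving side. A minimal, self-contained sketch of that pattern, assuming only hadoop-common on the classpath (the object name WritableRoundTrip and the key example.key are illustrative, not part of this commit):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.hadoop.conf.Configuration

object WritableRoundTrip {
  def main(args: Array[String]): Unit = {
    // Configuration is a Hadoop Writable, not java.io.Serializable
    val original = new Configuration()
    original.set("example.key", "example.value")

    // serialize eagerly into a byte array, as confBytes does in the patch
    val baos = new ByteArrayOutputStream()
    original.write(new DataOutputStream(baos))
    val confBytes = baos.toByteArray

    // reconstruct lazily on the other side, as the lazy val conf does
    val restored = new Configuration()
    restored.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
    assert(restored.get("example.key") == "example.value")
  }
}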