@@ -28,8 +28,10 @@ import org.apache.spark.{InterruptibleIterator, Logging, Partition, Serializable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat
 
-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+private[spark] class NewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -57,8 +59,7 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration,
-    minSplits: Int = 1)
+    @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
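With this hunk the optional minSplits parameter (previously defaulting to 1) disappears from NewHadoopRDD's constructor; split sizing now lives only in the WholeTextFileRDD subclass added at the bottom of the file. A minimal sketch of a call site after the change, assuming a SparkContext and a Hadoop Configuration already in scope and using TextInputFormat purely as an illustrative format (none of these values appear in this hunk):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD

// Sketch only: sc and conf are assumed; the format/key/value classes are
// illustrative, not taken from this commit.
def newApiRDD(sc: SparkContext, conf: Configuration): NewHadoopRDD[LongWritable, Text] =
  new NewHadoopRDD[LongWritable, Text](
    sc,
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf)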
@@ -67,18 +68,19 @@ class NewHadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
-  private val jobtrackerId: String = {
+  private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
-  @transient private val jobId = new JobID(jobtrackerId, id)
+  @transient protected val jobId = new JobID(jobTrackerId, id)
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
-
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
     }
 
     val jobContext = newJobContext(conf, jobId)
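The same Configurable handling is refactored twice in this commit (here and in compute below), so the change is easiest to read in isolation. A standalone sketch of the old and new styles; the configureIfNeeded helper and its parameters are invented for illustration and are not part of the commit:

import org.apache.hadoop.conf.{Configurable, Configuration}

// Hypothetical helper: applies the Hadoop Configuration only when the
// InputFormat instance opts in by implementing Configurable.
def configureIfNeeded(format: AnyRef, conf: Configuration): Unit = {
  // Old style removed above: a type test followed by a cast.
  // if (format.isInstanceOf[Configurable]) {
  //   format.asInstanceOf[Configurable].setConf(conf)
  // }

  // New style: a single pattern match, with an explicit no-op for other formats.
  format match {
    case configurable: Configurable => configurable.setConf(conf)
    case _ =>
  }
}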
@@ -100,11 +102,13 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
-      if (format.isInstanceOf[Configurable]) {
-        format.asInstanceOf[Configurable].setConf(conf)
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
       }
       val reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -150,3 +154,30 @@ class NewHadoopRDD[K, V](
   def getConf: Configuration = confBroadcast.value.value
 }
 
+private[spark] class WholeTextFileRDD(
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[String],
+    valueClass: Class[String],
+    @transient conf: Configuration,
+    minSplits: Int)
+  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
+
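The new WholeTextFileRDD reuses everything from NewHadoopRDD except getPartitions, where it feeds minSplits into the input format via setMaxSplitSize before requesting splits. A rough sketch of how a caller might construct it, assuming a SparkContext, a Hadoop Configuration, and a minSplits value that are not part of this diff (the class is private[spark], so this only compiles from inside Spark's own packages):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.input.WholeTextFileInputFormat

// Sketch only: sc, conf, and minSplits are assumed; keys and values are
// Strings, presumably the file path and the whole file content.
def wholeTextFileRDD(sc: SparkContext, conf: Configuration, minSplits: Int) =
  new WholeTextFileRDD(
    sc,
    classOf[WholeTextFileInputFormat],
    classOf[String],
    classOf[String],
    conf,
    minSplits)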