
Commit c10af60

refine comments and rewrite new class for wholeTextFile

1 parent 766d05b

File tree: 4 files changed, +62 -25 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 13 additions & 8 deletions

@@ -444,7 +444,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * hdfs://a-hdfs-path/part-nnnnn
    * }}}
    *
-   * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path", minSplits)`
+   * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`
    *
    * <p> then `rdd` contains
    * {{{
@@ -454,15 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
    */
  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
-    newAPIHadoopFile(
-      path,
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new WholeTextFileRDD(
+      this,
       classOf[WholeTextFileInputFormat],
       classOf[String],
       classOf[String],
-      minSplits = minSplits)
+      updateConf,
+      minSplits)
  }

  /**
@@ -585,12 +591,11 @@ class SparkContext(config: SparkConf) extends Logging {
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V],
-      conf: Configuration = hadoopConfiguration,
-      minSplits: Int = 1): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
    val job = new NewHadoopJob(conf)
    NewFileInputFormat.addInputPath(job, new Path(path))
    val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, minSplits)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
  }

  /**
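With this change `wholeTextFiles` constructs a `WholeTextFileRDD` directly, since `newAPIHadoopFile` no longer carries a `minSplits` parameter. A minimal usage sketch of the resulting API, assuming a local master and a placeholder HDFS path (neither is part of the commit):

import org.apache.spark.{SparkConf, SparkContext}

// Each record is (fileName, fileContent); minSplits is only a hint for the
// number of splits the combined input should be divided into.
val sc = new SparkContext(new SparkConf().setAppName("wholeTextFilesSketch").setMaster("local"))
val files = sc.wholeTextFiles("hdfs://a-hdfs-path", minSplits = 4)
val sizes = files.map { case (name, content) => (name, content.length) }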

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 4 additions & 3 deletions

@@ -167,8 +167,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * hdfs://a-hdfs-path/part-nnnnn
    * }}}
    *
-   * Do
-   * `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path", minSplit)`
+   * Do `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
    *
    * <p> then `rdd` contains
    * {{{
@@ -178,7 +177,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
    */
  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
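The Java-facing method keeps an explicit `minSplits` argument and simply delegates to `sc.wholeTextFiles`, wrapping the result in a `JavaPairRDD`. A brief sketch of the wrapper in use, reusing the `sc` and placeholder path from the previous sketch:

import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(sc)                         // wraps the existing SparkContext
val pairs = jsc.wholeTextFiles("hdfs://a-hdfs-path", 4)    // JavaPairRDD[String, String]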

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 2 additions & 2 deletions

@@ -20,7 +20,6 @@ package org.apache.spark.input
 import scala.collection.JavaConversions._

 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
@@ -56,6 +55,7 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
    val totalLen = files.map { file =>
      if (file.isDir) 0L else file.getLen
    }.sum
-    super.setMaxSplitSize(totalLen / (if (minSplits == 0) 1 else minSplits))
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+    super.setMaxSplitSize(maxSplitSize)
  }
}
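The switch from integer division to `Math.ceil` keeps the computed maximum split size from being rounded down, which could otherwise yield more splits than `minSplits` suggests. A small worked sketch with made-up numbers (not taken from the commit):

// Made-up input: 10001 bytes in total, minSplits = 2.
val totalLen = 10001L
val minSplits = 2
val truncated = totalLen / minSplits                          // 5000 -> needs 3 splits of at most 5000 bytes
val rounded = Math.ceil(totalLen * 1.0 / minSplits).toLong    // 5001 -> at most 2 splits, as requested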

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 43 additions & 12 deletions

@@ -28,8 +28,10 @@ import org.apache.spark.{InterruptibleIterator, Logging, Partition, Serializable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat

-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+private[spark] class NewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
  extends Partition {

  val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -57,8 +59,7 @@ class NewHadoopRDD[K, V](
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
-    @transient conf: Configuration,
-    minSplits: Int = 1)
+    @transient conf: Configuration)
  extends RDD[(K, V)](sc, Nil)
  with SparkHadoopMapReduceUtil
  with Logging {
@@ -67,18 +68,19 @@ class NewHadoopRDD[K, V](
  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
  // private val serializableConf = new SerializableWritable(conf)

-  private val jobtrackerId: String = {
+  private val jobTrackerId: String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    formatter.format(new Date())
  }

-  @transient private val jobId = new JobID(jobtrackerId, id)
+  @transient protected val jobId = new JobID(jobTrackerId, id)

  override def getPartitions: Array[Partition] = {
    val inputFormat = inputFormatClass.newInstance
-
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
    }

    val jobContext = newJobContext(conf, jobId)
@@ -100,11 +102,13 @@ class NewHadoopRDD[K, V](
    val split = theSplit.asInstanceOf[NewHadoopPartition]
    logInfo("Input split: " + split.serializableHadoopSplit)
    val conf = confBroadcast.value.value
-    val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
    val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
    val format = inputFormatClass.newInstance
-    if (format.isInstanceOf[Configurable]) {
-      format.asInstanceOf[Configurable].setConf(conf)
+    format match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
    }
    val reader = format.createRecordReader(
      split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -150,3 +154,30 @@ class NewHadoopRDD[K, V](
  def getConf: Configuration = confBroadcast.value.value
}

+private[spark] class WholeTextFileRDD(
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[String],
+    valueClass: Class[String],
+    @transient conf: Configuration,
+    minSplits: Int)
+  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
+
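Both `getPartitions` and `compute` now configure the input format through a pattern match instead of an `isInstanceOf`/`asInstanceOf` pair, and the new `WholeTextFileRDD` reuses the same idiom while overriding `getPartitions` to pass `minSplits` to `setMaxSplitSize`. A standalone sketch of the match idiom; the helper name is hypothetical and not part of the commit:

import org.apache.hadoop.conf.{Configurable, Configuration}

// Hypothetical helper mirroring the pattern match used in the commit.
def setConfIfConfigurable(inputFormat: AnyRef, conf: Configuration): Unit = {
  inputFormat match {
    case configurable: Configurable => configurable.setConf(conf)
    case _ => // input formats that are not Configurable need no extra setup
  }
}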
