Commit d150431

Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2 parents: cde6af9 + 8d85359

File tree

69 files changed: +1184, -468 lines


assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 22 additions & 0 deletions
@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
 
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
+    randomSplit(weights, Utils.random.nextLong)
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   * @param seed random seed
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
+    rdd.randomSplit(weights, seed).map(wrapRDD)
+
   /**
    * Return the union of this RDD and another one. Any identical elements will appear multiple
    * times (use `.distinct()` to eliminate them).

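The new Java-facing randomSplit overloads simply delegate to the existing RDD.randomSplit, so the behavior can be sketched against the Scala API. A minimal sketch, with illustrative weights and a fixed seed for reproducibility (the app name and master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Split a dataset roughly 70/30; weights are normalized if they don't sum
// to 1, and a fixed seed makes the split reproducible across runs.
val sc = new SparkContext(
  new SparkConf().setAppName("random-split-sketch").setMaster("local[2]"))
val data = sc.parallelize(1 to 1000)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3), seed = 42L)
println(s"train: ${train.count()}, test: ${test.count()}")
sc.stop()
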
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 4 additions & 2 deletions
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)

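The guard added above means the pre-flight output check can now be switched off through configuration, for example to intentionally write into an existing output directory. A minimal sketch of how a job might opt out, assuming a local run; the output path is illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// With validateOutputSpecs off, saveAsTextFile no longer fails fast when the
// target directory already exists; any Hadoop-side behavior still applies.
val conf = new SparkConf()
  .setAppName("validate-output-specs-sketch")
  .setMaster("local[2]")
  .set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
sc.parallelize(1 to 100).map(i => (i, i * i)).saveAsTextFile("/tmp/squares-out")
sc.stop()
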
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 6 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Clock, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
-    env: SparkEnv)
+    env: SparkEnv,
+    clock: Clock = SystemClock)
   extends Logging {
 
   import DAGScheduler._
@@ -781,7 +782,7 @@
       logDebug("New pending tasks: " + myPending)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@
 
     def markStageAsFinished(stage: Stage) = {
       val serviceTime = stageToInfos(stage).submissionTime match {
-        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
         case _ => "Unknown"
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-      stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).completionTime = Some(clock.getTime())
       listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       runningStages -= stage
     }
@@ -1015,7 +1016,7 @@
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(clock.getTime())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",

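Threading a Clock through the constructor (defaulting to SystemClock) makes the stage timestamps controllable in tests. A rough sketch of the idea, assuming the Clock trait is visible to the test code and declares getTime(): Long as used above; the ManualClock class here is illustrative, not part of this patch:

import org.apache.spark.util.Clock

// Illustrative fake clock: time advances only when the test says so, which
// makes submissionTime/completionTime deterministic in scheduler tests.
class ManualClock(private var now: Long = 0L) extends Clock {
  def getTime(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

val manualClock = new ManualClock()
manualClock.advance(1500L)
assert(manualClock.getTime() == 1500L)
// A DAGScheduler constructed with `clock = manualClock` would then report a
// 1.500 s duration between matching submit and finish calls.
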
core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 12 additions & 4 deletions
@@ -61,6 +61,14 @@ private[spark] class FileLogger(
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
+  // The Hadoop APIs have changed over time, so we use reflection to figure out
+  // the correct method to use to flush a hadoop data stream. See SPARK-1518
+  // for details.
+  private val hadoopFlushMethod = {
+    val cls = classOf[FSDataOutputStream]
+    scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
+  }
+
   private var writer: Option[PrintWriter] = None
 
   /**
@@ -149,13 +157,13 @@
   /**
    * Flush the writer to disk manually.
    *
-   * If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be
-   * sync()'ed manually as it does not support flush(), which is invoked by when higher
-   * level streams are flushed.
+   * When using a Hadoop filesystem, we need to invoke the hflush or sync
+   * method. In HDFS, hflush guarantees that the data gets to all the
+   * DataNodes.
    */
   def flush() {
     writer.foreach(_.flush())
-    hadoopDataStream.foreach(_.sync())
+    hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
   }
 
   /**

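The reflection trick above (look up hflush, fall back to sync, invoke whichever was found) is a general pattern for coping with APIs that renamed a method between versions. A self-contained sketch of the same pattern against a plain java.io stream, purely for illustration:

import java.io.ByteArrayOutputStream
import scala.util.Try

// "hflush" does not exist on java.io streams, so the lookup falls back to
// "flush" -- mirroring how FileLogger falls back from hflush to sync on
// older Hadoop versions.
val cls = classOf[ByteArrayOutputStream]
val flushMethod = Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("flush"))

val out = new ByteArrayOutputStream()
out.write("spark".getBytes("UTF-8"))
flushMethod.invoke(out)          // reflective call, like hadoopFlushMethod.invoke(_)
println(out.toString("UTF-8"))   // prints: spark
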
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@
      * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
      * Assume the given iterator is in sorted order.
      */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
          kc = it.next()
          kcPairs += kc
        }
@@ -325,7 +326,8 @@
      *
      * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
      */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0

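The fix above hinges on BufferedIterator.head: the merge now peeks at the next pair's hash instead of checking the hash of a pair it has already consumed, so it never drags the first element of the next run into the current one. A small self-contained sketch of that pattern with illustrative string data ("Aa" and "BB" share a hash code):

import scala.collection.mutable.ArrayBuffer

// Collect the run of consecutive elements sharing the first element's hash
// code, peeking with `head` so the start of the next run stays unconsumed.
def takeSameHashRun(it: BufferedIterator[String]): ArrayBuffer[String] = {
  val run = new ArrayBuffer[String]
  if (it.hasNext) {
    run += it.next()
    val minHash = run.head.hashCode
    while (it.hasNext && it.head.hashCode == minHash) {
      run += it.next()
    }
  }
  run
}

val it = Iterator("Aa", "BB", "Aa", "x").buffered
println(takeSameHashRun(it))   // ArrayBuffer(Aa, BB, Aa)
println(it.head)               // x -- still available for the next run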