
Commit 8e105e1

Merge branch 'master' of github.com:apache/spark into standalone-cluster
Conflicts: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
2 parents: b890949 + a45d548

File tree

90 files changed, +1795 −755 lines


assembly/pom.xml

Lines changed: 2 additions & 1 deletion
@@ -39,6 +39,7 @@
     <deb.pkg.name>spark</deb.pkg.name>
     <deb.install.path>/usr/share/spark</deb.install.path>
     <deb.user>root</deb.user>
+    <deb.bin.filemode>744</deb.bin.filemode>
   </properties>

   <dependencies>
@@ -276,7 +277,7 @@
               <user>${deb.user}</user>
               <group>${deb.user}</group>
               <prefix>${deb.install.path}/bin</prefix>
-              <filemode>744</filemode>
+              <filemode>${deb.bin.filemode}</filemode>
             </mapper>
           </data>
           <data>
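Note: the hard-coded 744 mode for the packaged bin/ scripts is now exposed as the deb.bin.filemode property, so it can be overridden at build time without editing the POM. As a hypothetical example (relying only on standard Maven behavior, where -D user properties override project properties), passing -Ddeb.bin.filemode=755 on the mvn command line would make the Debian-packaged scripts executable by all users.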

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 5 additions & 0 deletions
@@ -72,6 +72,7 @@ object Bagel extends Logging {
     var verts = vertices
     var msgs = messages
     var noActivity = false
+    var lastRDD: RDD[(K, (V, Array[M]))] = null
     do {
       logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
       val superstep_ = superstep  // Create a read-only copy of superstep for capture in closure
       val (processed, numMsgs, numActiveVerts) =
         comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+      if (lastRDD != null) {
+        lastRDD.unpersist(false)
+      }
+      lastRDD = processed

       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 0 deletions
@@ -267,6 +267,9 @@ object SparkSubmit {
       sysProps.getOrElseUpdate(k, v)
     }

+    // Spark properties included on command line take precedence
+    sysProps ++= args.sparkProperties
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }

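Defaults loaded from the properties file are merged with getOrElseUpdate, which only fills in keys that are not already set, while properties supplied with --conf are copied in afterwards with ++= and therefore overwrite anything already present. A small standalone sketch of that precedence (the map contents are made up for illustration):

import scala.collection.mutable.HashMap

val sysProps = HashMap("spark.master" -> "local[2]")                  // already set explicitly
val fileDefaults = Map("spark.master" -> "spark://host:7077",         // e.g. from spark-defaults.conf
                       "spark.app.name" -> "from-defaults")
val commandLineConf = Map("spark.app.name" -> "from-command-line")    // e.g. from --conf flags

for ((k, v) <- fileDefaults) sysProps.getOrElseUpdate(k, v)  // keeps "local[2]", adds app.name
sysProps ++= commandLineConf                                 // --conf wins: app.name overwritten

println(sysProps("spark.master"))    // local[2]
println(sysProps("spark.app.name"))  // from-command-line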

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 11 additions & 0 deletions
@@ -55,6 +55,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
+  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

   parseOpts(args.toList)
   loadDefaults()
@@ -177,6 +178,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
    |  executorCores           $executorCores
    |  totalExecutorCores      $totalExecutorCores
    |  propertiesFile          $propertiesFile
+   |  extraSparkProperties    $sparkProperties
    |  driverMemory            $driverMemory
    |  driverCores             $driverCores
    |  driverExtraClassPath    $driverExtraClassPath
@@ -290,6 +292,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         jars = Utils.resolveURIs(value)
         parse(tail)

+      case ("--conf" | "-c") :: value :: tail =>
+        value.split("=", 2).toSeq match {
+          case Seq(k, v) => sparkProperties(k) = v
+          case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
+        }
+        parse(tail)
+
       case ("--help" | "-h") :: tail =>
         printUsageAndExit(0)

@@ -349,6 +358,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
        |                              on the PYTHONPATH for Python apps.
        |  --files FILES               Comma-separated list of files to be placed in the working
        |                              directory of each executor.
+       |
+       |  --conf PROP=VALUE           Arbitrary Spark configuration property.
        |  --properties-file FILE      Path to a file from which to load extra properties. If not
        |                              specified, this will look for conf/spark-defaults.conf.
        |
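The new --conf PROP=VALUE option splits on the first '=' only, so values that themselves contain '=' (for example JVM options) are preserved, and an argument without any '=' is rejected. A self-contained sketch of that parsing rule; parseConf is a hypothetical helper, not the actual SparkSubmitArguments code:

def parseConf(value: String): Either[String, (String, String)] =
  value.split("=", 2) match {
    case Array(k, v) => Right(k -> v)
    case _           => Left(s"Spark config without '=': $value")
  }

println(parseConf("spark.executor.extraJavaOptions=-Dfoo=bar"))
// Right((spark.executor.extraJavaOptions,-Dfoo=bar)) -- the second '=' stays in the value
println(parseConf("spark.eventLog.enabled"))
// Left(Spark config without '=': spark.eventLog.enabled)

In practice this means invocations such as spark-submit --conf spark.eventLog.enabled=true --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails" work as expected.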

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 1 deletion
@@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val ui: SparkUI = if (renderUI) {
         val conf = this.conf.clone()
         val appSecManager = new SecurityManager(conf)
-        new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+        new SparkUI(conf, appSecManager, replayBus, appId,
+          HistoryServer.UI_PATH_PREFIX + s"/$appId")
         // Do not call ui.bind() to avoid creating a new server for each application
       } else {
         null

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     "Last Updated")

   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val uiAddress = "/history/" + info.id
+    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
     val startTime = UIUtils.formatDate(info.startTime)
     val endTime = UIUtils.formatDate(info.endTime)
     val duration = UIUtils.formatDuration(info.endTime - info.startTime)

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 3 additions & 1 deletion
@@ -114,7 +114,7 @@ class HistoryServer(
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

     val contextHandler = new ServletContextHandler
-    contextHandler.setContextPath("/history")
+    contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
     contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
     attachHandler(contextHandler)
   }
@@ -172,6 +172,8 @@ class HistoryServer(
 object HistoryServer extends Logging {
   private val conf = new SparkConf

+  val UI_PATH_PREFIX = "/history"
+
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     initSecurity()
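UI_PATH_PREFIX gives the "/history" mount point a single definition; the other files touched by this commit (FsHistoryProvider, HistoryPage, Master) now build application UI addresses from it instead of repeating the literal. A rough sketch of the shared-constant pattern, using hypothetical helper names rather than Spark's actual API:

import java.net.URLEncoder

object HistoryPaths {                         // hypothetical stand-in for the HistoryServer object
  val UI_PATH_PREFIX = "/history"

  def appPage(appId: String): String = UI_PATH_PREFIX + s"/$appId"

  def notFound(msg: String): String =
    UI_PATH_PREFIX + "/not-found?msg=" + URLEncoder.encode(msg, "UTF-8")
}

println(HistoryPaths.appPage("app-20140722120000-0001"))  // /history/app-20140722120000-0001
println(HistoryPaths.notFound("No event logs found"))      // /history/not-found?msg=No+event+logs+found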

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 7 additions & 4 deletions
@@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -664,9 +665,10 @@ private[spark] class Master(
    */
   def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
+    val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     val eventLogDir = app.desc.eventLogDir.getOrElse {
       // Event logging is not enabled for this application
-      app.desc.appUiUrl = "/history/not-found"
+      app.desc.appUiUrl = notFoundBasePath
       return false
     }
     val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
@@ -681,13 +683,14 @@ private[spark] class Master(
       logWarning(msg)
       msg += " Did you specify the correct logging directory?"
       msg = URLEncoder.encode(msg, "UTF-8")
-      app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title"
+      app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
       return false
     }

     try {
       val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
-      val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
+      val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
+        HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       replayBus.replay()
       appIdToUI(app.id) = ui
       webUi.attachSparkUI(ui)
@@ -702,7 +705,7 @@ private[spark] class Master(
       var msg = s"Exception in replaying log for application $appName!"
       logError(msg, e)
       msg = URLEncoder.encode(msg, "UTF-8")
-      app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title"
+      app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
       false
     }
   }

core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,20 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
3939
*
4040
* @param prev RDD to be sampled
4141
* @param sampler a random sampler
42+
* @param preservesPartitioning whether the sampler preserves the partitioner of the parent RDD
4243
* @param seed random seed
4344
* @tparam T input RDD item type
4445
* @tparam U sampled RDD item type
4546
*/
4647
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
4748
prev: RDD[T],
4849
sampler: RandomSampler[T, U],
50+
@transient preservesPartitioning: Boolean,
4951
@transient seed: Long = Utils.random.nextLong)
5052
extends RDD[U](prev) {
5153

54+
@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
55+
5256
override def getPartitions: Array[Partition] = {
5357
val random = new Random(seed)
5458
firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
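Because sampling happens independently inside each partition and never moves records, a sampled pair RDD keeps the parent's data layout; the new preservesPartitioning flag lets the resulting RDD report that partitioner so downstream key-based operations can skip a shuffle. A small sketch of the visible effect through the public sample API (Spark 1.x-style pair-RDD implicits are assumed):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x style)

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sample-partitioner"))
val pairs = sc.parallelize(1 to 10000)
  .map(i => (i % 100, i))
  .partitionBy(new HashPartitioner(8))

// Per-partition sampling leaves every surviving record in its original partition,
// so the parent's HashPartitioner is still valid for the sampled RDD.
val sampled = pairs.sample(withReplacement = false, fraction = 0.1)

println(pairs.partitioner)    // Some(HashPartitioner)
println(sampled.partitioner)  // Some(HashPartitioner) after this change; previously None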

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 13 additions & 4 deletions
@@ -356,9 +356,9 @@ abstract class RDD[T: ClassTag](
       seed: Long = Utils.random.nextLong): RDD[T] = {
     require(fraction >= 0.0, "Invalid fraction value: " + fraction)
     if (withReplacement) {
-      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
+      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
     } else {
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), seed)
+      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
     }
   }

@@ -374,7 +374,7 @@ abstract class RDD[T: ClassTag](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), seed)
+      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
     }.toArray
   }

@@ -586,6 +586,9 @@ abstract class RDD[T: ClassTag](

   /**
    * Return a new RDD by applying a function to each partition of this RDD.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -596,6 +599,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -607,6 +613,9 @@ abstract class RDD[T: ClassTag](
    * :: DeveloperApi ::
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
    * mapPartitions that also passes the TaskContext into the closure.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   @DeveloperApi
   def mapPartitionsWithContext[U: ClassTag](
@@ -689,7 +698,7 @@ abstract class RDD[T: ClassTag](
    * a map on the other).
    */
   def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
-    zipPartitions(other, true) { (thisIter, otherIter) =>
+    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
       new Iterator[(T, U)] {
         def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
           case (true, true) => true