
Commit f235227

Narine Kokhlikyan committed: merge with master
2 parents (8db1d08 + 579fbcf), commit f235227

441 files changed: +6247 / -3464 lines


.gitignore

Lines changed: 4 additions & 0 deletions

@@ -17,6 +17,7 @@
 .idea/
 .idea_modules/
 .project
+.pydevproject
 .scala_dependencies
 .settings
 /lib/

@@ -78,3 +79,6 @@ spark-warehouse/
 .RData
 .RHistory
 .Rhistory
+*.Rproj
+*.Rproj.*
+

R/pkg/R/DataFrame.R

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ setOldClass("structType")
 #' @slot env An R environment that stores bookkeeping states of the SparkDataFrame
 #' @slot sdf A Java object reference to the backing Scala DataFrame
 #' @seealso \link{createDataFrame}, \link{read.json}, \link{table}
-#' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkdataframe}
+#' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes}
 #' @export
 #' @examples
 #'\dontrun{

R/pkg/R/sparkR.R

Lines changed: 8 additions & 8 deletions

@@ -28,14 +28,6 @@ connExists <- function(env) {
   })
 }

-#' @rdname sparkR.session.stop
-#' @name sparkR.stop
-#' @export
-#' @note sparkR.stop since 1.4.0
-sparkR.stop <- function() {
-  sparkR.session.stop()
-}
-
 #' Stop the Spark Session and Spark Context
 #'
 #' Stop the Spark Session and Spark Context.

@@ -90,6 +82,14 @@ sparkR.session.stop <- function() {
   clearJobjs()
 }

+#' @rdname sparkR.session.stop
+#' @name sparkR.stop
+#' @export
+#' @note sparkR.stop since 1.4.0
+sparkR.stop <- function() {
+  sparkR.session.stop()
+}
+
 #' (Deprecated) Initialize a new Spark Context
 #'
 #' This function initializes a new SparkContext.

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 7 additions & 3 deletions

@@ -88,6 +88,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {

   /** Array of file writers, one for each partition */
   private DiskBlockObjectWriter[] partitionWriters;
+  private FileSegment[] partitionWriterSegments;
   @Nullable private MapStatus mapStatus;
   private long[] partitionLengths;

@@ -131,6 +132,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     final SerializerInstance serInstance = serializer.newInstance();
     final long openStartTime = System.nanoTime();
     partitionWriters = new DiskBlockObjectWriter[numPartitions];
+    partitionWriterSegments = new FileSegment[numPartitions];
     for (int i = 0; i < numPartitions; i++) {
       final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
         blockManager.diskBlockManager().createTempShuffleBlock();

@@ -150,8 +152,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       partitionWriters[partitioner.getPartition(key)].write(key, record._2());
     }

-    for (DiskBlockObjectWriter writer : partitionWriters) {
-      writer.commitAndClose();
+    for (int i = 0; i < numPartitions; i++) {
+      final DiskBlockObjectWriter writer = partitionWriters[i];
+      partitionWriterSegments[i] = writer.commitAndGet();
+      writer.close();
     }

     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);

@@ -184,7 +188,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
-        final File file = partitionWriters[i].fileSegment().file();
+        final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
           final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
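
The common thread in this and the next two files is the retirement of DiskBlockObjectWriter.commitAndClose() in favour of commitAndGet(), which returns the FileSegment just committed, followed by an explicit close(). A minimal Scala sketch of the new calling convention, not standalone code: blockManager, blockId, file, serInstance, fileBufferSizeBytes, writeMetrics and records are assumed to be in scope just as in the surrounding class.

  // Sketch only: the calls used here (getDiskWriter, write, commitAndGet, close,
  // FileSegment.file/length) are the ones this commit exercises.
  val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSizeBytes, writeMetrics)
  records.foreach { case (key, value) => writer.write(key, value) }
  val segment: FileSegment = writer.commitAndGet()  // flush and return the committed segment
  writer.close()                                    // closing is now a separate, explicit step
  val partitionFile = segment.file()                // segment metadata stays usable after close()

Storing the returned segments (partitionWriterSegments above) is what lets writePartitionedFile() find each partition's file and length after its writer has already been closed.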

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 13 additions & 18 deletions

@@ -37,6 +37,7 @@
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.DiskBlockObjectWriter;
+import org.apache.spark.storage.FileSegment;
 import org.apache.spark.storage.TempShuffleBlockId;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;

@@ -150,10 +151,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
     final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
       inMemSorter.getSortedIterator();

-    // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
-    // after SPARK-5581 is fixed.
-    DiskBlockObjectWriter writer;
-
     // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
     // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
     // data through a byte array. This array does not need to be large enough to hold a single

@@ -175,7 +172,8 @@
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = DummySerializerInstance.INSTANCE;

-    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+    final DiskBlockObjectWriter writer =
+      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);

     int currentPartition = -1;
     while (sortedRecords.hasNext()) {

@@ -185,12 +183,10 @@
       if (partition != currentPartition) {
         // Switch to the new partition
         if (currentPartition != -1) {
-          writer.commitAndClose();
-          spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+          final FileSegment fileSegment = writer.commitAndGet();
+          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
         }
         currentPartition = partition;
-        writer =
-          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
       }

       final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();

@@ -209,15 +205,14 @@
       writer.recordWritten();
     }

-    if (writer != null) {
-      writer.commitAndClose();
-      // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
-      // then the file might be empty. Note that it might be better to avoid calling
-      // writeSortedFile() in that case.
-      if (currentPartition != -1) {
-        spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
-        spills.add(spillInfo);
-      }
+    final FileSegment committedSegment = writer.commitAndGet();
+    writer.close();
+    // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
+    // then the file might be empty. Note that it might be better to avoid calling
+    // writeSortedFile() in that case.
+    if (currentPartition != -1) {
+      spillInfo.partitionLengths[currentPartition] = committedSegment.length();
+      spills.add(spillInfo);
     }

     if (!isLastFile) { // i.e. this is a spill file
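
Because commitAndGet() leaves the writer open, writeSortedFile() can now reuse one DiskBlockObjectWriter for the whole spill file and commit at each partition boundary, instead of opening a fresh writer per partition as the removed SPARK-5581 comment described. A heavily simplified Scala sketch of that loop; sortedRecords and partitionLengths stand in for the real record-pointer iteration and spillInfo bookkeeping.

  // Simplified: one writer per spill file, one commit per partition boundary.
  val writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetrics)
  var currentPartition = -1
  for ((partition, key, value) <- sortedRecords) {
    if (partition != currentPartition) {
      if (currentPartition != -1) {
        partitionLengths(currentPartition) = writer.commitAndGet().length()  // seal previous partition
      }
      currentPartition = partition
    }
    writer.write(key, value)
  }
  val lastSegment = writer.commitAndGet()  // always commit; the file may legitimately be empty
  writer.close()
  if (currentPartition != -1) {
    partitionLengths(currentPartition) = lastSegment.length()
  }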

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java

Lines changed: 2 additions & 1 deletion

@@ -136,7 +136,8 @@ public void write(
   }

   public void close() throws IOException {
-    writer.commitAndClose();
+    writer.commitAndGet();
+    writer.close();
     writer = null;
     writeBuffer = null;
   }

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 0 deletions

@@ -46,6 +46,8 @@ private[spark] class MapOutputTrackerMasterEndpoint(
     override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends RpcEndpoint with Logging {

+  logDebug("init") // force eager creation of logger
+
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = context.senderAddress.hostPort
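
The added logDebug("init") line (and its twin in SparkContext below) is there purely for its side effect: Spark's Logging trait creates its SLF4J logger lazily, so touching it once during construction forces initialization up front rather than inside an RPC handler or a shutdown hook. A simplified approximation of that lazy pattern, not Spark's exact code:

  import org.slf4j.{Logger, LoggerFactory}

  trait Logging {
    // The logger is only created on first access ...
    @transient private var log_ : Logger = null

    protected def log: Logger = {
      if (log_ == null) log_ = LoggerFactory.getLogger(this.getClass.getName)
      log_
    }

    // ... so even a throwaway logDebug("init") in a constructor materializes it eagerly.
    protected def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg)
  }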

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 9 additions & 4 deletions

@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
  *
  * @param loadDefaults whether to also load values from Java system properties
  */
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

   import SparkConf._

@@ -370,6 +370,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
   }

+  /** Get all parameters that start with `prefix` */
+  def getAllWithPrefix(prefix: String): Array[(String, String)] = {
+    getAll.filter { case (k, v) => k.startsWith(prefix) }
+      .map { case (k, v) => (k.substring(prefix.length), v) }
+  }
+
+
   /** Get a parameter as an integer, falling back to a default if not set */
   def getInt(key: String, defaultValue: Int): Int = {
     getOption(key).map(_.toInt).getOrElse(defaultValue)

@@ -392,9 +399,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
-    val prefix = "spark.executorEnv."
-    getAll.filter{case (k, v) => k.startsWith(prefix)}
-      .map{case (k, v) => (k.substring(prefix.length), v)}
+    getAllWithPrefix("spark.executorEnv.")
   }

   /**
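
The new getAllWithPrefix factors out what getExecutorEnv previously did inline: filter keys by a prefix and strip it from the result. A small usage sketch; the keys and values are invented for illustration.

  import org.apache.spark.SparkConf

  val conf = new SparkConf(loadDefaults = false)
    .set("spark.executorEnv.JAVA_HOME", "/opt/jdk")   // hypothetical entries
    .set("spark.executorEnv.PYTHONPATH", "/opt/py")

  // The prefix is stripped from the returned keys:
  // Array(("JAVA_HOME", "/opt/jdk"), ("PYTHONPATH", "/opt/py"))
  val executorEnv: Array[(String, String)] = conf.getAllWithPrefix("spark.executorEnv.")
  assert(executorEnv.toMap == conf.getExecutorEnv.toMap)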

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 2 deletions

@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def isStopped: Boolean = stopped.get()

   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus
+  private[spark] val listenerBus = new LiveListenerBus(this)

   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(

@@ -556,6 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
+    logDebug("Adding shutdown hook") // force eager creation of logger
     _shutdownHookRef = ShutdownHookManager.addShutdownHook(
       ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")

@@ -2147,7 +2148,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }

-    listenerBus.start(this)
+    listenerBus.start()
     _listenerBusStarted = true
   }

core/src/main/scala/org/apache/spark/TaskState.scala

Lines changed: 3 additions & 5 deletions

@@ -41,13 +41,11 @@ private[spark] object TaskState extends Enumeration {
   }

   def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING => LAUNCHING
-    case MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING => RUNNING
+    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
+    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
     case MesosTaskState.TASK_FINISHED => FINISHED
     case MesosTaskState.TASK_FAILED => FAILED
     case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST => LOST
-    case MesosTaskState.TASK_ERROR => LOST
+    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
   }
 }
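
Besides folding duplicate branches into alternative patterns, the new match handles MesosTaskState.TASK_KILLING, which the old one did not. A hedged sketch of the resulting behaviour, assuming MesosTaskState is the usual alias for org.apache.mesos.Protos.TaskState and that the code lives in the org.apache.spark package (TaskState is private[spark], e.g. in a test):

  import org.apache.mesos.Protos.{TaskState => MesosTaskState}

  // TASK_KILLING now maps to RUNNING instead of falling through the match;
  // the collapsed branches behave exactly as before.
  assert(TaskState.fromMesos(MesosTaskState.TASK_KILLING) == TaskState.RUNNING)
  assert(TaskState.fromMesos(MesosTaskState.TASK_STARTING) == TaskState.LAUNCHING)
  assert(TaskState.fromMesos(MesosTaskState.TASK_ERROR) == TaskState.LOST)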
