Commit d9c66d7

Merge branch 'apache-master' into adaptive-2

2 parents: a18be55 + 5bb62b8

170 files changed: 1785 additions, 944 deletions


core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 13 additions & 0 deletions
@@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private long pageCursor = -1;
   private long peakMemoryUsedBytes = 0;
   private long totalSpillBytes = 0L;
+  private long totalSortTimeNanos = 0L;
   private volatile SpillableIterator readingIterator = null;
 
   public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -247,6 +248,17 @@ public long getPeakMemoryUsedBytes() {
     return peakMemoryUsedBytes;
   }
 
+  /**
+   * @return the total amount of time spent sorting data (in-memory only).
+   */
+  public long getSortTimeNanos() {
+    UnsafeInMemorySorter sorter = inMemSorter;
+    if (sorter != null) {
+      return sorter.getSortTimeNanos();
+    }
+    return totalSortTimeNanos;
+  }
+
   /**
    * Return the total number of bytes that has been spilled into disk so far.
    */
@@ -505,6 +517,7 @@ public long spill() throws IOException {
       // in-memory sorter will not be used after spilling
       assert(inMemSorter != null);
       released += inMemSorter.getMemoryUsage();
+      totalSortTimeNanos += inMemSorter.getSortTimeNanos();
       inMemSorter.free();
       inMemSorter = null;
       taskContext.taskMetrics().incMemoryBytesSpilled(released);
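In short, the external sorter reads the live in-memory sorter's timer when one exists and otherwise falls back to the total folded in at spill time. A minimal Scala sketch of that roll-up pattern (class and member names here are hypothetical, not from the commit; the local read of the field mirrors the original's snapshot of a member that may be nulled concurrently):

// Hypothetical sketch of the metric roll-up pattern (names not from the commit).
class InMemSorter {
  var sortTimeNanos = 0L // accrued by the sort itself (see the next file's diff)
}

class ExternalSorter {
  private var inMem: InMemSorter = new InMemSorter
  private var totalSortTimeNanos = 0L

  // Prefer the live sorter's counter; after a spill discards it, use the total.
  def getSortTimeNanos: Long = {
    val s = inMem
    if (s != null) s.sortTimeNanos else totalSortTimeNanos
  }

  // On spill, fold the child's counter into the running total before freeing it.
  def spill(): Unit = {
    if (inMem != null) {
      totalSortTimeNanos += inMem.sortTimeNanos
      inMem = null
    }
  }
}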

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 11 additions & 0 deletions
@@ -97,6 +97,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
 
   private long initialSize;
 
+  private long totalSortTimeNanos = 0L;
+
   public UnsafeInMemorySorter(
       final MemoryConsumer consumer,
       final TaskMemoryManager memoryManager,
@@ -160,6 +162,13 @@ public int numRecords() {
     return pos / 2;
   }
 
+  /**
+   * @return the total amount of time spent sorting data (in-memory only).
+   */
+  public long getSortTimeNanos() {
+    return totalSortTimeNanos;
+  }
+
   public long getMemoryUsage() {
     return array.size() * 8;
   }
@@ -265,6 +274,7 @@ public void loadNext() {
    */
   public SortedIterator getSortedIterator() {
     int offset = 0;
+    long start = System.nanoTime();
     if (sorter != null) {
       if (this.radixSortSupport != null) {
         // TODO(ekl) we should handle NULL values before radix sort for efficiency, since they
@@ -275,6 +285,7 @@ public SortedIterator getSortedIterator() {
         sorter.sort(array, 0, pos / 2, sortComparator);
       }
     }
+    totalSortTimeNanos += System.nanoTime() - start;
     return new SortedIterator(pos / 2, offset);
   }
 }
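The measurement itself is a plain System.nanoTime() bracket around the sort work, accumulated across invocations. A self-contained Scala sketch of the same pattern (hypothetical class, not from the commit):

// Hypothetical sketch of the elapsed-time accumulation in getSortedIterator.
class TimedSorter {
  private var totalSortTimeNanos = 0L

  def sortTimeNanos: Long = totalSortTimeNanos

  def sorted(data: Array[Int]): Array[Int] = {
    val start = System.nanoTime() // bracket only the sorting work
    val out = data.sorted
    totalSortTimeNanos += System.nanoTime() - start // total grows across calls
    out
  }
}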

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 2 additions & 2 deletions
@@ -955,9 +955,9 @@ private[spark] object SparkSubmitUtils {
     // Add scala exclusion rule
     md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
 
-    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka-0-8 and
     // other spark-streaming utility components. Underscore is there to differentiate between
-    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x
     val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
       "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 9 additions & 1 deletion
@@ -291,12 +291,20 @@ private[spark] object TaskMetrics extends Logging {
 
 private[spark] class BlockStatusesAccumulator
   extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
-  private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
+  private var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
 
   override def isZero(): Boolean = _seq.isEmpty
 
   override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
 
+  override def copy(): BlockStatusesAccumulator = {
+    val newAcc = new BlockStatusesAccumulator
+    newAcc._seq = _seq.clone()
+    newAcc
+  }
+
+  override def reset(): Unit = _seq.clear()
+
   override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
 
   override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
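Two details worth noting here: _seq is relaxed from private[this] to private, apparently because copy() assigns the field on another instance, which private[this] forbids; and copy() clones the buffer so that later add() calls on the original cannot leak into the copy. A simplified Scala sketch of both points (hypothetical class, plain String elements):

import scala.collection.mutable.ArrayBuffer

// Simplified sketch (hypothetical class). `private`, not `private[this]`:
// copy() must assign the field on a different instance of the same class.
class SeqAcc {
  private var _seq = ArrayBuffer.empty[String]

  def add(v: String): Unit = _seq += v
  def value: Seq[String] = _seq

  def copy(): SeqAcc = {
    val newAcc = new SeqAcc
    newAcc._seq = _seq.clone() // independent buffer: later add()s on the
    newAcc                     // original do not appear in the copy
  }
}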

core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala

Lines changed: 5 additions & 6 deletions
@@ -116,21 +116,20 @@ private[memory] class StorageMemoryPool(
   }
 
   /**
-   * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
-   * of bytes removed from the pool's capacity.
+   * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
+   * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
+   *
+   * @return number of bytes to be removed from the pool's capacity.
    */
-  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
-    // First, shrink the pool by reclaiming free memory:
+  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
     val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
-    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
       val spaceFreedByEviction =
         memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
       // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
-      decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
     } else {
       spaceFreedByReleasingUnusedMemory

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 3 additions & 2 deletions
@@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
         storagePool.poolSize - storageRegionSize)
       if (memoryReclaimableFromStorage > 0) {
         // Only reclaim as much space as is necessary and available:
-        val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
           math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-        executionPool.incrementPoolSize(spaceReclaimed)
+        storagePool.decrementPoolSize(spaceToReclaim)
+        executionPool.incrementPoolSize(spaceToReclaim)
       }
     }
   }
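Together with the StorageMemoryPool change above, this is a compute-then-commit refactor: the pool computes how much space can be freed without touching its own size, and the single caller applies the matching decrement and increment side by side. One plausible motivation, consistent with the makeBadMemoryStore test helper added further down, is that a failure inside eviction no longer leaves the storage pool shrunk without the execution pool having grown. A minimal Scala sketch of the shape, with hypothetical pool classes:

// Sketch of the compute-then-commit shape (hypothetical pools, eviction elided).
class Pool(var poolSize: Long, var memoryUsed: Long) {
  def memoryFree: Long = poolSize - memoryUsed

  // Reports how much can be freed; deliberately does NOT shrink poolSize.
  def freeSpaceToShrinkPool(spaceToFree: Long): Long =
    math.min(spaceToFree, memoryFree)

  def decrementPoolSize(n: Long): Unit = poolSize -= n
  def incrementPoolSize(n: Long): Unit = poolSize += n
}

// The caller commits both sides of the transfer in one place.
def borrow(storage: Pool, execution: Pool, needed: Long): Unit = {
  val delta = storage.freeSpaceToShrinkPool(needed)
  storage.decrementPoolSize(delta)   // commit: shrink storage...
  execution.incrementPoolSize(delta) // ...and grow execution by the same amount
}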

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 8 additions & 1 deletion
@@ -289,7 +289,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
         listenerBus.post(
           SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
-      case None => logInfo(s"Asked to remove non-existent executor $executorId")
+      case None =>
+        // SPARK-15262: If an executor is still alive even after the scheduler has removed
+        // its metadata, we may receive a heartbeat from that executor and tell its block
+        // manager to reregister itself. If that happens, the block manager master will know
+        // about the executor, but the scheduler will not. Therefore, we should remove the
+        // executor from the block manager when we hit this case.
+        scheduler.sc.env.blockManager.master.removeExecutor(executorId)
+        logInfo(s"Asked to remove non-existent executor $executorId")
     }
   }

core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala

Lines changed: 61 additions & 14 deletions
@@ -112,7 +112,22 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
    * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy
    * must return true.
    */
-  def copyAndReset(): AccumulatorV2[IN, OUT]
+  def copyAndReset(): AccumulatorV2[IN, OUT] = {
+    val copyAcc = copy()
+    copyAcc.reset()
+    copyAcc
+  }
+
+  /**
+   * Creates a new copy of this accumulator.
+   */
+  def copy(): AccumulatorV2[IN, OUT]
+
+  /**
+   * Resets this accumulator, which is zero value. i.e. call `isZero` must
+   * return true.
+   */
+  def reset(): Unit
 
   /**
    * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
@@ -137,10 +152,10 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
       throw new UnsupportedOperationException(
         "Accumulator must be registered before send to executor")
     }
-    val copy = copyAndReset()
-    assert(copy.isZero, "copyAndReset must return a zero value copy")
-    copy.metadata = metadata
-    copy
+    val copyAcc = copyAndReset()
+    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
+    copyAcc.metadata = metadata
+    copyAcc
   } else {
     this
   }
@@ -249,16 +264,26 @@ private[spark] object AccumulatorContext {
  * @since 2.0.0
  */
 class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
-  private[this] var _sum = 0L
-  private[this] var _count = 0L
+  private var _sum = 0L
+  private var _count = 0L
 
   /**
    * Adds v to the accumulator, i.e. increment sum by v and count by 1.
    * @since 2.0.0
    */
   override def isZero: Boolean = _sum == 0L && _count == 0
 
-  override def copyAndReset(): LongAccumulator = new LongAccumulator
+  override def copy(): LongAccumulator = {
+    val newAcc = new LongAccumulator
+    newAcc._count = this._count
+    newAcc._sum = this._sum
+    newAcc
+  }
+
+  override def reset(): Unit = {
+    _sum = 0L
+    _count = 0L
+  }
 
   /**
    * Adds v to the accumulator, i.e. increment sum by v and count by 1.
@@ -318,12 +343,22 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
  * @since 2.0.0
  */
 class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
-  private[this] var _sum = 0.0
-  private[this] var _count = 0L
+  private var _sum = 0.0
+  private var _count = 0L
 
   override def isZero: Boolean = _sum == 0.0 && _count == 0
 
-  override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
+  override def copy(): DoubleAccumulator = {
+    val newAcc = new DoubleAccumulator
+    newAcc._count = this._count
+    newAcc._sum = this._sum
+    newAcc
+  }
+
+  override def reset(): Unit = {
+    _sum = 0.0
+    _count = 0L
+  }
 
   /**
    * Adds v to the accumulator, i.e. increment sum by v and count by 1.
@@ -377,12 +412,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
 
 
 class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
+  private val _list: java.util.List[T] = new java.util.ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
   override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
 
+  override def copy(): ListAccumulator[T] = {
+    val newAcc = new ListAccumulator[T]
+    newAcc._list.addAll(_list)
+    newAcc
+  }
+
+  override def reset(): Unit = _list.clear()
+
   override def add(v: T): Unit = _list.add(v)
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
@@ -407,12 +450,16 @@ class LegacyAccumulatorWrapper[R, T](
 
   override def isZero: Boolean = _value == param.zero(initialValue)
 
-  override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
+  override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
-    acc._value = param.zero(initialValue)
+    acc._value = _value
    acc
   }
 
+  override def reset(): Unit = {
+    _value = param.zero(initialValue)
+  }
+
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
 
   override def merge(other: AccumulatorV2[T, R]): Unit = other match {
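Because copyAndReset() is now derived from copy() plus reset(), a user-defined accumulator only has to implement the primitives. A hypothetical minimal subclass written against the API shape shown above (this class is not part of the commit):

// Hypothetical custom accumulator against the new copy()/reset() contract.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue

  override def isZero: Boolean = _max == Long.MinValue

  override def copy(): MaxAccumulator = {
    val newAcc = new MaxAccumulator
    newAcc._max = _max // copy() preserves state, unlike the old copyAndReset()
    newAcc
  }

  override def reset(): Unit = _max = Long.MinValue

  override def add(v: Long): Unit = _max = math.max(_max, v)

  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    _max = math.max(_max, other.value)

  override def value: Long = _max
  // copyAndReset() is inherited: copy(), then reset() on the copy.
}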

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 20 additions & 0 deletions
@@ -49,6 +49,7 @@
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;
 
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.*;
 import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -225,6 +226,25 @@ public void testSortingEmptyArrays() throws Exception {
     assertSpillFilesWereCleanedUp();
   }
 
+  @Test
+  public void testSortTimeMetric() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long prevSortTime = sorter.getSortTimeNanos();
+    assertEquals(prevSortTime, 0);
+
+    sorter.insertRecord(null, 0, 0, 0);
+    sorter.spill();
+    assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+    prevSortTime = sorter.getSortTimeNanos();
+
+    sorter.spill(); // no sort needed
+    assertEquals(sorter.getSortTimeNanos(), prevSortTime);
+
+    sorter.insertRecord(null, 0, 0, 0);
+    UnsafeSorterIterator iter = sorter.getSortedIterator();
+    assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+  }
+
   @Test
   public void spillingOccursInResponseToMemoryPressure() throws Exception {
     final UnsafeExternalSorter sorter = newSorter();

core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -78,6 +78,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
     ms
   }
 
+  /**
+   * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
+   * stubbed to always throw [[RuntimeException]].
+   */
+  protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
+    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
+      override def answer(invocation: InvocationOnMock): Long = {
+        throw new RuntimeException("bad memory store!")
+      }
+    })
+    mm.setMemoryStore(ms)
+    ms
+  }
+
   /**
    * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
    *
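A hypothetical use of the new helper (only makeBadMemoryStore comes from the diff; the surrounding calls are assumptions): any code path that reaches evictBlocksToFreeSpace now fails loudly, so a test can assert that pool bookkeeping is not left half-updated, which is what the freeSpaceToShrinkPool refactor above is meant to guarantee.

// Hypothetical usage; assumes the suite's createMemoryManager factory.
val mm = createMemoryManager(maxOnHeapExecutionMemory = 1000L)
val ms = makeBadMemoryStore(mm)
intercept[RuntimeException] {
  // The stub throws regardless of arguments, simulating a failed eviction.
  ms.evictBlocksToFreeSpace(None, 100L, MemoryMode.ON_HEAP)
}
// A real test would now assert that mm's pool sizes are unchanged.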
