Skip to content

Commit 271227c

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into temp-functions
2 parents 2f53330 + 2f98ee6 commit 271227c

File tree

73 files changed

+5786
-618
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+5786
-618
lines changed

LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
238238
(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
239239
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
240240
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
241+
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://www.antlr.org/)
241242
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
242243
(BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
243244
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.annotations.VisibleForTesting;
2525
import com.google.common.collect.Lists;
2626
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.Path;
2728
import org.apache.hadoop.yarn.api.records.ContainerId;
2829
import org.apache.hadoop.yarn.server.api.*;
2930
import org.slf4j.Logger;
@@ -118,7 +119,7 @@ protected void serviceInit(Configuration conf) {
118119
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
119120
// when it comes back
120121
registeredExecutorFile =
121-
findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
122+
findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
122123

123124
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
124125
// If authentication is enabled, set up the shuffle server to use a
@@ -191,12 +192,12 @@ public void stopContainer(ContainerTerminationContext context) {
191192

192193
private File findRegisteredExecutorFile(String[] localDirs) {
193194
for (String dir: localDirs) {
194-
File f = new File(dir, "registeredExecutors.ldb");
195+
File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
195196
if (f.exists()) {
196197
return f;
197198
}
198199
}
199-
return new File(localDirs[0], "registeredExecutors.ldb");
200+
return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
200201
}
201202

202203
/**

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 73 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@
5656
* Bytes 4 to 8: len(k)
5757
* Bytes 8 to 8 + len(k): key data
5858
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
59+
* Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
5960
*
6061
* This means that the first four bytes store the entire record (key + value) length. This format
61-
* is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
62+
* is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
6263
* so we can pass records from this map directly into the sorter to sort records in place.
6364
*/
6465
public final class BytesToBytesMap extends MemoryConsumer {
@@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
132133
/**
133134
* Number of keys defined in the map.
134135
*/
135-
private int numElements;
136+
private int numKeys;
137+
138+
/**
139+
* Number of values defined in the map. A key could have multiple values.
140+
*/
141+
private int numValues;
136142

137143
/**
138144
* The map will be expanded once the number of keys exceeds this threshold.
@@ -223,7 +229,12 @@ public BytesToBytesMap(
223229
/**
224230
* Returns the number of keys defined in the map.
225231
*/
226-
public int numElements() { return numElements; }
232+
public int numKeys() { return numKeys; }
233+
234+
/**
235+
* Returns the number of values defined in the map. A key could have multiple values.
236+
*/
237+
public int numValues() { return numValues; }
227238

228239
public final class MapIterator implements Iterator<Location> {
229240

@@ -311,7 +322,8 @@ public Location next() {
311322
if (currentPage != null) {
312323
int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
313324
loc.with(currentPage, offsetInPage);
314-
offsetInPage += 4 + totalLength;
325+
// [total size] [key size] [key] [value] [pointer to next]
326+
offsetInPage += 4 + totalLength + 8;
315327
recordsInPage --;
316328
return loc;
317329
} else {
@@ -361,7 +373,7 @@ public long spill(long numBytes) throws IOException {
361373
while (numRecords > 0) {
362374
int length = Platform.getInt(base, offset);
363375
writer.write(base, offset + 4, length, 0);
364-
offset += 4 + length;
376+
offset += 4 + length + 8;
365377
numRecords--;
366378
}
367379
writer.close();
@@ -395,7 +407,7 @@ public void remove() {
395407
* `lookup()`, the behavior of the returned iterator is undefined.
396408
*/
397409
public MapIterator iterator() {
398-
return new MapIterator(numElements, loc, false);
410+
return new MapIterator(numValues, loc, false);
399411
}
400412

401413
/**
@@ -409,7 +421,7 @@ public MapIterator iterator() {
409421
* `lookup()`, the behavior of the returned iterator is undefined.
410422
*/
411423
public MapIterator destructiveIterator() {
412-
return new MapIterator(numElements, loc, true);
424+
return new MapIterator(numValues, loc, true);
413425
}
414426

415427
/**
@@ -559,6 +571,20 @@ private Location with(Object base, long offset, int length) {
559571
return this;
560572
}
561573

574+
/**
575+
* Find the next pair that has the same key as current one.
576+
*/
577+
public boolean nextValue() {
578+
assert isDefined;
579+
long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
580+
if (nextAddr == 0) {
581+
return false;
582+
} else {
583+
updateAddressesAndSizes(nextAddr);
584+
return true;
585+
}
586+
}
587+
562588
/**
563589
* Returns the memory page that contains the current record.
564590
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
@@ -625,10 +651,9 @@ public int getValueLength() {
625651
}
626652

627653
/**
628-
* Store a new key and value. This method may only be called once for a given key; if you want
629-
* to update the value associated with a key, then you can directly manipulate the bytes stored
630-
* at the value address. The return value indicates whether the put succeeded or whether it
631-
* failed because additional memory could not be acquired.
654+
* Append a new value for the key. This method could be called multiple times for a given key.
655+
* The return value indicates whether the put succeeded or whether it failed because additional
656+
* memory could not be acquired.
632657
* <p>
633658
* It is only valid to call this method immediately after calling `lookup()` using the same key.
634659
* </p>
@@ -637,15 +662,15 @@ public int getValueLength() {
637662
* </p>
638663
* <p>
639664
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
640-
* will return information on the data stored by this `putNewKey` call.
665+
* will return information on the data stored by this `append` call.
641666
* </p>
642667
* <p>
643668
* As an example usage, here's the proper way to store a new key:
644669
* </p>
645670
* <pre>
646671
* Location loc = map.lookup(keyBase, keyOffset, keyLength);
647672
* if (!loc.isDefined()) {
648-
* if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) {
673+
* if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
649674
* // handle failure to grow map (by spilling, for example)
650675
* }
651676
* }
@@ -657,26 +682,23 @@ public int getValueLength() {
657682
* @return true if the put() was successful and false if the put() failed because memory could
658683
* not be acquired.
659684
*/
660-
public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
661-
Object valueBase, long valueOffset, int valueLength) {
662-
assert (!isDefined) : "Can only set value once for a key";
663-
assert (keyLength % 8 == 0);
664-
assert (valueLength % 8 == 0);
665-
assert(longArray != null);
666-
685+
public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
686+
assert (klen % 8 == 0);
687+
assert (vlen % 8 == 0);
688+
assert (longArray != null);
667689

668-
if (numElements == MAX_CAPACITY
690+
if (numKeys == MAX_CAPACITY
669691
// The map could be reused from the last spill (because there was not enough memory to grow),
670692
// then we don't try to grow again if we hit the `growthThreshold`.
671-
|| !canGrowArray && numElements > growthThreshold) {
693+
|| !canGrowArray && numKeys > growthThreshold) {
672694
return false;
673695
}
674696

675697
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
676698
// the key address instead of storing the absolute address of the value, the key and value
677699
// must be stored in the same memory page.
678-
// (8 byte key length) (key) (value)
679-
final long recordLength = 8 + keyLength + valueLength;
700+
// (8 byte key length) (key) (value) (8 byte pointer to next value)
701+
final long recordLength = 8 + klen + vlen + 8;
680702
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
681703
if (!acquireNewPage(recordLength + 4L)) {
682704
return false;
@@ -687,30 +709,40 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
687709
final Object base = currentPage.getBaseObject();
688710
long offset = currentPage.getBaseOffset() + pageCursor;
689711
final long recordOffset = offset;
690-
Platform.putInt(base, offset, keyLength + valueLength + 4);
691-
Platform.putInt(base, offset + 4, keyLength);
712+
Platform.putInt(base, offset, klen + vlen + 4);
713+
Platform.putInt(base, offset + 4, klen);
692714
offset += 8;
693-
Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
694-
offset += keyLength;
695-
Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
715+
Platform.copyMemory(kbase, koff, base, offset, klen);
716+
offset += klen;
717+
Platform.copyMemory(vbase, voff, base, offset, vlen);
718+
offset += vlen;
719+
Platform.putLong(base, offset, 0);
696720

697721
// --- Update bookkeeping data structures ----------------------------------------------------
698722
offset = currentPage.getBaseOffset();
699723
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
700724
pageCursor += recordLength;
701-
numElements++;
702725
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
703726
currentPage, recordOffset);
704-
longArray.set(pos * 2, storedKeyAddress);
705-
longArray.set(pos * 2 + 1, keyHashcode);
706-
updateAddressesAndSizes(storedKeyAddress);
707-
isDefined = true;
727+
numValues++;
728+
if (isDefined) {
729+
// put this pair at the end of the chain
730+
while (nextValue()) { /* do nothing */ }
731+
Platform.putLong(baseObject, valueOffset + valueLength, storedKeyAddress);
732+
nextValue(); // point to the newly added value
733+
} else {
734+
numKeys++;
735+
longArray.set(pos * 2, storedKeyAddress);
736+
longArray.set(pos * 2 + 1, keyHashcode);
737+
updateAddressesAndSizes(storedKeyAddress);
738+
isDefined = true;
708739

709-
if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) {
710-
try {
711-
growAndRehash();
712-
} catch (OutOfMemoryError oom) {
713-
canGrowArray = false;
740+
if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
741+
try {
742+
growAndRehash();
743+
} catch (OutOfMemoryError oom) {
744+
canGrowArray = false;
745+
}
714746
}
715747
}
716748
return true;
@@ -866,7 +898,8 @@ public LongArray getArray() {
866898
* Reset this map to initialized state.
867899
*/
868900
public void reset() {
869-
numElements = 0;
901+
numKeys = 0;
902+
numValues = 0;
870903
longArray.zeroOut();
871904

872905
while (dataPages.size() > 0) {

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,15 @@ private[spark] class CoarseGrainedExecutorBackend(
113113

114114
case Shutdown =>
115115
stopping.set(true)
116-
executor.stop()
117-
stop()
118-
rpcEnv.shutdown()
116+
new Thread("CoarseGrainedExecutorBackend-stop-executor") {
117+
override def run(): Unit = {
118+
// executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
119+
// However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
120+
// stop until `executor.stop()` returns, which becomes a deadlock (see SPARK-14180).
121+
// Therefore, we put this line in a new thread.
122+
executor.stop()
123+
}
124+
}.start()
119125
}
120126

121127
override def onDisconnected(remoteAddress: RpcAddress): Unit = {

0 commit comments

Comments
 (0)