Skip to content

Commit d89adf8

Browse files
author
Sital Kedia
committed
[SPARK-14363] Fix executor OOM due to memory leak in the Sorter
1 parent 22014e6 commit d89adf8

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -215,8 +215,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
215215
}
216216
}
217217

218-
inMemSorter.reset();
219-
220218
if (!isLastFile) { // i.e. this is a spill file
221219
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
222220
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
255253

256254
writeSortedFile(false);
257255
final long spillSize = freeMemory();
256+
inMemSorter.reset();
257+
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
258+
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
259+
// we might not be able to get memory for the pointer array.
258260
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
259261
return spillSize;
260262
}

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,9 +51,12 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
5151
*/
5252
private int pos = 0;
5353

54+
private int initialSize;
55+
5456
ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
5557
this.consumer = consumer;
5658
assert (initialSize > 0);
59+
this.initialSize = initialSize;
5760
this.array = consumer.allocateArray(initialSize);
5861
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
5962
}
@@ -70,6 +73,10 @@ public int numRecords() {
7073
}
7174

7275
public void reset() {
76+
if (consumer != null) {
77+
consumer.freeArray(array);
78+
this.array = consumer.allocateArray(initialSize);
79+
}
7380
pos = 0;
7481
}
7582

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -200,14 +200,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
200200
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
201201
}
202202
spillWriter.close();
203-
204-
inMemSorter.reset();
205203
}
206204

207205
final long spillSize = freeMemory();
208206
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
209207
// pages will currently be counted as memory spilled even though that space isn't actually
210208
// written to disk. This also counts the space needed to store the sorter's pointer array.
209+
inMemSorter.reset();
210+
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
211+
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
212+
// we might not be able to get memory for the pointer array.
213+
211214
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
212215

213216
return spillSize;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
8484
*/
8585
private int pos = 0;
8686

87+
private long initialSize;
88+
8789
public UnsafeInMemorySorter(
8890
final MemoryConsumer consumer,
8991
final TaskMemoryManager memoryManager,
@@ -102,6 +104,7 @@ public UnsafeInMemorySorter(
102104
LongArray array) {
103105
this.consumer = consumer;
104106
this.memoryManager = memoryManager;
107+
this.initialSize = array.size();
105108
if (recordComparator != null) {
106109
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
107110
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
@@ -123,6 +126,10 @@ public void free() {
123126
}
124127

125128
public void reset() {
129+
if (consumer != null) {
130+
consumer.freeArray(array);
131+
this.array = consumer.allocateArray(initialSize);
132+
}
126133
pos = 0;
127134
}
128135

0 commit comments

Comments
 (0)