|
20 | 20 | import javax.annotation.concurrent.GuardedBy; |
21 | 21 | import java.io.IOException; |
22 | 22 | import java.util.Arrays; |
| 23 | +import java.util.ArrayList; |
23 | 24 | import java.util.BitSet; |
24 | 25 | import java.util.HashSet; |
| 26 | +import java.util.List; |
| 27 | +import java.util.Map; |
| 28 | +import java.util.TreeMap; |
25 | 29 |
|
26 | 30 | import com.google.common.annotations.VisibleForTesting; |
27 | 31 | import org.slf4j.Logger; |
@@ -144,23 +148,49 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) { |
144 | 148 | // spilling, avoid to have too many spilled files. |
145 | 149 | if (got < required) { |
146 | 150 | // Call spill() on other consumers to release memory |
| 151 | + // Sort the consumers according their memory usage. So we avoid spilling the same consumer |
| 152 | + // which is just spilled in last few times and re-spilling on it will produce many small |
| 153 | + // spill files. |
| 154 | + TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>(); |
147 | 155 | for (MemoryConsumer c: consumers) { |
148 | 156 | if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) { |
149 | | - try { |
150 | | - long released = c.spill(required - got, consumer); |
151 | | - if (released > 0) { |
152 | | - logger.debug("Task {} released {} from {} for {}", taskAttemptId, |
153 | | - Utils.bytesToString(released), c, consumer); |
154 | | - got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); |
155 | | - if (got >= required) { |
156 | | - break; |
157 | | - } |
| 157 | + long key = c.getUsed(); |
| 158 | + List<MemoryConsumer> list = sortedConsumers.get(key); |
| 159 | + if (list == null) { |
| 160 | + list = new ArrayList<>(1); |
| 161 | + sortedConsumers.put(key, list); |
| 162 | + } |
| 163 | + list.add(c); |
| 164 | + } |
| 165 | + } |
| 166 | + while (!sortedConsumers.isEmpty()) { |
| 167 | + // Get the consumer using the least memory more than the remaining required memory. |
| 168 | + Map.Entry<Long, List<MemoryConsumer>> currentEntry = |
| 169 | + sortedConsumers.ceilingEntry(required - got); |
| 170 | + // No consumer has used memory more than the remaining required memory. |
| 171 | + // Get the consumer of largest used memory. |
| 172 | + if (currentEntry == null) { |
| 173 | + currentEntry = sortedConsumers.lastEntry(); |
| 174 | + } |
| 175 | + List<MemoryConsumer> cList = currentEntry.getValue(); |
| 176 | + MemoryConsumer c = cList.remove(cList.size() - 1); |
| 177 | + if (cList.isEmpty()) { |
| 178 | + sortedConsumers.remove(currentEntry.getKey()); |
| 179 | + } |
| 180 | + try { |
| 181 | + long released = c.spill(required - got, consumer); |
| 182 | + if (released > 0) { |
| 183 | + logger.debug("Task {} released {} from {} for {}", taskAttemptId, |
| 184 | + Utils.bytesToString(released), c, consumer); |
| 185 | + got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); |
| 186 | + if (got >= required) { |
| 187 | + break; |
158 | 188 | } |
159 | | - } catch (IOException e) { |
160 | | - logger.error("error while calling spill() on " + c, e); |
161 | | - throw new OutOfMemoryError("error while calling spill() on " + c + " : " |
162 | | - + e.getMessage()); |
163 | 189 | } |
| 190 | + } catch (IOException e) { |
| 191 | + logger.error("error while calling spill() on " + c, e); |
| 192 | + throw new OutOfMemoryError("error while calling spill() on " + c + " : " |
| 193 | + + e.getMessage()); |
164 | 194 | } |
165 | 195 | } |
166 | 196 | } |
|
0 commit comments