Skip to content

Commit 3df7124

Browse files
committed
[SPARK-39987][K8S] Support PEAK_JVM_(ON|OFF)HEAP_MEMORY executor rolling policy
### What changes were proposed in this pull request? This PR aims to support two new executor rolling policies. - `PEAK_JVM_ONHEAP_MEMORY` policy chooses an executor with the biggest peak JVM on-heap memory. - `PEAK_JVM_OFFHEAP_MEMORY` policy chooses an executor with the biggest peak JVM off-heap memory. ### Why are the changes needed? Although peak memory is a historical value, these two new policies make it possible to keep the memory usage of Spark jobs as low as possible. ### Does this PR introduce _any_ user-facing change? Yes, but this is a new feature. ### How was this patch tested? Pass the CIs. Closes apache#37418 from dongjoon-hyun/SPARK-39987. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent b481ed3 commit 3df7124

File tree

3 files changed

+128
-15
lines changed

3 files changed

+128
-15
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,14 @@ private[spark] object Config extends Logging {
164164

165165
object ExecutorRollPolicy extends Enumeration {
166166
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
167-
OUTLIER, OUTLIER_NO_FALLBACK = Value
167+
PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, OUTLIER, OUTLIER_NO_FALLBACK = Value
168168
}
169169

170170
val EXECUTOR_ROLL_POLICY =
171171
ConfigBuilder("spark.kubernetes.executor.rollPolicy")
172172
.doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME, " +
173-
"TOTAL_DURATION, FAILED_TASKS, and OUTLIER (default). " +
173+
"TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS, PEAK_JVM_ONHEAP_MEMORY, " +
174+
"PEAK_JVM_OFFHEAP_MEMORY, OUTLIER (default), and OUTLIER_NO_FALLBACK. " +
174175
"When executor roll happens, Spark uses this policy to choose " +
175176
"an executor and decommission it. The built-in policies are based on executor summary." +
176177
"ID policy chooses an executor with the smallest executor ID. " +
@@ -179,6 +180,9 @@ private[spark] object Config extends Logging {
179180
"TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
180181
"AVERAGE_DURATION policy chooses an executor with the biggest average task time. " +
181182
"FAILED_TASKS policy chooses an executor with the most number of failed tasks. " +
183+
"PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " +
184+
"memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " +
185+
"off-heap memory. " +
182186
"OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
183187
"at least two standard deviation from the mean in average task time, " +
184188
"total task time, total task GC time, and the number of failed tasks if exists. " +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
2525
import org.apache.spark.SparkContext
2626
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
2727
import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
28+
import org.apache.spark.executor.ExecutorMetrics
2829
import org.apache.spark.internal.Logging
2930
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
3031
import org.apache.spark.scheduler.ExecutorDecommissionInfo
@@ -47,6 +48,8 @@ class ExecutorRollPlugin extends SparkPlugin {
4748
}
4849

4950
class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
51+
/**
 * Fallback metrics built from an empty array. `getPeakMetrics` reads from this instance when
 * an executor summary carries no `peakMemoryMetrics`.
 */
lazy val EMPTY_METRICS: ExecutorMetrics = new ExecutorMetrics(Array.emptyLongArray)
52+
5053
private var sparkContext: SparkContext = _
5154

5255
private val periodicService: ScheduledExecutorService =
@@ -99,6 +102,9 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
99102

100103
override def shutdown(): Unit = periodicService.shutdown()
101104

105+
/**
 * Looks up a single peak memory metric of an executor.
 *
 * @param summary executor summary whose `peakMemoryMetrics` is consulted
 * @param name metric name passed to `getMetricValue`, e.g. "JVMHeapMemory"
 * @return the metric value; read from `EMPTY_METRICS` when the summary has no peak metrics
 */
private def getPeakMetrics(summary: v1.ExecutorSummary, name: String): Long = {
  val peakMetrics = summary.peakMemoryMetrics.getOrElse(EMPTY_METRICS)
  peakMetrics.getMetricValue(name)
}
107+
102108
private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
103109
: Option[String] = {
104110
val listWithoutDriver = list
@@ -118,6 +124,10 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
118124
listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
119125
case ExecutorRollPolicy.FAILED_TASKS =>
120126
listWithoutDriver.sortBy(_.failedTasks).reverse
127+
case ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY =>
128+
listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse
129+
case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY =>
130+
listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse
121131
case ExecutorRollPolicy.OUTLIER =>
122132
// If there is no outlier we fallback to TOTAL_DURATION policy.
123133
outliersFromMultipleDimensions(listWithoutDriver) ++
@@ -131,14 +141,17 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
131141
/**
132142
* We build multiple outlier lists and concat in the following importance order to find
133143
* outliers in various perspective:
134-
* AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS
144+
* AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS >
145+
* PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY
135146
* Since we will choose only first item, the duplication is okay.
136147
*/
137148
private def outliersFromMultipleDimensions(listWithoutDriver: Seq[v1.ExecutorSummary]) =
138149
outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration / e.totalTasks) ++
139150
outliers(listWithoutDriver, e => e.totalDuration) ++
140151
outliers(listWithoutDriver, e => e.totalGCTime) ++
141-
outliers(listWithoutDriver, e => e.failedTasks)
152+
outliers(listWithoutDriver, e => e.failedTasks) ++
153+
outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
154+
outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory"))
142155

143156
/**
144157
* Return executors whose metrics is outstanding, '(value - mean) > 2-sigma'. This is

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala

Lines changed: 107 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.scalatest.PrivateMethodTester
2222

2323
import org.apache.spark.SparkFunSuite
2424
import org.apache.spark.deploy.k8s.Config.ExecutorRollPolicy
25+
import org.apache.spark.executor.ExecutorMetrics
2526
import org.apache.spark.status.api.v1.ExecutorSummary
2627

2728
class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
@@ -30,20 +31,23 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
3031

3132
private val _choose = PrivateMethod[Option[String]](Symbol("choose"))
3233

34+
// Default peak memory metrics (the same value for JVM heap and off-heap) shared by the
// executor summaries in this suite that do not need distinctive memory values.
val metrics = {
  val defaultPeak = 1024L
  Some(new ExecutorMetrics(Map(
    "JVMHeapMemory" -> defaultPeak,
    "JVMOffHeapMemory" -> defaultPeak)))
}
36+
3337
val driverSummary = new ExecutorSummary("driver", "host:port", true, 1,
3438
10, 10, 1, 1, 1,
3539
0, 0, 1, 100,
3640
1, 100, 100,
3741
10, false, 20, new Date(1639300000000L),
38-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
42+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
3943
false, Set())
4044

4145
val execWithSmallestID = new ExecutorSummary("1", "host:port", true, 1,
4246
10, 10, 1, 1, 1,
4347
0, 0, 1, 100,
4448
20, 100, 100,
4549
10, false, 20, new Date(1639300001000L),
46-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
50+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
4751
false, Set())
4852

4953
// The smallest addTime
@@ -52,7 +56,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
5256
0, 0, 1, 100,
5357
20, 100, 100,
5458
10, false, 20, new Date(1639300000000L),
55-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
59+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
5660
false, Set())
5761

5862
// The biggest totalGCTime
@@ -61,7 +65,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
6165
0, 0, 1, 100,
6266
40, 100, 100,
6367
10, false, 20, new Date(1639300002000L),
64-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
68+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
6569
false, Set())
6670

6771
// The biggest totalDuration
@@ -70,7 +74,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
7074
0, 0, 4, 400,
7175
20, 100, 100,
7276
10, false, 20, new Date(1639300003000L),
73-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
77+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
7478
false, Set())
7579

7680
// The biggest failedTasks
@@ -79,7 +83,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
7983
5, 0, 1, 100,
8084
20, 100, 100,
8185
10, false, 20, new Date(1639300003000L),
82-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
86+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
8387
false, Set())
8488

8589
// The biggest average duration (= totalDuration / totalTask)
@@ -88,7 +92,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
8892
0, 0, 2, 300,
8993
20, 100, 100,
9094
10, false, 20, new Date(1639300003000L),
91-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
95+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
9296
false, Set())
9397

9498
// The executor with no tasks
@@ -97,7 +101,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
97101
0, 0, 0, 0,
98102
0, 0, 0,
99103
0, false, 0, new Date(1639300001000L),
100-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
104+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
101105
false, Set())
102106

103107
// This is used to stabilize 'mean' and 'sd' in OUTLIER test cases.
@@ -106,20 +110,41 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
106110
4, 0, 2, 280,
107111
30, 100, 100,
108112
10, false, 20, new Date(1639300001000L),
109-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
113+
Option.empty, Option.empty, Map(), Option.empty, Set(),
114+
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1200L))),
115+
Map(), Map(), 1,
110116
false, Set())
111117

112118
val execWithTwoDigitID = new ExecutorSummary("10", "host:port", true, 1,
113119
10, 10, 1, 1, 1,
114120
4, 0, 2, 280,
115121
30, 100, 100,
116122
10, false, 20, new Date(1639300001000L),
117-
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
123+
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
118124
false, Set())
119125

126+
// The biggest peak JVM on-heap memory
val execWithBiggestPeakJVMOnHeapMemory = {
  val peakMetrics = new ExecutorMetrics(
    Map("JVMHeapMemory" -> 1201L, "JVMOffHeapMemory" -> 1200L))
  new ExecutorSummary("11", "host:port", true, 1,
    10, 10, 1, 1, 1,
    4, 0, 2, 280,
    30, 100, 100,
    10, false, 20, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(), Some(peakMetrics),
    Map(), Map(), 1, false, Set())
}
134+
135+
// The biggest peak JVM off-heap memory
val execWithBiggestPeakJVMOffHeapMemory = {
  val peakMetrics = new ExecutorMetrics(
    Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1201L))
  new ExecutorSummary("12", "host:port", true, 1,
    10, 10, 1, 1, 1,
    4, 0, 2, 280,
    30, 100, 100,
    10, false, 20, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(), Some(peakMetrics),
    Map(), Map(), 1, false, Set())
}
143+
120144
val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
121145
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
122-
execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID)
146+
execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
147+
execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory)
123148

124149
override def beforeEach(): Unit = {
125150
super.beforeEach()
@@ -179,6 +204,16 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
179204
assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.AVERAGE_DURATION)).contains("6"))
180205
}
181206

207+
test("Policy: PEAK_JVM_ONHEAP_MEMORY") {
  // Executor "11" is the fixture with the largest "JVMHeapMemory" value in `list`.
  val chosen = plugin.invokePrivate(_choose(list, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY))
  assert(chosen.contains("11"))
}
211+
212+
test("Policy: PEAK_JVM_OFFHEAP_MEMORY") {
  // Executor "12" is the fixture with the largest "JVMOffHeapMemory" value in `list`.
  val chosen = plugin.invokePrivate(_choose(list, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY))
  assert(chosen.contains("12"))
}
216+
182217
test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
183218
assert(
184219
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)) ==
@@ -224,6 +259,36 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
224259
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
225260
}
226261

262+
test("Policy: OUTLIER - Detect a peak JVM on-heap memory outlier") {
  // totalGCTime (first value of the fourth argument row) is kept at 0, as in the
  // OUTLIER_NO_FALLBACK counterpart of this test. A large GC time would make this executor a
  // GC-time outlier, and since GC time is checked before peak memory, the test would pass
  // without ever exercising the peak on-heap memory dimension.
  val peakMetrics = new ExecutorMetrics(
    Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))
  val outlier = new ExecutorSummary("9999", "host:port", true, 1,
    0, 0, 1, 0, 0,
    3, 0, 1, 100,
    0, 0, 0,
    0, false, 0, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(), Some(peakMetrics),
    Map(), Map(), 1,
    false, Set())
  val candidates = list :+ outlier
  // OUTLIER must pick the same executor as the dedicated on-heap policy.
  assert(
    plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
      plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.OUTLIER)))
}
276+
277+
test("Policy: OUTLIER - Detect a peak JVM off-heap memory outlier") {
  // totalGCTime (first value of the fourth argument row) is kept at 0, as in the
  // OUTLIER_NO_FALLBACK counterpart of this test. A large GC time would make this executor a
  // GC-time outlier, and since GC time is checked before peak memory, the test would pass
  // without ever exercising the peak off-heap memory dimension.
  val peakMetrics = new ExecutorMetrics(
    Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))
  val outlier = new ExecutorSummary("9999", "host:port", true, 1,
    0, 0, 1, 0, 0,
    3, 0, 1, 100,
    0, 0, 0,
    0, false, 0, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(), Some(peakMetrics),
    Map(), Map(), 1,
    false, Set())
  val candidates = list :+ outlier
  // OUTLIER must pick the same executor as the dedicated off-heap policy.
  assert(
    plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
      plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.OUTLIER)))
}
291+
227292
test("Policy: OUTLIER_NO_FALLBACK - Return None if there are no outliers") {
228293
assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)).isEmpty)
229294
}
@@ -266,4 +331,35 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
266331
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME)) ==
267332
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
268333
}
334+
335+
test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM on-heap memory outlier") {
  // Only "JVMHeapMemory" is far above the values used by the executors in `list`.
  val peakMetrics = new ExecutorMetrics(
    Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))
  val outlier = new ExecutorSummary("9999", "host:port", true, 1,
    0, 0, 1, 0, 0,
    3, 0, 1, 100,
    0, 0, 0,
    0, false, 0, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(), Some(peakMetrics),
    Map(), Map(), 1,
    false, Set())
  val candidates = list :+ outlier
  // OUTLIER_NO_FALLBACK must pick the same executor as the dedicated on-heap policy.
  assert(
    plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
      plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}
350+
351+
test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM off-heap memory outlier") {
  // Only "JVMOffHeapMemory" is far above the values used by the executors in `list`.
  val peakMetrics = new ExecutorMetrics(
    Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))
  val outlier = new ExecutorSummary("9999", "host:port", true, 1,
    0, 0, 1, 0, 0,
    3, 0, 1, 100,
    0, 0, 0,
    0, false, 0, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(), Some(peakMetrics),
    Map(), Map(), 1,
    false, Set())
  val candidates = list :+ outlier
  // OUTLIER_NO_FALLBACK must pick the same executor as the dedicated off-heap policy.
  assert(
    plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
      plugin.invokePrivate(_choose(candidates, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}
269365
}

0 commit comments

Comments
 (0)