@@ -22,6 +22,7 @@ import org.scalatest.PrivateMethodTester
2222
2323import org .apache .spark .SparkFunSuite
2424import org .apache .spark .deploy .k8s .Config .ExecutorRollPolicy
25+ import org .apache .spark .executor .ExecutorMetrics
2526import org .apache .spark .status .api .v1 .ExecutorSummary
2627
2728class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
@@ -30,20 +31,23 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
3031
3132 private val _choose = PrivateMethod [Option [String ]](Symbol (" choose" ))
3233
34+ val metrics = Some (new ExecutorMetrics (
35+ Map (" JVMHeapMemory" -> 1024L , " JVMOffHeapMemory" -> 1024L )))
36+
3337 val driverSummary = new ExecutorSummary (" driver" , " host:port" , true , 1 ,
3438 10 , 10 , 1 , 1 , 1 ,
3539 0 , 0 , 1 , 100 ,
3640 1 , 100 , 100 ,
3741 10 , false , 20 , new Date (1639300000000L ),
38- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
42+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
3943 false , Set ())
4044
4145 val execWithSmallestID = new ExecutorSummary (" 1" , " host:port" , true , 1 ,
4246 10 , 10 , 1 , 1 , 1 ,
4347 0 , 0 , 1 , 100 ,
4448 20 , 100 , 100 ,
4549 10 , false , 20 , new Date (1639300001000L ),
46- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
50+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
4751 false , Set ())
4852
4953 // The smallest addTime
@@ -52,7 +56,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
5256 0 , 0 , 1 , 100 ,
5357 20 , 100 , 100 ,
5458 10 , false , 20 , new Date (1639300000000L ),
55- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
59+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
5660 false , Set ())
5761
5862 // The biggest totalGCTime
@@ -61,7 +65,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
6165 0 , 0 , 1 , 100 ,
6266 40 , 100 , 100 ,
6367 10 , false , 20 , new Date (1639300002000L ),
64- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
68+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
6569 false , Set ())
6670
6771 // The biggest totalDuration
@@ -70,7 +74,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
7074 0 , 0 , 4 , 400 ,
7175 20 , 100 , 100 ,
7276 10 , false , 20 , new Date (1639300003000L ),
73- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
77+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
7478 false , Set ())
7579
7680 // The biggest failedTasks
@@ -79,7 +83,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
7983 5 , 0 , 1 , 100 ,
8084 20 , 100 , 100 ,
8185 10 , false , 20 , new Date (1639300003000L ),
82- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
86+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
8387 false , Set ())
8488
8589 // The biggest average duration (= totalDuration / totalTask)
@@ -88,7 +92,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
8892 0 , 0 , 2 , 300 ,
8993 20 , 100 , 100 ,
9094 10 , false , 20 , new Date (1639300003000L ),
91- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
95+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
9296 false , Set ())
9397
9498 // The executor with no tasks
@@ -97,7 +101,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
97101 0 , 0 , 0 , 0 ,
98102 0 , 0 , 0 ,
99103 0 , false , 0 , new Date (1639300001000L ),
100- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
104+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
101105 false , Set ())
102106
103107 // This is used to stabilize 'mean' and 'sd' in OUTLIER test cases.
@@ -106,20 +110,41 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
106110 4 , 0 , 2 , 280 ,
107111 30 , 100 , 100 ,
108112 10 , false , 20 , new Date (1639300001000L ),
109- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty, Map (), Map (), 1 ,
113+ Option .empty, Option .empty, Map (), Option .empty, Set (),
114+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 1200L , " JVMOffHeapMemory" -> 1200L ))),
115+ Map (), Map (), 1 ,
110116 false , Set ())
111117
112118 val execWithTwoDigitID = new ExecutorSummary (" 10" , " host:port" , true , 1 ,
113119 10 , 10 , 1 , 1 , 1 ,
114120 4 , 0 , 2 , 280 ,
115121 30 , 100 , 100 ,
116122 10 , false , 20 , new Date (1639300001000L ),
117- Option .empty, Option .empty, Map (), Option .empty, Set (), Option .empty , Map (), Map (), 1 ,
123+ Option .empty, Option .empty, Map (), Option .empty, Set (), metrics , Map (), Map (), 1 ,
118124 false , Set ())
119125
126+ val execWithBiggestPeakJVMOnHeapMemory = new ExecutorSummary (" 11" , " host:port" , true , 1 ,
127+ 10 , 10 , 1 , 1 , 1 ,
128+ 4 , 0 , 2 , 280 ,
129+ 30 , 100 , 100 ,
130+ 10 , false , 20 , new Date (1639300001000L ),
131+ Option .empty, Option .empty, Map (), Option .empty, Set (),
132+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 1201L , " JVMOffHeapMemory" -> 1200L ))),
133+ Map (), Map (), 1 , false , Set ())
134+
135+ val execWithBiggestPeakJVMOffHeapMemory = new ExecutorSummary (" 12" , " host:port" , true , 1 ,
136+ 10 , 10 , 1 , 1 , 1 ,
137+ 4 , 0 , 2 , 280 ,
138+ 30 , 100 , 100 ,
139+ 10 , false , 20 , new Date (1639300001000L ),
140+ Option .empty, Option .empty, Map (), Option .empty, Set (),
141+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 1200L , " JVMOffHeapMemory" -> 1201L ))),
142+ Map (), Map (), 1 , false , Set ())
143+
120144 val list = Seq (driverSummary, execWithSmallestID, execWithSmallestAddTime,
121145 execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
122- execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID)
146+ execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
147+ execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory)
123148
124149 override def beforeEach (): Unit = {
125150 super .beforeEach()
@@ -179,6 +204,16 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
179204 assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy .AVERAGE_DURATION )).contains(" 6" ))
180205 }
181206
207+ test(" Policy: PEAK_JVM_ONHEAP_MEMORY" ) {
208+ assert(plugin.invokePrivate(
209+ _choose(list, ExecutorRollPolicy .PEAK_JVM_ONHEAP_MEMORY )).contains(" 11" ))
210+ }
211+
212+ test(" Policy: PEAK_JVM_OFFHEAP_MEMORY" ) {
213+ assert(plugin.invokePrivate(
214+ _choose(list, ExecutorRollPolicy .PEAK_JVM_OFFHEAP_MEMORY )).contains(" 12" ))
215+ }
216+
182217 test(" Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier" ) {
183218 assert(
184219 plugin.invokePrivate(_choose(list, ExecutorRollPolicy .TOTAL_DURATION )) ==
@@ -224,6 +259,36 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
224259 plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .OUTLIER )))
225260 }
226261
262+ test(" Policy: OUTLIER - Detect a peak JVM on-heap memory outlier" ) {
263+ val outlier = new ExecutorSummary (" 9999" , " host:port" , true , 1 ,
264+ 0 , 0 , 1 , 0 , 0 ,
265+ 3 , 0 , 1 , 100 ,
266+ 1000 , 0 , 0 ,
267+ 0 , false , 0 , new Date (1639300001000L ),
268+ Option .empty, Option .empty, Map (), Option .empty, Set (),
269+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 2048L , " JVMOffHeapMemory" -> 1200L ))),
270+ Map (), Map (), 1 ,
271+ false , Set ())
272+ assert(
273+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .PEAK_JVM_ONHEAP_MEMORY )) ==
274+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .OUTLIER )))
275+ }
276+
277+ test(" Policy: OUTLIER - Detect a peak JVM off-heap memory outlier" ) {
278+ val outlier = new ExecutorSummary (" 9999" , " host:port" , true , 1 ,
279+ 0 , 0 , 1 , 0 , 0 ,
280+ 3 , 0 , 1 , 100 ,
281+ 1000 , 0 , 0 ,
282+ 0 , false , 0 , new Date (1639300001000L ),
283+ Option .empty, Option .empty, Map (), Option .empty, Set (),
284+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 1024L , " JVMOffHeapMemory" -> 2048L ))),
285+ Map (), Map (), 1 ,
286+ false , Set ())
287+ assert(
288+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .PEAK_JVM_OFFHEAP_MEMORY )) ==
289+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .OUTLIER )))
290+ }
291+
227292 test(" Policy: OUTLIER_NO_FALLBACK - Return None if there are no outliers" ) {
228293 assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy .OUTLIER_NO_FALLBACK )).isEmpty)
229294 }
@@ -266,4 +331,35 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
266331 plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .TOTAL_GC_TIME )) ==
267332 plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .OUTLIER_NO_FALLBACK )))
268333 }
334+
335+ test(" Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM on-heap memory outlier" ) {
336+ val outlier = new ExecutorSummary (" 9999" , " host:port" , true , 1 ,
337+ 0 , 0 , 1 , 0 , 0 ,
338+ 3 , 0 , 1 , 100 ,
339+ 0 , 0 , 0 ,
340+ 0 , false , 0 , new Date (1639300001000L ),
341+ Option .empty, Option .empty, Map (), Option .empty, Set (),
342+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 2048L , " JVMOffHeapMemory" -> 1200L ))),
343+ Map (), Map (), 1 ,
344+ false , Set ())
345+ val x = plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .TOTAL_GC_TIME ))
346+ assert(
347+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .PEAK_JVM_ONHEAP_MEMORY )) ==
348+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .OUTLIER_NO_FALLBACK )))
349+ }
350+
351+ test(" Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM off-heap memory outlier" ) {
352+ val outlier = new ExecutorSummary (" 9999" , " host:port" , true , 1 ,
353+ 0 , 0 , 1 , 0 , 0 ,
354+ 3 , 0 , 1 , 100 ,
355+ 0 , 0 , 0 ,
356+ 0 , false , 0 , new Date (1639300001000L ),
357+ Option .empty, Option .empty, Map (), Option .empty, Set (),
358+ Some (new ExecutorMetrics (Map (" JVMHeapMemory" -> 1024L , " JVMOffHeapMemory" -> 2048L ))),
359+ Map (), Map (), 1 ,
360+ false , Set ())
361+ assert(
362+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .PEAK_JVM_OFFHEAP_MEMORY )) ==
363+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy .OUTLIER_NO_FALLBACK )))
364+ }
269365}
0 commit comments