@@ -126,124 +126,117 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
126126 <br />
127127 }
128128
129- def generateStateOperators (
129+ def generateAggregatedStateOperators (
130130 query : StreamingQueryUIData ,
131131 minBatchTime : Long ,
132132 maxBatchTime : Long ,
133133 jsCollector : JsCollector
134- ): Array [ NodeBuffer ] = {
135- var opId = 0
134+ ): NodeBuffer = {
135+ // This is made sure on caller side but put it here to be defensive
136136 require(query.lastProgress != null )
137- query.lastProgress.stateOperators.map { _ =>
138- val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
139- p.stateOperators(opId).numRowsTotal.toDouble))
140- val maxNumRowsTotal = query.recentProgress.map(_.stateOperators(opId).numRowsTotal).max
141-
142- val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
143- p.stateOperators(opId).numRowsUpdated.toDouble))
144- val maxNumRowsUpdated = query.recentProgress.map(_.stateOperators(opId).numRowsUpdated).max
145-
146- val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
147- p.stateOperators(opId).memoryUsedBytes.toDouble))
148- val maxMemoryUsedBytes = query.recentProgress.map(_.stateOperators(opId).memoryUsedBytes).max
149-
150- val numRowsDroppedByWatermarkData = query.recentProgress
151- .map(p => (parseProgressTimestamp(p.timestamp),
152- p.stateOperators(opId).numRowsDroppedByWatermark.toDouble))
153- val maxNumRowsDroppedByWatermark = query.recentProgress
154- .map(_.stateOperators(opId).numRowsDroppedByWatermark).max
155-
156- val graphUIDataForNumberTotalRows =
157- new GraphUIData (
158- s " op $opId-num-total-rows-timeline " ,
159- s " op $opId-num-total-rows-histogram " ,
160- numRowsTotalData,
161- minBatchTime,
162- maxBatchTime,
163- 0 ,
164- maxNumRowsTotal,
165- " records" )
166- graphUIDataForNumberTotalRows.generateDataJs(jsCollector)
167-
168- val graphUIDataForNumberUpdatedRows =
169- new GraphUIData (
170- s " op $opId-num-updated-rows-timeline " ,
171- s " op $opId-num-updated-rows-histogram " ,
172- numRowsUpdatedData,
173- minBatchTime,
174- maxBatchTime,
175- 0 ,
176- maxNumRowsUpdated,
177- " records" )
178- graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector)
179-
180- val graphUIDataForMemoryUsedBytes =
181- new GraphUIData (
182- s " op $opId-memory-used-bytes-timeline " ,
183- s " op $opId-memory-used-bytes-histogram " ,
184- memoryUsedBytesData,
185- minBatchTime,
186- maxBatchTime,
187- 0 ,
188- maxMemoryUsedBytes,
189- " bytes" )
190- graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector)
191-
192- val graphUIDataForNumRowsDroppedByWatermark =
193- new GraphUIData (
194- s " op $opId-num-rows-dropped-by-watermark-timeline " ,
195- s " op $opId-num-rows-dropped-by-watermark-histogram " ,
196- numRowsDroppedByWatermarkData,
197- minBatchTime,
198- maxBatchTime,
199- 0 ,
200- maxNumRowsDroppedByWatermark,
201- " records" )
202- graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
203-
204- val result =
205- // scalastyle:off
206- <tr >
207- <td style =" vertical-align: middle;" >
208- <div style =" width: 160px;" >
209- <div ><strong >Operator # {s " $opId" } Number Of Total Rows {SparkUIUtils .tooltip(" Number of total rows." , " right" )}</strong ></div >
210- </div >
211- </td >
212- <td class ={s " op $opId-num-total-rows-timeline " }>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td >
213- <td class ={s " op $opId-num-total-rows-histogram " }>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td >
214- </tr >
215- <tr >
216- <td style =" vertical-align: middle;" >
217- <div style =" width: 160px;" >
218- <div ><strong >Operator # {s " $opId" } Number Of Updated Rows {SparkUIUtils .tooltip(" Number of updated rows." , " right" )}</strong ></div >
219- </div >
220- </td >
221- <td class ={s " op $opId-num-updated-rows-timeline " }>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td >
222- <td class ={s " op $opId-num-updated-rows-histogram " }>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td >
223- </tr >
224- <tr >
225- <td style =" vertical-align: middle;" >
226- <div style =" width: 160px;" >
227- <div ><strong >Operator # {s " $opId" } Memory Used In Bytes {SparkUIUtils .tooltip(" Memory USed In Bytes." , " right" )}</strong ></div >
228- </div >
229- </td >
230- <td class ={s " op $opId-memory-used-bytes-timeline " }>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td >
231- <td class ={s " op $opId-memory-used-bytes-histogram " }>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td >
232- </tr >
233- <tr >
234- <td style =" vertical-align: middle;" >
235- <div style =" width: 160px;" >
236- <div ><strong >Operator # {s " $opId" } Number Of Rows Dropped By Watermark {SparkUIUtils .tooltip(" Number Of Rows Dropped By Watermark." , " right" )}</strong ></div >
237- </div >
238- </td >
239- <td class ={s " op $opId-num-rows-dropped-by-watermark-timeline " }>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td >
240- <td class ={s " op $opId-num-rows-dropped-by-watermark-histogram " }>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td >
241- </tr >
242- // scalastyle:on
243-
244- opId += 1
245- result
246- }
137+ val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
138+ p.stateOperators.map(_.numRowsTotal).sum.toDouble))
139+ val maxNumRowsTotal = numRowsTotalData.maxBy(_._2)._2
140+
141+ val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
142+ p.stateOperators.map(_.numRowsUpdated).sum.toDouble))
143+ val maxNumRowsUpdated = numRowsUpdatedData.maxBy(_._2)._2
144+
145+ val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
146+ p.stateOperators.map(_.memoryUsedBytes).sum.toDouble))
147+ val maxMemoryUsedBytes = memoryUsedBytesData.maxBy(_._2)._2
148+
149+ val numRowsDroppedByWatermarkData = query.recentProgress
150+ .map(p => (parseProgressTimestamp(p.timestamp),
151+ p.stateOperators.map(_.numRowsDroppedByWatermark).sum.toDouble))
152+ val maxNumRowsDroppedByWatermark = numRowsDroppedByWatermarkData.maxBy(_._2)._2
153+
154+ val graphUIDataForNumberTotalRows =
155+ new GraphUIData (
156+ " aggregated-num-total-rows-timeline" ,
157+ " aggregated-num-total-rows-histogram" ,
158+ numRowsTotalData,
159+ minBatchTime,
160+ maxBatchTime,
161+ 0 ,
162+ maxNumRowsTotal,
163+ " records" )
164+ graphUIDataForNumberTotalRows.generateDataJs(jsCollector)
165+
166+ val graphUIDataForNumberUpdatedRows =
167+ new GraphUIData (
168+ " aggregated-num-updated-rows-timeline" ,
169+ " aggregated-num-updated-rows-histogram" ,
170+ numRowsUpdatedData,
171+ minBatchTime,
172+ maxBatchTime,
173+ 0 ,
174+ maxNumRowsUpdated,
175+ " records" )
176+ graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector)
177+
178+ val graphUIDataForMemoryUsedBytes =
179+ new GraphUIData (
180+ " aggregated-memory-used-bytes-timeline" ,
181+ " aggregated-memory-used-bytes-histogram" ,
182+ memoryUsedBytesData,
183+ minBatchTime,
184+ maxBatchTime,
185+ 0 ,
186+ maxMemoryUsedBytes,
187+ " bytes" )
188+ graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector)
189+
190+ val graphUIDataForNumRowsDroppedByWatermark =
191+ new GraphUIData (
192+ " aggregated-num-rows-dropped-by-watermark-timeline" ,
193+ " aggregated-num-rows-dropped-by-watermark-histogram" ,
194+ numRowsDroppedByWatermarkData,
195+ minBatchTime,
196+ maxBatchTime,
197+ 0 ,
198+ maxNumRowsDroppedByWatermark,
199+ " records" )
200+ graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
201+
202+ // scalastyle:off
203+ <tr >
204+ <td style =" vertical-align: middle;" >
205+ <div style =" width: 160px;" >
206+ <div ><strong >Aggregated Number Of Total Rows {SparkUIUtils .tooltip(" Number of total rows." , " right" )}</strong ></div >
207+ </div >
208+ </td >
209+ <td class ={" aggregated-num-total-rows-timeline" }>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td >
210+ <td class ={" aggregated-num-total-rows-histogram" }>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td >
211+ </tr >
212+ <tr >
213+ <td style =" vertical-align: middle;" >
214+ <div style =" width: 160px;" >
215+ <div ><strong >Aggregated Number Of Updated Rows {SparkUIUtils .tooltip(" Number of updated rows." , " right" )}</strong ></div >
216+ </div >
217+ </td >
218+ <td class ={" aggregated-num-updated-rows-timeline" }>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td >
219+ <td class ={" aggregated-num-updated-rows-histogram" }>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td >
220+ </tr >
221+ <tr >
222+ <td style =" vertical-align: middle;" >
223+ <div style =" width: 160px;" >
224+ <div ><strong >Aggregated Memory Used In Bytes {SparkUIUtils .tooltip(" Memory Used In Bytes." , " right" )}</strong ></div >
225+ </div >
226+ </td >
227+ <td class ={" aggregated-memory-used-bytes-timeline" }>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td >
228+ <td class ={" aggregated-memory-used-bytes-histogram" }>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td >
229+ </tr >
230+ <tr >
231+ <td style =" vertical-align: middle;" >
232+ <div style =" width: 160px;" >
233+ <div ><strong >Aggregated Number Of Rows Dropped By Watermark {SparkUIUtils .tooltip(" Number Of Rows Dropped By Watermark." , " right" )}</strong ></div >
234+ </div >
235+ </td >
236+ <td class ={" aggregated-num-rows-dropped-by-watermark-timeline" }>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td >
237+ <td class ={" aggregated-num-rows-dropped-by-watermark-histogram" }>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td >
238+ </tr >
239+ // scalastyle:on
247240 }
248241
249242 def generateStatTable (query : StreamingQueryUIData ): Seq [Node ] = {
@@ -404,7 +397,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
404397 </td >
405398 <td class =" duration-area-stack" colspan =" 2" >{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td >
406399 </tr >
407- {generateStateOperators (query, minBatchTime, maxBatchTime, jsCollector)}
400+ {generateAggregatedStateOperators (query, minBatchTime, maxBatchTime, jsCollector)}
408401 </tbody >
409402 </table >
410403 } else {
0 commit comments