Skip to content

Commit 4adc856

Browse files
committed
Aggregate operators
1 parent ba23c1a commit 4adc856

File tree

1 file changed

+107
-114
lines changed

1 file changed

+107
-114
lines changed

sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala

Lines changed: 107 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -126,124 +126,117 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
126126
<br />
127127
}
128128

129-
def generateStateOperators(
129+
def generateAggregatedStateOperators(
130130
query: StreamingQueryUIData,
131131
minBatchTime: Long,
132132
maxBatchTime: Long,
133133
jsCollector: JsCollector
134-
): Array[NodeBuffer] = {
135-
var opId = 0
134+
): NodeBuffer = {
135+
// This is made sure on caller side but put it here to be defensive
136136
require(query.lastProgress != null)
137-
query.lastProgress.stateOperators.map { _ =>
138-
val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
139-
p.stateOperators(opId).numRowsTotal.toDouble))
140-
val maxNumRowsTotal = query.recentProgress.map(_.stateOperators(opId).numRowsTotal).max
141-
142-
val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
143-
p.stateOperators(opId).numRowsUpdated.toDouble))
144-
val maxNumRowsUpdated = query.recentProgress.map(_.stateOperators(opId).numRowsUpdated).max
145-
146-
val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
147-
p.stateOperators(opId).memoryUsedBytes.toDouble))
148-
val maxMemoryUsedBytes = query.recentProgress.map(_.stateOperators(opId).memoryUsedBytes).max
149-
150-
val numRowsDroppedByWatermarkData = query.recentProgress
151-
.map(p => (parseProgressTimestamp(p.timestamp),
152-
p.stateOperators(opId).numRowsDroppedByWatermark.toDouble))
153-
val maxNumRowsDroppedByWatermark = query.recentProgress
154-
.map(_.stateOperators(opId).numRowsDroppedByWatermark).max
155-
156-
val graphUIDataForNumberTotalRows =
157-
new GraphUIData(
158-
s"op$opId-num-total-rows-timeline",
159-
s"op$opId-num-total-rows-histogram",
160-
numRowsTotalData,
161-
minBatchTime,
162-
maxBatchTime,
163-
0,
164-
maxNumRowsTotal,
165-
"records")
166-
graphUIDataForNumberTotalRows.generateDataJs(jsCollector)
167-
168-
val graphUIDataForNumberUpdatedRows =
169-
new GraphUIData(
170-
s"op$opId-num-updated-rows-timeline",
171-
s"op$opId-num-updated-rows-histogram",
172-
numRowsUpdatedData,
173-
minBatchTime,
174-
maxBatchTime,
175-
0,
176-
maxNumRowsUpdated,
177-
"records")
178-
graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector)
179-
180-
val graphUIDataForMemoryUsedBytes =
181-
new GraphUIData(
182-
s"op$opId-memory-used-bytes-timeline",
183-
s"op$opId-memory-used-bytes-histogram",
184-
memoryUsedBytesData,
185-
minBatchTime,
186-
maxBatchTime,
187-
0,
188-
maxMemoryUsedBytes,
189-
"bytes")
190-
graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector)
191-
192-
val graphUIDataForNumRowsDroppedByWatermark =
193-
new GraphUIData(
194-
s"op$opId-num-rows-dropped-by-watermark-timeline",
195-
s"op$opId-num-rows-dropped-by-watermark-histogram",
196-
numRowsDroppedByWatermarkData,
197-
minBatchTime,
198-
maxBatchTime,
199-
0,
200-
maxNumRowsDroppedByWatermark,
201-
"records")
202-
graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
203-
204-
val result =
205-
// scalastyle:off
206-
<tr>
207-
<td style="vertical-align: middle;">
208-
<div style="width: 160px;">
209-
<div><strong>Operator #{s"$opId"} Number Of Total Rows {SparkUIUtils.tooltip("Number of total rows.", "right")}</strong></div>
210-
</div>
211-
</td>
212-
<td class={s"op$opId-num-total-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
213-
<td class={s"op$opId-num-total-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
214-
</tr>
215-
<tr>
216-
<td style="vertical-align: middle;">
217-
<div style="width: 160px;">
218-
<div><strong>Operator #{s"$opId"} Number Of Updated Rows {SparkUIUtils.tooltip("Number of updated rows.", "right")}</strong></div>
219-
</div>
220-
</td>
221-
<td class={s"op$opId-num-updated-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
222-
<td class={s"op$opId-num-updated-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
223-
</tr>
224-
<tr>
225-
<td style="vertical-align: middle;">
226-
<div style="width: 160px;">
227-
<div><strong>Operator #{s"$opId"} Memory Used In Bytes {SparkUIUtils.tooltip("Memory USed In Bytes.", "right")}</strong></div>
228-
</div>
229-
</td>
230-
<td class={s"op$opId-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
231-
<td class={s"op$opId-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
232-
</tr>
233-
<tr>
234-
<td style="vertical-align: middle;">
235-
<div style="width: 160px;">
236-
<div><strong>Operator #{s"$opId"} Number Of Rows Dropped By Watermark {SparkUIUtils.tooltip("Number Of Rows Dropped By Watermark.", "right")}</strong></div>
237-
</div>
238-
</td>
239-
<td class={s"op$opId-num-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
240-
<td class={s"op$opId-num-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
241-
</tr>
242-
// scalastyle:on
243-
244-
opId += 1
245-
result
246-
}
137+
val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
138+
p.stateOperators.map(_.numRowsTotal).sum.toDouble))
139+
val maxNumRowsTotal = numRowsTotalData.maxBy(_._2)._2
140+
141+
val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
142+
p.stateOperators.map(_.numRowsUpdated).sum.toDouble))
143+
val maxNumRowsUpdated = numRowsUpdatedData.maxBy(_._2)._2
144+
145+
val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
146+
p.stateOperators.map(_.memoryUsedBytes).sum.toDouble))
147+
val maxMemoryUsedBytes = memoryUsedBytesData.maxBy(_._2)._2
148+
149+
val numRowsDroppedByWatermarkData = query.recentProgress
150+
.map(p => (parseProgressTimestamp(p.timestamp),
151+
p.stateOperators.map(_.numRowsDroppedByWatermark).sum.toDouble))
152+
val maxNumRowsDroppedByWatermark = numRowsDroppedByWatermarkData.maxBy(_._2)._2
153+
154+
val graphUIDataForNumberTotalRows =
155+
new GraphUIData(
156+
"aggregated-num-total-rows-timeline",
157+
"aggregated-num-total-rows-histogram",
158+
numRowsTotalData,
159+
minBatchTime,
160+
maxBatchTime,
161+
0,
162+
maxNumRowsTotal,
163+
"records")
164+
graphUIDataForNumberTotalRows.generateDataJs(jsCollector)
165+
166+
val graphUIDataForNumberUpdatedRows =
167+
new GraphUIData(
168+
"aggregated-num-updated-rows-timeline",
169+
"aggregated-num-updated-rows-histogram",
170+
numRowsUpdatedData,
171+
minBatchTime,
172+
maxBatchTime,
173+
0,
174+
maxNumRowsUpdated,
175+
"records")
176+
graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector)
177+
178+
val graphUIDataForMemoryUsedBytes =
179+
new GraphUIData(
180+
"aggregated-memory-used-bytes-timeline",
181+
"aggregated-memory-used-bytes-histogram",
182+
memoryUsedBytesData,
183+
minBatchTime,
184+
maxBatchTime,
185+
0,
186+
maxMemoryUsedBytes,
187+
"bytes")
188+
graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector)
189+
190+
val graphUIDataForNumRowsDroppedByWatermark =
191+
new GraphUIData(
192+
"aggregated-num-rows-dropped-by-watermark-timeline",
193+
"aggregated-num-rows-dropped-by-watermark-histogram",
194+
numRowsDroppedByWatermarkData,
195+
minBatchTime,
196+
maxBatchTime,
197+
0,
198+
maxNumRowsDroppedByWatermark,
199+
"records")
200+
graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
201+
202+
// scalastyle:off
203+
<tr>
204+
<td style="vertical-align: middle;">
205+
<div style="width: 160px;">
206+
<div><strong>Aggregated Number Of Total Rows {SparkUIUtils.tooltip("Number of total rows.", "right")}</strong></div>
207+
</div>
208+
</td>
209+
<td class={"aggregated-num-total-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
210+
<td class={"aggregated-num-total-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
211+
</tr>
212+
<tr>
213+
<td style="vertical-align: middle;">
214+
<div style="width: 160px;">
215+
<div><strong>Aggregated Number Of Updated Rows {SparkUIUtils.tooltip("Number of updated rows.", "right")}</strong></div>
216+
</div>
217+
</td>
218+
<td class={"aggregated-num-updated-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
219+
<td class={"aggregated-num-updated-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
220+
</tr>
221+
<tr>
222+
<td style="vertical-align: middle;">
223+
<div style="width: 160px;">
224+
<div><strong>Aggregated Memory Used In Bytes {SparkUIUtils.tooltip("Memory Used In Bytes.", "right")}</strong></div>
225+
</div>
226+
</td>
227+
<td class={"aggregated-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
228+
<td class={"aggregated-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
229+
</tr>
230+
<tr>
231+
<td style="vertical-align: middle;">
232+
<div style="width: 160px;">
233+
<div><strong>Aggregated Number Of Rows Dropped By Watermark {SparkUIUtils.tooltip("Number Of Rows Dropped By Watermark.", "right")}</strong></div>
234+
</div>
235+
</td>
236+
<td class={"aggregated-num-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
237+
<td class={"aggregated-num-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
238+
</tr>
239+
// scalastyle:on
247240
}
248241

249242
def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
@@ -404,7 +397,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
404397
</td>
405398
<td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
406399
</tr>
407-
{generateStateOperators(query, minBatchTime, maxBatchTime, jsCollector)}
400+
{generateAggregatedStateOperators(query, minBatchTime, maxBatchTime, jsCollector)}
408401
</tbody>
409402
</table>
410403
} else {

0 commit comments

Comments
 (0)