Skip to content

Commit cd20288

Browse files
authored
Support DoFn metrics in portable Samza Runner (#25068)
1 parent 4dad3c6 commit cd20288

4 files changed

Lines changed: 368 additions & 6 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
* Adding override of allowed TLS algorithms (Java), now maintaining the disabled/legacy algorithms
6969
present in 2.43.0 (up to 1.8.0_342, 11.0.16, 17.0.2 for respective Java versions). This is accompanied
7070
by an explicit re-enabling of TLSv1 and TLSv1.1 for Java 8 and Java 11.
71+
* Add UDF metrics support for Samza portable mode.
7172

7273
## Breaking Changes
7374

runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.samza.runtime;
1919

20+
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.List;
2223
import java.util.Locale;
@@ -27,6 +28,7 @@
2728
import java.util.concurrent.ThreadLocalRandom;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
31+
import java.util.stream.Collectors;
3032
import org.apache.beam.model.pipeline.v1.RunnerApi;
3133
import org.apache.beam.runners.core.DoFnRunner;
3234
import org.apache.beam.runners.core.DoFnRunners;
@@ -39,7 +41,7 @@
3941
import org.apache.beam.runners.core.TimerInternals;
4042
import org.apache.beam.runners.core.construction.Timer;
4143
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
42-
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
44+
import org.apache.beam.runners.core.construction.graph.PipelineNode;
4345
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
4446
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
4547
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -246,7 +248,8 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
246248
idToTupleTagMap,
247249
bundledEventsBag,
248250
stateRequestHandler,
249-
samzaExecutionContext);
251+
samzaExecutionContext,
252+
executableStage.getTransforms());
250253
return pipelineOptions.getEnableMetrics()
251254
? DoFnRunnerWithMetrics.wrap(
252255
underlyingRunner, executionContext.getMetricsContainer(), transformFullName)
@@ -270,7 +273,8 @@ static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT
270273
private final StateRequestHandler stateRequestHandler;
271274
private final SamzaExecutionContext samzaExecutionContext;
272275
private long startBundleTime;
273-
private final String metricName;
276+
private final String stepName;
277+
private final Collection<PipelineNode.PTransformNode> pTransformNodes;
274278

275279
private SdkHarnessDoFnRunner(
276280
SamzaPipelineOptions pipelineOptions,
@@ -282,7 +286,8 @@ private SdkHarnessDoFnRunner(
282286
Map<String, TupleTag<?>> idToTupleTagMap,
283287
BagState<WindowedValue<InT>> bundledEventsBag,
284288
StateRequestHandler stateRequestHandler,
285-
SamzaExecutionContext samzaExecutionContext) {
289+
SamzaExecutionContext samzaExecutionContext,
290+
Collection<PipelineNode.PTransformNode> pTransformNodes) {
286291
this.pipelineOptions = pipelineOptions;
287292
this.timerInternalsFactory = timerInternalsFactory;
288293
this.windowingStrategy = windowingStrategy;
@@ -292,7 +297,8 @@ private SdkHarnessDoFnRunner(
292297
this.bundledEventsBag = bundledEventsBag;
293298
this.stateRequestHandler = stateRequestHandler;
294299
this.samzaExecutionContext = samzaExecutionContext;
295-
this.metricName = "ExecutableStage-" + stepName + "-process-ns";
300+
this.stepName = stepName;
301+
this.pTransformNodes = pTransformNodes;
296302
}
297303

298304
@SuppressWarnings("unchecked")
@@ -324,12 +330,25 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
324330
final TimerReceiverFactory timerReceiverFactory =
325331
new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
326332

333+
Map<String, String> transformFullNameToUniqueName =
334+
pTransformNodes.stream()
335+
.collect(
336+
Collectors.toMap(
337+
pTransformNode -> pTransformNode.getId(),
338+
pTransformNode -> pTransformNode.getTransform().getUniqueName()));
339+
340+
SamzaMetricsBundleProgressHandler samzaMetricsBundleProgressHandler =
341+
new SamzaMetricsBundleProgressHandler(
342+
stepName,
343+
samzaExecutionContext.getMetricsContainer(),
344+
transformFullNameToUniqueName);
345+
327346
remoteBundle =
328347
stageBundleFactory.getBundle(
329348
receiverFactory,
330349
timerReceiverFactory,
331350
stateRequestHandler,
332-
BundleProgressHandler.ignored());
351+
samzaMetricsBundleProgressHandler);
333352

334353
startBundleTime = getStartBundleTime();
335354

@@ -396,6 +415,7 @@ private void emitMetrics() {
396415
final long finishBundleTime = System.nanoTime();
397416
final long averageProcessTime = (finishBundleTime - startBundleTime) / count;
398417

418+
String metricName = "ExecutableStage-" + stepName + "-process-ns";
399419
samzaExecutionContext
400420
.getMetricsContainer()
401421
.updateExecutableStageBundleMetric(metricName, averageProcessTime);
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.samza.runtime;
19+
20+
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
21+
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
22+
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
23+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
24+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
25+
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
26+
27+
import java.util.Map;
28+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
29+
import org.apache.beam.model.pipeline.v1.MetricsApi;
30+
import org.apache.beam.runners.core.metrics.DistributionData;
31+
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
32+
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
33+
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
34+
import org.apache.beam.sdk.metrics.Counter;
35+
import org.apache.beam.sdk.metrics.Distribution;
36+
import org.apache.beam.sdk.metrics.Gauge;
37+
import org.apache.beam.sdk.metrics.MetricName;
38+
import org.apache.beam.sdk.metrics.MetricsContainer;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
/**
43+
* {@inheritDoc} Parses metrics information contained in the bundle progress messages. Passed the
44+
* updated metrics to the provided SamzaMetricsContainer.
45+
*/
46+
class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
47+
48+
private static final Logger LOG =
49+
LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
50+
private final String stepName;
51+
52+
private final SamzaMetricsContainer samzaMetricsContainer;
53+
private final Map<String, String> transformIdToUniqueName;
54+
55+
/**
56+
* Constructor of a SamzaMetricsBundleProgressHandler.
57+
*
58+
* <p>The full metric names in classic mode is {transformUniqueName}:{className}:{metricName}. We
59+
* attempt to follow the same format in portable mode, but the monitoringInfos returned by the
60+
* worker only contains the transformId. The current solution is to provide a mapping from
61+
* transformId back to uniqueName. A future improvement would be making the monitoring infos
62+
* contain the uniqueName.
63+
*
64+
* @param stepName Default stepName provided by the runner.
65+
* @param samzaMetricsContainer The destination for publishing the metrics.
66+
* @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms.
67+
*/
68+
public SamzaMetricsBundleProgressHandler(
69+
String stepName,
70+
SamzaMetricsContainer samzaMetricsContainer,
71+
Map<String, String> transformIdToUniqueName) {
72+
this.stepName = stepName;
73+
this.samzaMetricsContainer = samzaMetricsContainer;
74+
this.transformIdToUniqueName = transformIdToUniqueName;
75+
}
76+
77+
@Override
78+
/**
79+
* {@inheritDoc} Handles a progress report from the bundle while it is executing. We choose to
80+
* ignore the progress report. The metrics do not have to be updated on every progress report, so
81+
* we save computation resources by ignoring it.
82+
*/
83+
public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
84+
85+
@Override
86+
/**
87+
* {@inheritDoc} Handles the bundle's completion report. Parses the monitoringInfos in the
88+
* response, then updates the MetricsRegistry.
89+
*/
90+
public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
91+
response.getMonitoringInfosList().stream()
92+
.filter(monitoringInfo -> !monitoringInfo.getPayload().isEmpty())
93+
.forEach(this::parseAndUpdateMetric);
94+
}
95+
96+
/**
97+
* Parses the metric contained in monitoringInfo, then publishes the metric to the
98+
* metricContainer.
99+
*
100+
* <p>We attempt to construct a classic mode metricName
101+
* ({transformUniqueName}:{className}:{metricName}). All the info should be in the labels, but we
102+
* have fallbacks in case the labels don't exist.
103+
*
104+
* <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName using the
105+
* transformIdToUniqueName 2. The transformId provided by the monitoringInfo 3. The stepName
106+
* provided by the runner, which maybe a result of fusing.
107+
*
108+
* <p>Priorities for the className 1. The namespace label 2. The monitoringInfo urn. Copying the
109+
* implementation in MonitoringInfoMetricName.
110+
*
111+
* <p>Priorities for the metricName 1. The name label 2. The monitoringInfo urn. Copying the
112+
* implementation in MonitoringInfoMetricName.
113+
*
114+
* @see
115+
* org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
116+
*/
117+
private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
118+
String pTransformId =
119+
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
120+
String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
121+
String className =
122+
monitoringInfo.getLabelsOrDefault(
123+
MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
124+
String userMetricName =
125+
monitoringInfo.getLabelsOrDefault(
126+
MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString());
127+
128+
MetricsContainer metricsContainer = samzaMetricsContainer.getContainer(transformUniqueName);
129+
MetricName metricName = MetricName.named(className, userMetricName);
130+
131+
switch (monitoringInfo.getType()) {
132+
case SUM_INT64_TYPE:
133+
Counter counter = metricsContainer.getCounter(metricName);
134+
counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
135+
break;
136+
137+
case DISTRIBUTION_INT64_TYPE:
138+
Distribution distribution = metricsContainer.getDistribution(metricName);
139+
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
140+
distribution.update(data.sum(), data.count(), data.min(), data.max());
141+
break;
142+
143+
case LATEST_INT64_TYPE:
144+
Gauge gauge = metricsContainer.getGauge(metricName);
145+
// Gauge doesn't expose update as public. This will reset the timestamp.
146+
147+
gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
148+
break;
149+
150+
default:
151+
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
152+
}
153+
}
154+
}

0 commit comments

Comments
 (0)