Skip to content

Commit cf74f36

Browse files
author
pgandhi
committed
[SPARK-18364] : Expose metrics for YarnShuffleService
Added shuffle server metrics for Spark Yarn shuffle service. I have made my changes on top of Andrew Ash's PR and have additionally added two more metrics on top of them: numRegisteredConnections which indicate the number of registered connections to the shuffle service and numActiveConnections which indicate the number of active connections to the shuffle service at any given point in time. If these metrics are outputted to a file, we get something like this: 1533674653489 default.shuffleService: Hostname=openqe26blue-n9.blue.ygrid.yahoo.com, openBlockRequestLatencyMillis_count=729, openBlockRequestLatencyMillis_rate15=0.7110833548897356, openBlockRequestLatencyMillis_rate5=1.657808981793011, openBlockRequestLatencyMillis_rate1=2.2404486061620474, openBlockRequestLatencyMillis_rateMean=0.9242558551196706, numRegisteredConnections=35, blockTransferRateBytes_count=2635880512, blockTransferRateBytes_rate15=2578547.6094160094, blockTransferRateBytes_rate5=6048721.726302424, blockTransferRateBytes_rate1=8548922.518223226, blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, registerExecutorRequestLatencyMillis_count=5, registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, registerExecutorRequestLatencyMillis_rate1=2.8270296777387467E-6, registerExecutorRequestLatencyMillis_rateMean=0.006339206380043053, numActiveConnections=35
1 parent 12b1e91 commit cf74f36

File tree

8 files changed

+277
-4
lines changed

8 files changed

+277
-4
lines changed

common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.List;
2222

23+
import com.codahale.metrics.Counter;
2324
import io.netty.channel.Channel;
2425
import io.netty.channel.socket.SocketChannel;
2526
import io.netty.handler.timeout.IdleStateHandler;
@@ -61,6 +62,8 @@ public class TransportContext {
6162
private final TransportConf conf;
6263
private final RpcHandler rpcHandler;
6364
private final boolean closeIdleConnections;
65+
// Number of registered connections to the shuffle service
66+
private Counter registeredConnections = new Counter();
6467

6568
/**
6669
* Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
@@ -170,8 +173,12 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
170173
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
171174
rpcHandler, conf.maxChunksBeingTransferred());
172175
return new TransportChannelHandler(client, responseHandler, requestHandler,
173-
conf.connectionTimeoutMs(), closeIdleConnections);
176+
conf.connectionTimeoutMs(), closeIdleConnections, this);
174177
}
175178

176179
public TransportConf getConf() { return conf; }
180+
181+
public Counter getRegisteredConnections() {
182+
return registeredConnections;
183+
}
177184
}

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.channel.ChannelInboundHandlerAdapter;
2222
import io.netty.handler.timeout.IdleState;
2323
import io.netty.handler.timeout.IdleStateEvent;
24+
import org.apache.spark.network.TransportContext;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

@@ -55,18 +56,21 @@ public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
5556
private final TransportRequestHandler requestHandler;
5657
private final long requestTimeoutNs;
5758
private final boolean closeIdleConnections;
59+
private final TransportContext transportContext;
5860

5961
public TransportChannelHandler(
6062
TransportClient client,
6163
TransportResponseHandler responseHandler,
6264
TransportRequestHandler requestHandler,
6365
long requestTimeoutMs,
64-
boolean closeIdleConnections) {
66+
boolean closeIdleConnections,
67+
TransportContext transportContext) {
6568
this.client = client;
6669
this.responseHandler = responseHandler;
6770
this.requestHandler = requestHandler;
6871
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
6972
this.closeIdleConnections = closeIdleConnections;
73+
this.transportContext = transportContext;
7074
}
7175

7276
public TransportClient getClient() {
@@ -161,4 +165,14 @@ public TransportResponseHandler getResponseHandler() {
161165
return responseHandler;
162166
}
163167

168+
@Override
169+
public void channelRegistered(ChannelHandlerContext ctx) {
170+
transportContext.getRegisteredConnections().inc();
171+
}
172+
173+
@Override
174+
public void channelUnregistered(ChannelHandlerContext ctx) {
175+
transportContext.getRegisteredConnections().dec();
176+
}
177+
164178
}

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import com.codahale.metrics.Counter;
2526
import com.codahale.metrics.MetricSet;
2627
import com.google.common.base.Preconditions;
2728
import com.google.common.collect.Lists;
@@ -159,4 +160,8 @@ public void close() {
159160
}
160161
bootstrap = null;
161162
}
163+
164+
public Counter getRegisteredConnections() {
165+
return context.getRegisteredConnections();
166+
}
162167
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.codahale.metrics.Metric;
3030
import com.codahale.metrics.MetricSet;
3131
import com.codahale.metrics.Timer;
32+
import com.codahale.metrics.Counter;
3233
import com.google.common.annotations.VisibleForTesting;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
@@ -173,22 +174,29 @@ private void checkAuth(TransportClient client, String appId) {
173174
/**
174175
* A simple class to wrap all shuffle service wrapper metrics
175176
*/
176-
private class ShuffleMetrics implements MetricSet {
177+
@VisibleForTesting
178+
public class ShuffleMetrics implements MetricSet {
177179
private final Map<String, Metric> allMetrics;
178180
// Time latency for open block request in ms
179181
private final Timer openBlockRequestLatencyMillis = new Timer();
180182
// Time latency for executor registration latency in ms
181183
private final Timer registerExecutorRequestLatencyMillis = new Timer();
182184
// Block transfer rate in byte per second
183185
private final Meter blockTransferRateBytes = new Meter();
186+
// Number of active connections to the shuffle service
187+
private Counter activeConnections = new Counter();
188+
// Number of registered connections to the shuffle service
189+
private Counter registeredConnections = new Counter();
184190

185-
private ShuffleMetrics() {
191+
public ShuffleMetrics() {
186192
allMetrics = new HashMap<>();
187193
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
188194
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
189195
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
190196
allMetrics.put("registeredExecutorsSize",
191197
(Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
198+
allMetrics.put("numActiveConnections", activeConnections);
199+
allMetrics.put("numRegisteredConnections", registeredConnections);
192200
}
193201

194202
@Override
@@ -244,4 +252,14 @@ public ManagedBuffer next() {
244252
}
245253
}
246254

255+
@Override
256+
public void channelActive(TransportClient client) {
257+
metrics.activeConnections.inc();
258+
}
259+
260+
@Override
261+
public void channelInactive(TransportClient client) {
262+
metrics.activeConnections.dec();
263+
}
264+
247265
}

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.IOException;
22+
import java.lang.reflect.Method;
2223
import java.nio.charset.StandardCharsets;
2324
import java.nio.ByteBuffer;
2425
import java.util.List;
@@ -35,6 +36,9 @@
3536
import org.apache.hadoop.fs.FileSystem;
3637
import org.apache.hadoop.fs.Path;
3738
import org.apache.hadoop.fs.permission.FsPermission;
39+
import org.apache.hadoop.metrics2.MetricsSource;
40+
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
41+
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
3842
import org.apache.hadoop.yarn.api.records.ContainerId;
3943
import org.apache.hadoop.yarn.server.api.*;
4044
import org.apache.spark.network.util.LevelDBProvider;
@@ -191,6 +195,22 @@ protected void serviceInit(Configuration conf) throws Exception {
191195
logger.info("Started YARN shuffle service for Spark on port {}. " +
192196
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
193197
registeredExecutorFile);
198+
try {
199+
blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
200+
shuffleServer.getRegisteredConnections());
201+
YarnShuffleServiceMetrics serviceMetrics =
202+
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
203+
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
204+
Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource",
205+
String.class, String.class, MetricsSource.class);
206+
registerSourceMethod.setAccessible(true);
207+
registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " +
208+
"Shuffle Service", serviceMetrics);
209+
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
210+
} catch (Exception e) {
211+
logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " +
212+
"proceeding without metrics", e);
213+
}
194214
} catch (Exception e) {
195215
if (stopOnFailure) {
196216
throw e;
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.yarn;
19+
20+
import com.codahale.metrics.*;
21+
import com.google.common.annotations.VisibleForTesting;
22+
import org.apache.hadoop.metrics2.MetricsCollector;
23+
import org.apache.hadoop.metrics2.MetricsInfo;
24+
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
25+
import org.apache.hadoop.metrics2.MetricsSource;
26+
27+
import java.util.Map;
28+
29+
/**
30+
* Modeled off of YARN's NodeManagerMetrics.
31+
*/
32+
public class YarnShuffleServiceMetrics implements MetricsSource {
33+
34+
private final MetricSet metricSet;
35+
36+
public YarnShuffleServiceMetrics(MetricSet metricSet) {
37+
this.metricSet = metricSet;
38+
}
39+
40+
/**
41+
* Get metrics from the source
42+
*
43+
* @param collector to contain the resulting metrics snapshot
44+
* @param all if true, return all metrics even if unchanged.
45+
*/
46+
@Override
47+
public void getMetrics(MetricsCollector collector, boolean all) {
48+
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService");
49+
50+
for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
51+
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
52+
}
53+
}
54+
55+
@VisibleForTesting
56+
public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name,
57+
Metric metric) {
58+
59+
// The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics
60+
if (metric instanceof Timer) {
61+
Timer t = (Timer) metric;
62+
metricsRecordBuilder
63+
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
64+
t.getCount())
65+
.addGauge(
66+
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
67+
t.getFifteenMinuteRate())
68+
.addGauge(
69+
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
70+
t.getFiveMinuteRate())
71+
.addGauge(
72+
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
73+
t.getOneMinuteRate())
74+
.addGauge(
75+
new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
76+
t.getMeanRate());
77+
} else if (metric instanceof Meter) {
78+
Meter m = (Meter) metric;
79+
metricsRecordBuilder
80+
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
81+
m.getCount())
82+
.addGauge(
83+
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
84+
m.getFifteenMinuteRate())
85+
.addGauge(
86+
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
87+
m.getFiveMinuteRate())
88+
.addGauge(
89+
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
90+
m.getOneMinuteRate())
91+
.addGauge(
92+
new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
93+
m.getMeanRate());
94+
} else if (metric instanceof Gauge) {
95+
Gauge m = (Gauge) metric;
96+
Object gaugeValue = m.getValue();
97+
if (gaugeValue instanceof Integer) {
98+
Integer intValue = (Integer) gaugeValue;
99+
metricsRecordBuilder
100+
.addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " +
101+
"gauge " + name), intValue.intValue());
102+
}
103+
} else if (metric instanceof Counter) {
104+
Counter c = (Counter) metric;
105+
long counterValue = c.getCount();
106+
metricsRecordBuilder
107+
.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " +
108+
"connections to shuffle service " + name), counterValue);
109+
}
110+
}
111+
112+
private static class ShuffleServiceMetricsInfo implements MetricsInfo {
113+
114+
private final String name;
115+
private final String description;
116+
117+
ShuffleServiceMetricsInfo(String name, String description) {
118+
this.name = name;
119+
this.description = description;
120+
}
121+
122+
@Override
123+
public String name() {
124+
return name;
125+
}
126+
127+
@Override
128+
public String description() {
129+
return description;
130+
}
131+
}
132+
}

core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
8484
server = transportContext.createServer(port, bootstraps.asJava)
8585

8686
shuffleServiceSource.registerMetricSet(server.getAllMetrics)
87+
blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections",
88+
server.getRegisteredConnections)
8789
shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics)
8890
masterMetricsSystem.registerSource(shuffleServiceSource)
8991
masterMetricsSystem.start()

0 commit comments

Comments
 (0)