Skip to content

Commit 7c7d6d4

Browse files

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.hadoop.yarn.server.api.*;
4444
import org.iq80.leveldb.DB;
4545
import org.iq80.leveldb.DBIterator;
46-
4746
import org.slf4j.Logger;
4847
import org.slf4j.LoggerFactory;
4948

@@ -59,13 +58,13 @@
5958

6059
/**
6160
* An external shuffle service used by Spark on Yarn.
62-
*
61+
* <p>
6362
* This is intended to be a long-running auxiliary service that runs in the NodeManager process.
6463
* A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
6564
* The application also automatically derives the service port through `spark.shuffle.service.port`
6665
* specified in the Yarn configuration. This is so that both the clients and the server agree on
6766
* the same port to communicate on.
68-
*
67+
* <p>
6968
* The service also optionally supports authentication. This ensures that executors from one
7069
* application cannot read the shuffle files written by those from another. This feature can be
7170
* enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
@@ -100,7 +99,7 @@ public class YarnShuffleService extends AuxiliaryService {
10099
private static final ObjectMapper mapper = new ObjectMapper();
101100
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
102101
private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
103-
.StoreVersion(1, 0);
102+
.StoreVersion(1, 0);
104103

105104
// just for integration tests that want to look at this file -- in general not sensible as
106105
// a static
@@ -177,14 +176,14 @@ protected void serviceInit(Configuration conf) throws Exception {
177176
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
178177

179178
Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource",
180-
String.class, String.class, MetricsSource.class);
179+
String.class, String.class, MetricsSource.class);
181180
registerSourceMethod.setAccessible(true);
182181
registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " +
183-
"Shuffle Service", serviceMetrics);
182+
"Shuffle Service", serviceMetrics);
184183
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
185184
} catch (Exception e) {
186185
logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " +
187-
"proceeding without metrics", e);
186+
"proceeding without metrics", e);
188187
}
189188

190189
// If authentication is enabled, set up the shuffle server to use a
@@ -205,7 +204,7 @@ protected void serviceInit(Configuration conf) throws Exception {
205204
boundPort = port;
206205
String authEnabledString = authEnabled ? "enabled" : "not enabled";
207206
logger.info("Started YARN shuffle service for Spark on port {}. " +
208-
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
207+
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
209208
registeredExecutorFile);
210209
} catch (Exception e) {
211210
if (stopOnFailure) {
@@ -222,7 +221,7 @@ private void createSecretManager() throws IOException {
222221

223222
// Make sure this is protected in case its not in the NM recovery dir
224223
FileSystem fs = FileSystem.getLocal(_conf);
225-
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
224+
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
226225

227226
db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
228227
logger.info("Recovery location is: " + secretsFile.getPath());
@@ -363,10 +362,10 @@ protected Path getRecoveryPath(String fileName) {
363362
*/
364363
protected File initRecoveryDb(String dbFileName) {
365364
if (_recoveryPath != null) {
366-
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
367-
if (recoveryFile.exists()) {
368-
return recoveryFile;
369-
}
365+
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
366+
if (recoveryFile.exists()) {
367+
return recoveryFile;
368+
}
370369
}
371370
// db doesn't exist in recovery path go check local dirs for it
372371
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
@@ -433,8 +432,8 @@ public int hashCode() {
433432
@Override
434433
public String toString() {
435434
return Objects.toStringHelper(this)
436-
.add("appId", appId)
437-
.toString();
435+
.add("appId", appId)
436+
.toString();
438437
}
439438
}
440439

resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,16 @@
1616
*/
1717
package org.apache.spark.network.yarn
1818

19+
import scala.collection.JavaConverters._
20+
1921
import org.apache.hadoop.metrics2.MetricsRecordBuilder
2022
import org.mockito.Matchers._
2123
import org.mockito.Mockito.{mock, times, verify, when}
2224
import org.scalatest.Matchers
23-
import scala.collection.JavaConverters._
2425

26+
import org.apache.spark.SparkFunSuite
2527
import org.apache.spark.network.server.OneForOneStreamManager
2628
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
27-
import org.apache.spark.SparkFunSuite
2829

2930
class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
3031

0 commit comments

Comments
 (0)