4343import org .apache .hadoop .yarn .server .api .*;
4444import org .iq80 .leveldb .DB ;
4545import org .iq80 .leveldb .DBIterator ;
46-
4746import org .slf4j .Logger ;
4847import org .slf4j .LoggerFactory ;
4948
5958
6059/**
6160 * An external shuffle service used by Spark on Yarn.
62- *
61+ * <p>
6362 * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
6463 * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
6564 * The application also automatically derives the service port through `spark.shuffle.service.port`
6665 * specified in the Yarn configuration. This is so that both the clients and the server agree on
6766 * the same port to communicate on.
68- *
67+ * <p>
6968 * The service also optionally supports authentication. This ensures that executors from one
7069 * application cannot read the shuffle files written by those from another. This feature can be
7170 * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
@@ -100,7 +99,7 @@ public class YarnShuffleService extends AuxiliaryService {
10099 private static final ObjectMapper mapper = new ObjectMapper ();
101100 private static final String APP_CREDS_KEY_PREFIX = "AppCreds" ;
102101 private static final LevelDBProvider .StoreVersion CURRENT_VERSION = new LevelDBProvider
103- .StoreVersion (1 , 0 );
102+ .StoreVersion (1 , 0 );
104103
105104 // just for integration tests that want to look at this file -- in general not sensible as
106105 // a static
@@ -177,14 +176,14 @@ protected void serviceInit(Configuration conf) throws Exception {
177176 MetricsSystemImpl metricsSystem = (MetricsSystemImpl ) DefaultMetricsSystem .instance ();
178177
179178 Method registerSourceMethod = metricsSystem .getClass ().getDeclaredMethod ("registerSource" ,
180- String .class , String .class , MetricsSource .class );
179+ String .class , String .class , MetricsSource .class );
181180 registerSourceMethod .setAccessible (true );
182181 registerSourceMethod .invoke (metricsSystem , "shuffleService" , "Metrics on the Spark " +
183- "Shuffle Service" , serviceMetrics );
182+ "Shuffle Service" , serviceMetrics );
184183 logger .info ("Registered metrics with Hadoop's DefaultMetricsSystem" );
185184 } catch (Exception e ) {
186185 logger .warn ("Unable to register Spark Shuffle Service metrics with Node Manager; " +
187- "proceeding without metrics" , e );
186+ "proceeding without metrics" , e );
188187 }
189188
190189 // If authentication is enabled, set up the shuffle server to use a
@@ -205,7 +204,7 @@ protected void serviceInit(Configuration conf) throws Exception {
205204 boundPort = port ;
206205 String authEnabledString = authEnabled ? "enabled" : "not enabled" ;
207206 logger .info ("Started YARN shuffle service for Spark on port {}. " +
208- "Authentication is {}. Registered executor file is {}" , port , authEnabledString ,
207+ "Authentication is {}. Registered executor file is {}" , port , authEnabledString ,
209208 registeredExecutorFile );
210209 } catch (Exception e ) {
211210 if (stopOnFailure ) {
@@ -222,7 +221,7 @@ private void createSecretManager() throws IOException {
222221
223222 // Make sure this is protected in case its not in the NM recovery dir
224223 FileSystem fs = FileSystem .getLocal (_conf );
225- fs .mkdirs (new Path (secretsFile .getPath ()), new FsPermission ((short )0700 ));
224+ fs .mkdirs (new Path (secretsFile .getPath ()), new FsPermission ((short ) 0700 ));
226225
227226 db = LevelDBProvider .initLevelDB (secretsFile , CURRENT_VERSION , mapper );
228227 logger .info ("Recovery location is: " + secretsFile .getPath ());
@@ -363,10 +362,10 @@ protected Path getRecoveryPath(String fileName) {
363362 */
364363 protected File initRecoveryDb (String dbFileName ) {
365364 if (_recoveryPath != null ) {
366- File recoveryFile = new File (_recoveryPath .toUri ().getPath (), dbFileName );
367- if (recoveryFile .exists ()) {
368- return recoveryFile ;
369- }
365+ File recoveryFile = new File (_recoveryPath .toUri ().getPath (), dbFileName );
366+ if (recoveryFile .exists ()) {
367+ return recoveryFile ;
368+ }
370369 }
371370 // db doesn't exist in recovery path go check local dirs for it
372371 String [] localDirs = _conf .getTrimmedStrings ("yarn.nodemanager.local-dirs" );
@@ -433,8 +432,8 @@ public int hashCode() {
433432 @ Override
434433 public String toString () {
435434 return Objects .toStringHelper (this )
436- .add ("appId" , appId )
437- .toString ();
435+ .add ("appId" , appId )
436+ .toString ();
438437 }
439438 }
440439
0 commit comments