@@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
 class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      // Send a hello message thus the connection is actually established, thus we can
+      // monitor Lifecycle Events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread, else app will get killed after expiration
+    // (def: 10mins) timeout ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
     // we want to be reasonably responsive without causing too many requests to RM.
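
For context on the comment rewrapped above: the ApplicationMaster must report progress to the ResourceManager more often than RM_AM_EXPIRY_INTERVAL_MS elapses, otherwise the RM assumes the AM is dead and kills the application. A minimal sketch of that reporter-thread idea, separate from this commit and using hypothetical names (sendProgress, reportInterval), might look like:

    // Sketch only; `sendProgress` and `reportInterval` are illustrative names.
    val expiryMs = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    val reportInterval = math.min(expiryMs / 2, 5000L)  // heartbeat well inside the expiry window
    val reporter = new Thread {
      override def run() {
        while (!driverClosed) {
          sendProgress()               // an (empty) allocate request doubles as a heartbeat
          Thread.sleep(reportInterval)
        }
      }
    }
    reporter.setDaemon(true)
    reporter.start()
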
@@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
+    // Setting this to master host,port - so that the ApplicationReport at client has
+    // some sensible info. Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
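
The host and port set above are what the client later sees in its ApplicationReport. As a brief sketch of how a request like this is typically submitted (the `resourceManager` handle is assumed to exist elsewhere in this class; it is not part of this hunk):

    // Sketch only: register the AM with the ResourceManager using the request built above.
    val registerResponse = resourceManager.registerApplicationMaster(appMasterRequest)
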
@@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
-      yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      yarnAllocator.allocateContainers(
+        math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       Thread.sleep(100)
     }
 
@@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
         while (!driverClosed) {
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
-            logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+            logInfo("Allocating " + missingExecutorCount +
+              " containers to make up for (potentially ?) lost containers")
             yarnAllocator.allocateContainers(missingExecutorCount)
           }
           else sendProgress()