Skip to content

Commit 983b83f

Browse files
committed
Merge pull request alteryx#61 from kayousterhout/daemon_thread
Unified daemon thread pools. As requested by @mateiz in an earlier pull request, this refactors various daemon thread pools to use a set of methods in utils.scala, and also changes the thread-pool-creation methods in utils.scala to use named thread pools for improved debugging.
2 parents 3249e0e + 707ad8c commit 983b83f

File tree

7 files changed

+29
-38
lines changed

7 files changed

+29
-38
lines changed

core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
326326
private var blocksInRequestBitVector = new BitSet(totalBlocks)
327327

328328
override def run() {
329-
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
329+
var threadPool = Utils.newDaemonFixedThreadPool(
330+
MultiTracker.MaxChatSlots, "Bit Torrent Chatter")
330331

331332
while (hasBlocks.get < totalBlocks) {
332333
var numThreadsToCreate = 0
@@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
736737
private var setOfCompletedSources = Set[SourceInfo]()
737738

738739
override def run() {
739-
var threadPool = Utils.newDaemonCachedThreadPool()
740+
var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests")
740741
var serverSocket: ServerSocket = null
741742

742743
serverSocket = new ServerSocket(0)
@@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
927928
class ServeMultipleRequests
928929
extends Thread with Logging {
929930
// Server at most MultiTracker.MaxChatSlots peers
930-
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
931+
var threadPool = Utils.newDaemonFixedThreadPool(
932+
MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests")
931933

932934
override def run() {
933935
var serverSocket = new ServerSocket(0)

core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -137,7 +137,7 @@ extends Logging {
137137
class TrackMultipleValues
138138
extends Thread with Logging {
139139
override def run() {
140-
var threadPool = Utils.newDaemonCachedThreadPool()
140+
var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values")
141141
var serverSocket: ServerSocket = null
142142

143143
serverSocket = new ServerSocket(DriverTrackerPort)

core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable {
291291
private var setOfCompletedSources = Set[SourceInfo]()
292292

293293
override def run() {
294-
var threadPool = Utils.newDaemonCachedThreadPool()
294+
var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests")
295295
var serverSocket: ServerSocket = null
296296

297297
serverSocket = new ServerSocket(0)
@@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable {
493493
class ServeMultipleRequests
494494
extends Thread with Logging {
495495

496-
var threadPool = Utils.newDaemonCachedThreadPool()
496+
var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests")
497497

498498
override def run() {
499499
var serverSocket = new ServerSocket(0)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -121,8 +121,7 @@ private[spark] class Executor(
121121
}
122122

123123
// Start worker thread pool
124-
val threadPool = new ThreadPoolExecutor(
125-
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
124+
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
126125

127126
// Maintains the list of running tasks.
128127
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
7979
private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
8080
private val registerRequests = new SynchronizedQueue[SendingConnection]
8181

82-
implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
82+
implicit val futureExecContext = ExecutionContext.fromExecutor(
83+
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
8384

8485
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
8586

core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala

Lines changed: 4 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -24,33 +24,16 @@ import org.apache.spark._
2424
import org.apache.spark.TaskState.TaskState
2525
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
2626
import org.apache.spark.serializer.SerializerInstance
27+
import org.apache.spark.util.Utils
2728

2829
/**
2930
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
3031
*/
3132
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
3233
extends Logging {
33-
private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
34-
private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
35-
private val getTaskResultExecutor = new ThreadPoolExecutor(
36-
MIN_THREADS,
37-
MAX_THREADS,
38-
0L,
39-
TimeUnit.SECONDS,
40-
new LinkedBlockingDeque[Runnable],
41-
new ResultResolverThreadFactory)
42-
43-
class ResultResolverThreadFactory extends ThreadFactory {
44-
private var counter = 0
45-
private var PREFIX = "Result resolver thread"
46-
47-
override def newThread(r: Runnable): Thread = {
48-
val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
49-
counter += 1
50-
thread.setDaemon(true)
51-
return thread
52-
}
53-
}
34+
private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
35+
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
36+
THREADS, "Result resolver thread")
5437

5538
protected val serializer = new ThreadLocal[SerializerInstance] {
5639
override def initialValue(): SerializerInstance = {

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -447,14 +447,17 @@ private[spark] object Utils extends Logging {
447447
hostPortParseResults.get(hostPort)
448448
}
449449

450-
private[spark] val daemonThreadFactory: ThreadFactory =
451-
new ThreadFactoryBuilder().setDaemon(true).build()
450+
private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
451+
new ThreadFactoryBuilder().setDaemon(true)
452452

453453
/**
454-
* Wrapper over newCachedThreadPool.
454+
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
455+
* unique, sequentially assigned integer.
455456
*/
456-
def newDaemonCachedThreadPool(): ThreadPoolExecutor =
457-
Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
457+
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
458+
val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
459+
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
460+
}
458461

459462
/**
460463
* Return the string to tell how long has passed in seconds. The passing parameter should be in
@@ -465,10 +468,13 @@ private[spark] object Utils extends Logging {
465468
}
466469

467470
/**
468-
* Wrapper over newFixedThreadPool.
471+
* Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
472+
* unique, sequentially assigned integer.
469473
*/
470-
def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
471-
Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
474+
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
475+
val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
476+
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
477+
}
472478

473479
private def listFilesSafely(file: File): Seq[File] = {
474480
val files = file.listFiles()

0 commit comments

Comments (0)