Skip to content

Commit d450053

Browse files
committed
SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
1 parent cd273a2 commit d450053

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.spark
1919

2020
import java.io._
21+
import java.util.concurrent.ConcurrentHashMap
2122
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
2223

2324
import scala.collection.mutable.{HashSet, HashMap, Map}
2425
import scala.concurrent.Await
26+
import scala.collection.JavaConversions._
2527

2628
import akka.actor._
2729
import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
8486
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
8587
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
8688
* master's corresponding HashMap.
89+
*
90+
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
91+
* thread-safe map.
8792
*/
8893
protected val mapStatuses: Map[Int, Array[MapStatus]]
8994

@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
339344
* MapOutputTrackerMaster.
340345
*/
341346
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
342-
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
347+
protected val mapStatuses: Map[Int, Array[MapStatus]] =
348+
new ConcurrentHashMap[Int, Array[MapStatus]]
343349
}
344350

345351
private[spark] object MapOutputTracker {

0 commit comments

Comments
 (0)