|
18 | 18 | package org.apache.spark |
19 | 19 |
|
20 | 20 | import java.io._ |
| 21 | +import java.util.concurrent.ConcurrentHashMap |
21 | 22 | import java.util.zip.{GZIPInputStream, GZIPOutputStream} |
22 | 23 |
|
23 | 24 | import scala.collection.mutable.{HashSet, HashMap, Map} |
24 | 25 | import scala.concurrent.Await |
| 26 | +import scala.collection.JavaConversions._ |
25 | 27 |
|
26 | 28 | import akka.actor._ |
27 | 29 | import akka.pattern.ask |
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging |
84 | 86 | * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks. |
85 | 87 | * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the |
86 | 88 | * master's corresponding HashMap. |
| 89 | + * |
| 90 | + * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a |
| 91 | + * thread-safe map. |
87 | 92 | */ |
88 | 93 | protected val mapStatuses: Map[Int, Array[MapStatus]] |
89 | 94 |
|
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) |
339 | 344 | * MapOutputTrackerMaster. |
340 | 345 | */ |
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  // Per the contract on MapOutputTracker.mapStatuses, this map is accessed concurrently
  // (it serves as a cache that fetcher threads read and populate), so the backing store
  // is a java.util.concurrent.ConcurrentHashMap. It is exposed under the declared
  // scala.collection.mutable.Map type through the implicit JavaConversions wrapper
  // imported at the top of the file.
  // NOTE(review): scala.collection.JavaConversions is deprecated in later Scala
  // versions — when modernizing, switch to an explicit `.asScala` via JavaConverters.
  protected val mapStatuses: Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]()
}
344 | 350 |
|
345 | 351 | private[spark] object MapOutputTracker { |
|
0 commit comments