Commit fefd22f

andrewor14 authored and pwendell committed
[SPARK-1113] External spilling - fix Int.MaxValue hash code collision bug
The original poster of this bug is @guojc, who opened a PR that preceded this one at https://github.com/apache/incubator-spark/pull/612.

ExternalAppendOnlyMap uses key hash codes to order the buffer streams from which spilled files are read back into memory. When a buffer stream is empty, the default hash code for that stream is Int.MaxValue. This, however, is also a perfectly legitimate key hash code. When reading from a spilled map containing such a key, a hash collision may occur, in which case we attempt to read from an empty stream and throw NoSuchElementException.

The fix is to maintain the invariant that empty buffer streams are never added back to the merge queue. This guarantees that we never read from an empty buffer stream. This PR also includes two new tests for hash collisions.

Author: Andrew Or <[email protected]>

Closes apache#624 from andrewor14/spilling-bug and squashes the following commits:

9e7263d [Andrew Or] Slightly optimize next()
2037ae2 [Andrew Or] Move a few comments around...
cf95942 [Andrew Or] Remove default value of Int.MaxValue for minKeyHash
c11f03b [Andrew Or] Fix Int.MaxValue hash collision bug in ExternalAppendOnlyMap
21c1a39 [Andrew Or] Add hash collision tests to ExternalAppendOnlyMapSuite
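To make the failure mode concrete, here is a minimal, self-contained Scala sketch of why Int.MaxValue is unsafe as an "empty buffer" sentinel. It is not the Spark code itself: SentinelCollisionSketch, Buffer, and byMinHash are illustrative stand-ins for StreamBuffer and the merge queue's ordering.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object SentinelCollisionSketch extends App {
  // Illustrative stand-in for StreamBuffer: `pairs` is sorted by key hash, and
  // an *empty* buffer reports the sentinel Int.MaxValue as its minimum key hash
  case class Buffer(pairs: ArrayBuffer[(Int, Int)]) {
    def minKeyHash: Int = if (pairs.nonEmpty) pairs.head._1.hashCode else Int.MaxValue
  }

  // Order buffers by ascending minKeyHash (PriorityQueue is a max-heap, so reverse)
  implicit val byMinHash: Ordering[Buffer] = Ordering.by[Buffer, Int](_.minKeyHash).reverse

  val exhausted  = Buffer(ArrayBuffer.empty)               // sentinel hash: Int.MaxValue
  val legitimate = Buffer(ArrayBuffer((Int.MaxValue, 1)))  // real key hash: also Int.MaxValue

  // The two buffers compare as equal, so the exhausted one may be dequeued first;
  // reading a pair from it is then an error, which surfaced as the
  // NoSuchElementException described above
  val mergeHeap = mutable.PriorityQueue(exhausted, legitimate)
  val head = mergeHeap.dequeue()
  println(s"dequeued a buffer holding ${head.pairs.length} pairs (minKeyHash = ${head.minKeyHash})")
}

Keeping empty buffers out of the merge queue entirely, as this commit does, removes the sentinel and the collision along with it.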
1 parent c8a4c9b commit fefd22f

2 files changed: +102 −38 lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 26 additions & 23 deletions
@@ -148,7 +148,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   }

   /**
-   * Sort the existing contents of the in-memory map and spill them to a temporary file on disk
+   * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   private def spill(mapSize: Long) {
     spillCount += 1
@@ -223,7 +223,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    */
   private class ExternalIterator extends Iterator[(K, C)] {

-    // A fixed-size queue that maintains a buffer for each stream we are currently merging
+    // A queue that maintains a buffer for each stream we are currently merging
+    // This queue maintains the invariant that it only contains non-empty buffers
     private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

     // Input streams are derived both from the in-memory map and spilled maps on disk
@@ -233,7 +234,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](

     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
-      mergeHeap.enqueue(StreamBuffer(it, kcPairs))
+      if (kcPairs.length > 0) {
+        mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
+      }
     }

     /**
@@ -258,11 +261,11 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](

     /**
      * If the given buffer contains a value for the given key, merge that value into
-     * baseCombiner and remove the corresponding (K, C) pair from the buffer
+     * baseCombiner and remove the corresponding (K, C) pair from the buffer.
      */
     private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
       var i = 0
-      while (i < buffer.pairs.size) {
+      while (i < buffer.pairs.length) {
         val (k, c) = buffer.pairs(i)
         if (k == key) {
           buffer.pairs.remove(i)
@@ -274,40 +277,41 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }

     /**
-     * Return true if there exists an input stream that still has unvisited pairs
+     * Return true if there exists an input stream that still has unvisited pairs.
      */
-    override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
+    override def hasNext: Boolean = mergeHeap.length > 0

     /**
      * Select a key with the minimum hash, then combine all values with the same key from all
      * input streams.
      */
     override def next(): (K, C) = {
+      if (mergeHeap.length == 0) {
+        throw new NoSuchElementException
+      }
       // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
-      if (minPairs.length == 0) {
-        // Should only happen when no other stream buffers have any pairs left
-        throw new NoSuchElementException
-      }
       var (minKey, minCombiner) = minPairs.remove(0)
       assert(minKey.hashCode() == minHash)

       // For all other streams that may have this key (i.e. have the same minimum key hash),
       // merge in the corresponding value (if any) from that stream
       val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
-      while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
+      while (mergeHeap.length > 0 && mergeHeap.head.minKeyHash == minHash) {
         val newBuffer = mergeHeap.dequeue()
         minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
         mergedBuffers += newBuffer
       }

-      // Repopulate each visited stream buffer and add it back to the merge heap
+      // Repopulate each visited stream buffer and add it back to the queue if it is non-empty
       mergedBuffers.foreach { buffer =>
-        if (buffer.pairs.length == 0) {
+        if (buffer.isEmpty) {
           buffer.pairs ++= getMorePairs(buffer.iterator)
         }
-        mergeHeap.enqueue(buffer)
+        if (!buffer.isEmpty) {
+          mergeHeap.enqueue(buffer)
+        }
       }

       (minKey, minCombiner)
@@ -323,13 +327,12 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {

-      def minKeyHash: Int = {
-        if (pairs.length > 0){
-          // pairs are already sorted by key hash
-          pairs(0)._1.hashCode()
-        } else {
-          Int.MaxValue
-        }
+      def isEmpty = pairs.length == 0
+
+      // Invalid if there are no more pairs in this stream
+      def minKeyHash = {
+        assert(pairs.length > 0)
+        pairs.head._1.hashCode()
       }

       override def compareTo(other: StreamBuffer): Int = {
@@ -356,7 +359,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     private var objectsRead = 0

     /**
-     * Construct a stream that reads only from the next batch
+     * Construct a stream that reads only from the next batch.
      */
     private def nextBatchStream(): InputStream = {
       if (batchSizes.length > 0) {

core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala

Lines changed: 76 additions & 15 deletions
@@ -19,21 +19,16 @@ package org.apache.spark.util.collection

 import scala.collection.mutable.ArrayBuffer

-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite

 import org.apache.spark._
 import org.apache.spark.SparkContext._

-class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
+class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {

-  private val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
-  private val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
-    buffer += i
-  }
-  private val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
-    (buf1, buf2) => {
-      buf1 ++= buf2
-    }
+  private def createCombiner(i: Int) = ArrayBuffer[Int](i)
+  private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
+  private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2

   test("simple insert") {
     val conf = new SparkConf(false)
@@ -203,13 +198,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
   }

   test("spilling") {
-    // TODO: Use SparkConf (which currently throws connection reset exception)
-    System.setProperty("spark.shuffle.memoryFraction", "0.001")
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    conf.set("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

     // reduceByKey - should spill ~8 times
     val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
-    val resultA = rddA.reduceByKey(math.max(_, _)).collect()
+    val resultA = rddA.reduceByKey(math.max).collect()
     assert(resultA.length == 50000)
     resultA.foreach { case(k, v) =>
       k match {
@@ -252,7 +247,73 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
         case _ =>
       }
     }
+  }
+
+  test("spilling with hash collisions") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    def createCombiner(i: String) = ArrayBuffer[String](i)
+    def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
+    def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
+      buffer1 ++= buffer2
+
+    val map = new ExternalAppendOnlyMap[String, String, ArrayBuffer[String]](
+      createCombiner, mergeValue, mergeCombiners)
+
+    val collisionPairs = Seq(
+      ("Aa", "BB"),                   // 2112
+      ("to", "v1"),                   // 3707
+      ("variants", "gelato"),         // -1249574770
+      ("Teheran", "Siblings"),        // 231609873
+      ("misused", "horsemints"),      // 1069518484
+      ("isohel", "epistolaries"),     // -1179291542
+      ("righto", "buzzards"),         // -931102253
+      ("hierarch", "crinolines"),     // -1732884796
+      ("inwork", "hypercatalexes"),   // -1183663690
+      ("wainages", "presentencing"),  // 240183619
+      ("trichothecenes", "locular"),  // 339006536
+      ("pomatoes", "eructation")      // 568647356
+    )
+
+    (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
+    collisionPairs.foreach { case (w1, w2) =>
+      map.insert(w1, w2)
+      map.insert(w2, w1)
+    }
+
+    // A map of collision pairs in both directions
+    val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap
+
+    // Avoid map.size or map.iterator.length because this destructively sorts the underlying map
+    var count = 0
+
+    val it = map.iterator
+    while (it.hasNext) {
+      val kv = it.next()
+      val expectedValue = ArrayBuffer[String](collisionPairsMap.getOrElse(kv._1, kv._1))
+      assert(kv._2.equals(expectedValue))
+      count += 1
+    }
+    assert(count == 100000 + collisionPairs.size * 2)
+  }
+
+  test("spilling with hash collisions using the Int.MaxValue key") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

-    System.clearProperty("spark.shuffle.memoryFraction")
+    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+      mergeValue, mergeCombiners)
+
+    (1 to 100000).foreach { i => map.insert(i, i) }
+    map.insert(Int.MaxValue, Int.MaxValue)
+
+    val it = map.iterator
+    while (it.hasNext) {
+      // Should not throw NoSuchElementException
+      it.next()
+    }
   }
 }
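As a quick sanity check, the colliding word pairs used in the test above can be verified directly against String.hashCode. This is a standalone sketch for illustration (HashCollisionCheck is a hypothetical helper, not part of the patch); the expected hash values come from the comments in the test.

object HashCollisionCheck extends App {
  // A few of the pairs from the test, with the hash value each pair shares
  val pairs = Seq(("Aa", "BB", 2112), ("to", "v1", 3707), ("variants", "gelato", -1249574770))
  pairs.foreach { case (a, b, h) =>
    // String.hashCode is s(0)*31^(n-1) + s(1)*31^(n-2) + ... + s(n-1)
    assert(a.hashCode == h && b.hashCode == h, s"$a / $b do not both hash to $h")
    println(s""""$a" and "$b" both hash to $h""")
  }
}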
