Skip to content

Commit 784db1b

Browse files
committed
Fix NoSuchElementException when a state is not set but timeoutThreshold is defined
1 parent 15205da commit 784db1b

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ private[streaming] object MapWithStateRDDRecord {
5757
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
5858
if (wrappedState.isRemoved) {
5959
newStateMap.remove(key)
60-
} else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
60+
} else if (wrappedState.isUpdated
61+
|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {
6162
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
6263
}
6364
mappedData ++= returned

streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
190190
timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
191191
expectedStates = Nil, expectedTimingOutStates = Nil, expectedRemovedStates = Seq(123))
192192

193+
// If a state is not set but timeoutThreshold is defined, we should ignore this state.
194+
// Previously it threw NoSuchElementException (SPARK-13195).
195+
assertRecordUpdate(initStates = Seq(), data = Seq("noop"),
196+
timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
197+
expectedStates = Nil, expectedTimingOutStates = Nil)
193198
}
194199

195200
test("states generated by MapWithStateRDD") {

0 commit comments

Comments
 (0)