Skip to content

Commit 53a7491

Browse files
committed
Fixed case matching
1 parent af8b6cc commit 53a7491

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ case class FlatMapGroupsWithStateExec(
120120
val filteredIter = watermarkPredicateForData match {
121121
case Some(predicate) if timeoutConf == EventTimeTimeout =>
122122
iter.filter(row => !predicate.eval(row))
123-
case None =>
123+
case _ =>
124124
iter
125125
}
126126

sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
589589
)
590590
}
591591

592-
test("flatMapGroupsWithState - streaming with event time timeout") {
592+
test("flatMapGroupsWithState - streaming with event time timeout + watermark") {
593593
// Function to maintain the max event time
594594
// Returns the max event time in the state, or -1 if the state was removed by timeout
595595
val stateFunc = (
@@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
761761
assert(e.getMessage === "The output mode of function should be append or update")
762762
}
763763

764+
def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = {
765+
test("SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) {
766+
// Function to maintain a running count per key; the state is removed only when it times out
767+
// Returns the key and the count as a string ("-1" if the state has timed out and was just removed)
768+
val stateFunc =
769+
(key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => {
770+
if (state.hasTimedOut) {
771+
state.remove()
772+
Iterator((key, "-1"))
773+
} else {
774+
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
775+
state.update(RunningCount(count))
776+
state.setTimeoutDuration("10 seconds")
777+
Iterator((key, count.toString))
778+
}
779+
}
780+
781+
val clock = new StreamManualClock
782+
val inputData = MemoryStream[(String, Long)]
783+
val result =
784+
inputData.toDF().toDF("key", "time")
785+
.selectExpr("key", "cast(time as timestamp) as timestamp")
786+
.withWatermark("timestamp", "10 second")
787+
.as[(String, Long)]
788+
.groupByKey(x => x._1)
789+
.flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
790+
791+
testStream(result, Update)(
792+
StartStream(ProcessingTime("1 second"), triggerClock = clock),
793+
AddData(inputData, ("a", 1L)),
794+
AdvanceManualClock(1 * 1000),
795+
CheckLastBatch(("a", "1"))
796+
)
797+
}
798+
}
799+
testWithTimeout(NoTimeout)
800+
testWithTimeout(ProcessingTimeTimeout)
801+
764802
def testStateUpdateWithData(
765803
testName: String,
766804
stateUpdates: GroupState[Int] => Unit,

0 commit comments

Comments
 (0)