@@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
589589 )
590590 }
591591
592- test(" flatMapGroupsWithState - streaming with event time timeout" ) {
592+ test(" flatMapGroupsWithState - streaming with event time timeout + watermark " ) {
593593 // Function to maintain the max event time
594594 // Returns the max event time in the state, or -1 if the state was removed by timeout
595595 val stateFunc = (
@@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
761761 assert(e.getMessage === " The output mode of function should be append or update" )
762762 }
763763
764+ def testWithTimeout (timeoutConf : GroupStateTimeout ): Unit = {
765+ test(" SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) {
766+ // Function to maintain running count up to 2, and then remove the count
767+ // Returns the data and the count (-1 if count reached beyond 2 and state was just removed)
768+ val stateFunc =
769+ (key : String , values : Iterator [(String , Long )], state : GroupState [RunningCount ]) => {
770+ if (state.hasTimedOut) {
771+ state.remove()
772+ Iterator ((key, " -1" ))
773+ } else {
774+ val count = state.getOption.map(_.count).getOrElse(0L ) + values.size
775+ state.update(RunningCount (count))
776+ state.setTimeoutDuration(" 10 seconds" )
777+ Iterator ((key, count.toString))
778+ }
779+ }
780+
781+ val clock = new StreamManualClock
782+ val inputData = MemoryStream [(String , Long )]
783+ val result =
784+ inputData.toDF().toDF(" key" , " time" )
785+ .selectExpr(" key" , " cast(time as timestamp) as timestamp" )
786+ .withWatermark(" timestamp" , " 10 second" )
787+ .as[(String , Long )]
788+ .groupByKey(x => x._1)
789+ .flatMapGroupsWithState(Update , ProcessingTimeTimeout )(stateFunc)
790+
791+ testStream(result, Update )(
792+ StartStream (ProcessingTime (" 1 second" ), triggerClock = clock),
793+ AddData (inputData, (" a" , 1L )),
794+ AdvanceManualClock (1 * 1000 ),
795+ CheckLastBatch ((" a" , " 1" ))
796+ )
797+ }
798+ }
799+ testWithTimeout(NoTimeout )
800+ testWithTimeout(ProcessingTimeTimeout )
801+
764802 def testStateUpdateWithData (
765803 testName : String ,
766804 stateUpdates : GroupState [Int ] => Unit ,
0 commit comments