Skip to content

Commit ce51466

Browse files
[MINOR][REFACTORING] KeyValueGroupedDataset.mapGroupsWithState uses flatMapGroupsWithState
1 parent cb8d5cc commit ce51466

File tree

1 file changed

+2
-20
lines changed

1 file changed

+2
-20
lines changed

sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
242242
def mapGroupsWithState[S: Encoder, U: Encoder](
243243
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
244244
val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
245-
Dataset[U](
246-
sparkSession,
247-
FlatMapGroupsWithState[K, V, S, U](
248-
flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
249-
groupingAttributes,
250-
dataAttributes,
251-
OutputMode.Update,
252-
isMapGroupsWithState = true,
253-
GroupStateTimeout.NoTimeout,
254-
child = logicalPlan))
245+
flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(flatMapFunc)
255246
}
256247

257248
/**
@@ -278,16 +269,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
278269
timeoutConf: GroupStateTimeout)(
279270
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
280271
val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
281-
Dataset[U](
282-
sparkSession,
283-
FlatMapGroupsWithState[K, V, S, U](
284-
flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
285-
groupingAttributes,
286-
dataAttributes,
287-
OutputMode.Update,
288-
isMapGroupsWithState = true,
289-
timeoutConf,
290-
child = logicalPlan))
272+
flatMapGroupsWithState(OutputMode.Update, timeoutConf)(flatMapFunc)
291273
}
292274

293275
/**

0 commit comments

Comments
 (0)