
Commit 7d62de4

Allow update mode for non-aggregation streaming queries
1 parent 15c2bd0 commit 7d62de4

7 files changed, 53 insertions(+), 44 deletions(-)

docs/structured-streaming-programming-guide.md

Lines changed: 2 additions & 2 deletions

@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
 
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be same as the Append mode.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
 

@@ -977,7 +977,7 @@ Here is the compatibility matrix.
   </tr>
   <tr>
     <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
-    <td style="vertical-align: middle;">Append</td>
+    <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
         Complete mode not supported as it is infeasible to keep all data in the Result Table.
     </td>
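
To make the doc change concrete, here is a minimal sketch of a non-aggregation streaming query started in update mode, which this commit newly permits. It assumes Spark 2.1.1+; the socket source, host/port, and names are illustrative, not taken from the commit:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup; any streaming source behaves the same way here.
val spark = SparkSession.builder().appName("UpdateModeSketch").getOrCreate()
import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// A plain filter -- no aggregation anywhere in the query plan.
val nonEmpty = lines.as[String].filter(_.nonEmpty)

// Before this commit, outputMode("update") threw an AnalysisException for
// such a plan; after it, the query starts and behaves like append mode.
val query = nonEmpty.writeStream
  .format("console")
  .outputMode("update")
  .start()

query.awaitTermination()
```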

sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java

Lines changed: 2 additions & 1 deletion

@@ -57,7 +57,8 @@ public static OutputMode Complete() {
 
   /**
    * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
-   * be written to the sink every time there are some updates.
+   * be written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be same as the `Append` mode.
    *
    * @since 2.1.1
    */
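
The same choice can be made through the typed API this Javadoc describes. A short sketch, assuming an existing streaming Dataset named `events` (an illustrative placeholder, not part of the commit):

```scala
import org.apache.spark.sql.streaming.OutputMode

// Equivalent to .outputMode("update"); available since Spark 2.1.1.
// `events` stands in for any streaming Dataset.
val query = events.writeStream
  .format("console")
  .outputMode(OutputMode.Update())
  .start()
```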

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion

@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
           s"streaming DataFrames/DataSets")(plan)
       }
 
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+      case InternalOutputModes.Complete if aggregates.isEmpty =>
         throwError(
           s"$outputMode output mode not supported when there are no streaming aggregations on " +
           s"streaming DataFrames/Datasets")(plan)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala

Lines changed: 2 additions & 2 deletions

@@ -40,8 +40,8 @@ private[sql] object InternalOutputModes {
 
   /**
    * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
-   * written to the sink every time these is some updates. This output mode can only be used in
-   * queries that contain aggregations.
+   * written to the sink every time these is some updates. If the query doesn't contain
+   * aggregations, it will be same as the `Append` mode.
    */
   case object Update extends OutputMode
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 17 additions & 14 deletions

@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
 
   // Output modes with aggregation and non-aggregation plans
-  testOutputMode(Append, shouldSupportAggregation = false)
-  testOutputMode(Update, shouldSupportAggregation = true)
-  testOutputMode(Complete, shouldSupportAggregation = true)
+  testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
+  testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
+  testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
 
   /*
   =======================================================================================

@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
-      shouldSupportAggregation: Boolean): Unit = {
+      shouldSupportAggregation: Boolean,
+      shouldSupportNonAggregation: Boolean): Unit = {
 
     // aggregation
     if (shouldSupportAggregation) {
-      assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - no aggregation",
-        streamRelation.where($"a" > 1),
-        outputMode = outputMode,
-        Seq("aggregation", s"$outputMode output mode"))
-
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - aggregation",
         streamRelation.groupBy("a")("count(*)"),
         outputMode = outputMode)
-
     } else {
+      assertNotSupportedInStreamingPlan(
+        s"$outputMode output mode - aggregation",
+        streamRelation.groupBy("a")("count(*)"),
+        outputMode = outputMode,
+        Seq("aggregation", s"$outputMode output mode"))
+    }
+
+    // non aggregation
+    if (shouldSupportNonAggregation) {
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - no aggregation",
         streamRelation.where($"a" > 1),
         outputMode = outputMode)
-
+    } else {
       assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - aggregation",
-        streamRelation.groupBy("a")("count(*)"),
+        s"$outputMode output mode - no aggregation",
+        streamRelation.where($"a" > 1),
         outputMode = outputMode,
         Seq("aggregation", s"$outputMode output mode"))
     }

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 7 additions & 11 deletions

@@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   *                            written to the sink
   * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
   *                            to the sink every time these is some updates
+  * - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
+  *                          will be written to the sink every time there are some updates. If
+  *                          the query doesn't contain aggregations, it will be same as the
+  *                          `OutputMode.Append()` mode.
   *
   * @since 2.0.0
   */

@@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   *              the sink
   * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
   *               every time these is some updates
-  *
+  * - `update`: only the rows that were updated in the streaming DataFrame/Dataset will
+  *             be written to the sink every time there are some updates. If the query doesn't
+  *             contain aggregations, it will be same as the `append` mode.
   * @since 2.0.0
   */
  def outputMode(outputMode: String): DataStreamWriter[T] = {

@@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
       }
-      val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
-      outputMode match {
-        case Append | Complete => // allowed
-        case Update =>
-          throw new AnalysisException(
-            s"Update output mode is not supported for memory sink. $supportedModes")
-        case _ =>
-          throw new AnalysisException(
-            s"$outputMode is not supported for memory sink. $supportedModes")
-      }
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
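
With the guard removed, update mode also reaches the memory sink, which is handy for interactive debugging. A sketch under the same illustrative assumptions as above (`df`, plus a made-up query name); before this commit, `start()` failed here with "Update output mode is not supported for memory sink.":

```scala
// The memory sink registers the output as a temp view named by queryName.
// For a non-aggregation query, update mode accumulates rows like append mode.
val query = df.writeStream
  .format("memory")
  .outputMode("update")      // rejected with an AnalysisException before this commit
  .queryName("debug_rows")   // illustrative name; required for the memory sink
  .start()

spark.table("debug_rows").show()
```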

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala

Lines changed: 22 additions & 13 deletions

@@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
   }
 
 
-  test("registering as a table in Append output mode - supported") {
+  test("registering as a table in Append output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF().writeStream
       .format("memory")

@@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Complete output mode - supported") {
+  test("registering as a table in Complete output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF()
       .groupBy("value")

@@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Update output mode - not supported") {
+  test("registering as a table in Update output mode") {
     val input = MemoryStream[Int]
-    val df = input.toDF()
-      .groupBy("value")
-      .count()
-    intercept[AnalysisException] {
-      df.writeStream
-        .format("memory")
-        .outputMode("update")
-        .queryName("memStream")
-        .start()
-    }
+    val query = input.toDF().writeStream
+      .format("memory")
+      .outputMode("update")
+      .queryName("memStream")
+      .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3)
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3, 4, 5, 6)
+
+    query.stop()
   }
 
   test("MemoryPlan statistics") {