Commit 1566433

Added new figure

1 parent ed8d9e0 commit 1566433

File tree — 3 files changed (+46, −34 lines):

- 240 KB binary file (image; preview not captured in this view)
- docs/img/structured-streaming.pptx (8.1 KB, binary file not shown)
- docs/structured-streaming-programming-guide.md (46 additions, 34 deletions)
@@ -681,18 +681,18 @@ windowedCounts = words.groupBy(
 ### Handling Late Data and Watermarking
 Now consider what happens if one of the events arrives late to the application.
 For example, a word that was generated at 12:04 but it was received at 12:11.
-Since this windowing is based on the time in the data, the time 12:04 should be considered for windowing. This occurs naturally in our window-based grouping – the late data is automatically placed in the proper windows and the correct aggregates are updated as illustrated below.
+Since this windowing is based on the time in the data, the time 12:04 should be considered for
+windowing. This occurs naturally in our window-based grouping – the late data is
+automatically placed in the proper windows and the correct aggregates are updated as illustrated below.

 ![Handling Late Data](img/structured-streaming-late-data.png)

-Furthermore, since Spark 2.1, you can define a watermark on the event time,
-and specify the threshold on how late the date can be in terms of the event
-time. The engine will automatically track the event time and drop any state
-that is related to old windows that are not expected to receive older
-than (max event time seen - late threshold). This allows the engine to bound
-the size of the state that is needed for calculating windowed aggregates.
-For example, we can apply watermarking to the previous example as follows.
+Furthermore, since Spark 2.1, you can define a watermark on the event time, and specify the threshold
+on how late the data can be in terms of the event time. The engine will automatically track the
+event time and drop any state that is related to old windows that are not expected to receive data
+older than (max event time seen - late threshold). This allows the engine to bound the size of the
+state that is needed for calculating windowed aggregates. For example, we can apply watermarking to
+the previous example as follows.

 <div class="codetabs">
 <div data-lang="scala" markdown="1">
@@ -704,7 +704,7 @@ val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: S

 // Group the data by window and word and compute the count of each group
 val windowedCounts = words
-    .withWatermark("timestamp", "20 minutes")
+    .withWatermark("timestamp", "10 minutes")
     .groupBy(
         window($"timestamp", "10 minutes", "5 minutes"),
         $"word")
@@ -719,7 +719,7 @@ Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp

 // Group the data by window and word and compute the count of each group
 Dataset<Row> windowedCounts = words
-    .withWatermark("timestamp", "20 minutes")
+    .withWatermark("timestamp", "10 minutes")
     .groupBy(
         functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
         words.col("word"))
@@ -733,7 +733,7 @@ words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: Strin

 # Group the data by window and word and compute the count of each group
 windowedCounts = words
-    .withWatermark("timestamp", "20 minutes")
+    .withWatermark("timestamp", "10 minutes")
     .groupBy(
         window(words.timestamp, "10 minutes", "5 minutes"),
         words.word)
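The three snippets above differ only in language binding. The window assignment itself can be sketched in plain Python with no Spark dependency; the helper below is a hypothetical stand-in for what Spark's `window(timestamp, "10 minutes", "5 minutes")` expression computes, namely every sliding window that contains a given event time.

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, window_min=10, slide_min=5):
    """Return every [start, end) sliding window containing event_time
    (a sketch of the window() expression, not Spark's implementation)."""
    minute = event_time.hour * 60 + event_time.minute
    # Align to the slide grid at or before the event time, then walk back.
    start = minute - (minute % slide_min)
    windows = []
    while start > minute - window_min:
        s = event_time.replace(hour=start // 60, minute=start % 60,
                               second=0, microsecond=0)
        windows.append((s, s + timedelta(minutes=window_min)))
        start -= slide_min
    return sorted(windows)

# The word generated at 12:04 falls in the 11:55-12:05 and 12:00-12:10
# windows, no matter how late it arrives — grouping by event time places
# it correctly.
late_word_windows = sliding_windows(datetime(2017, 1, 1, 12, 4))
```

Because the grouping key is derived from the event time in the data rather than the arrival time, a late record is simply added to the counts of the windows returned here.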
@@ -743,38 +743,47 @@ windowedCounts = words
 </div>
 </div>

-In this example, we are defining the watermark of the query on the value
-of the column "timestamp", and also defining "20 minutes" as the threshold
-of how late data is allowed to be. If this query is run in Append output
-mode (discussed later in [Output Modes](#output-modes) section),
-the engine will track the current event time from the column "timestamp"
-and add a windowed aggregate to the Result Table only when the window
-is more than 20 minutes older than the observed event time.
+In this example, we are defining the watermark of the query on the value of the column "timestamp",
+and also defining "10 minutes" as the threshold of how late the data is allowed to be. If this query
+is run in Append output mode (discussed later in the [Output Modes](#output-modes) section),
+the engine will track the current event time from the column "timestamp" and wait for an additional
+"10 minutes" in event time before finalizing the windowed counts and adding them to the Result Table.
+Here is an illustration.
+
+![Watermarking in Append Mode](img/structured-streaming-watermark.png)

-For example, in the above query, if the engine observes that the maximum
-"timestamp" is `12:26`, then it will compute the final count of all windows
-older than `12:26 - 10m = 12:16` (say, `[12:05, 12:15)`) and append
-them to the Result Table.
+As shown in the illustration, the engine tracks the maximum event time seen in the data (blue line),
+and accordingly sets the watermark (red line) for the next trigger as
+`max event time - late threshold`. So, when the engine observes the data `(12:14, dog)`,
+it sets the watermark for the next interval.
+For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system
+is waiting for late data. After the system finds data (e.g. `(12:21, owl)`) such that the
+watermark exceeds `12:10`, the partial count is finalized and appended to the table. This count will
+not change any further, as all "too-late" data older than `12:10` will be ignored.

-It is important to note that the following conditions must be satisfied
-for the watermarking to clean the data in aggregation queries.
+Note that in Append output mode, the system has to wait for the "late threshold" time
+before it can output the aggregation of a window. This may not be ideal if data can be very late
+(say, 1 day) and you would like to have partial counts without waiting for a day. In the future, we
+will add an Update output mode, which will allow updated aggregates to be output as they change.

-- Output mode must be Append. Complete mode requires all aggregate
-data to be preserved, and hence cannot use watermarking to drop
-intermediate state.
+It is important to note that the following conditions must be satisfied for the watermarking to
+clean the data in aggregation queries (as of Spark 2.1, subject to change in the future).

-- The aggregation must have either the event-time column, or a `window`
-on the event-time column.
+- Output mode must be Append. Complete mode requires all aggregate data to be preserved, and hence
+cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section
+for a detailed explanation of the semantics of each output mode.
+
+- The aggregation must have either the event-time column, or a `window` on the event-time column.

 - `withWatermark` must be called on the
 same column as the timestamp column used in the aggregate. For example,
 `df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid
 in Append output mode, as the watermark is defined on a different column
 from the aggregation column.

-- `withWatermark` must be called before the aggregation. For example,
-`df.groupBy("time").count().withWatermark("time", "1 min")` is invalid
-in Append output mode.
+- `withWatermark` must be called before the aggregation for the watermark details to be used.
+For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append
+output mode.

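The Append-mode behavior described in the new text can be modeled in a few lines of plain Python. This is a toy sketch under simplifying assumptions, not Spark's engine: tumbling 10-minute windows instead of sliding ones, one count per window instead of per (window, word), and a watermark that trails the maximum event time seen by the late threshold.

```python
from datetime import datetime, timedelta
from collections import defaultdict

def window_of(t, size_min=10):
    # Tumbling [start, end) window containing t.
    start = t.replace(minute=t.minute - t.minute % size_min,
                      second=0, microsecond=0)
    return (start, start + timedelta(minutes=size_min))

class AppendModeState:
    """Toy model of watermarking in Append mode (not Spark's actual engine)."""
    def __init__(self, late_threshold):
        self.late_threshold = late_threshold
        self.max_event_time = None
        self.partial = defaultdict(int)  # open windows: window -> count
        self.result_table = []           # finalized (window, count) rows

    def trigger(self, event_times):
        # Watermark for this trigger = max event time seen so far - threshold.
        wm = (self.max_event_time - self.late_threshold
              if self.max_event_time is not None else None)
        for t in event_times:
            if wm is not None and t < wm:
                continue  # too late: its window may already be finalized
            self.partial[window_of(t)] += 1
            self.max_event_time = max(self.max_event_time or t, t)
        if self.max_event_time is None:
            return
        # Finalize windows that the advanced watermark has moved past.
        wm = self.max_event_time - self.late_threshold
        for w in sorted(self.partial):
            if w[1] <= wm:
                self.result_table.append((w, self.partial.pop(w)))

state = AppendModeState(timedelta(minutes=10))
state.trigger([datetime(2017, 1, 1, 12, 7)])   # opens window 12:00-12:10
state.trigger([datetime(2017, 1, 1, 12, 14)])  # watermark moves to 12:04
state.trigger([datetime(2017, 1, 1, 12, 21)])  # watermark 12:11 > 12:10:
                                               #   window 12:00-12:10 appended
state.trigger([datetime(2017, 1, 1, 12, 4)])   # older than watermark: dropped
```

The run mirrors the illustration: the `12:00 - 12:10` count stays in `partial` while the system waits for late data, is appended to `result_table` once `(12:21, ...)` pushes the watermark past `12:10`, and the too-late `12:04` record is ignored.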
### Join Operations
@@ -863,7 +872,7 @@ returned through `Dataset.writeStream()`. You will have to specify one or more o
 - *Checkpoint location:* For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.

 #### Output Modes
-There are two types of output mode currently implemented.
+There are a few types of output modes.

 - **Append mode (default)** - This is the default mode, where only the
 new rows added to the Result Table since the last trigger will be

@@ -878,6 +887,9 @@ fault-tolerant sink). For example, queries with only `select`,
 enough intermediate state that all the rows in Result Table is
 returned every time.

+- **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table updated since
+the last trigger will be outputted to the sink. More information to be added in future releases.
+
 Different types of streaming queries support different output modes.
 Here is the compatibility matrix.