Now consider what happens if one of the events arrives late to the application.
For example, a word that was generated at 12:04 but was received by the application at 12:11.
Since this windowing is based on the time in the data, the time 12:04 should be considered for windowing. This occurs naturally in our window-based grouping – the late data is automatically placed in the proper windows and the correct aggregates are updated, as illustrated below.
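As a rough sketch of this behavior (plain Python, not Spark code; the `windows_for` helper and the window size/slide values are illustrative assumptions mirroring the 10-minute windows sliding every 5 minutes used in this example), a late event is simply assigned to every sliding window that covers its event time:

```python
from datetime import datetime, timedelta
from collections import Counter

def windows_for(ts, size=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return all sliding windows [start, end) that contain event time ts.

    Hypothetical helper, not a Spark API: it mimics what
    window($"timestamp", "10 minutes", "5 minutes") does for one row.
    """
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % slide
    start = ts - offset            # latest slide boundary at or before ts
    starts = []
    while start + size > ts:       # walk back while the window still covers ts
        starts.append(start)
        start -= slide
    return [(s, s + size) for s in sorted(starts)]

counts = Counter()

def update(ts, word):
    # Late or not, the event is counted in the windows covering its event time.
    for w in windows_for(ts):
        counts[(w, word)] += 1

# A word generated at 12:04 arrives late, at 12:11; it is still counted
# in the windows that cover 12:04, i.e. 11:55-12:05 and 12:00-12:10.
update(datetime(2017, 1, 1, 12, 4), "cat")
```

Note how the single event lands in two overlapping windows, which is why the correct aggregates are updated no matter when the event arrives.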

Furthermore, since Spark 2.1, you can define a watermark on the event time, and specify the threshold on how late the data can be in terms of the event time. The engine will automatically track the event time and drop any state related to old windows that are not expected to receive data older than (max event time seen - late threshold). This allows the engine to bound the size of the state needed for calculating windowed aggregates. For example, we can apply watermarking to the previous example as follows.
<div class="codetabs">
<div data-lang="scala" markdown="1">

```scala
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
```

</div>
<div data-lang="java" markdown="1">

```java
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
```

</div>
</div>
In this example, we are defining the watermark of the query on the value of the column "timestamp", and also defining "10 minutes" as the threshold of how late the data is allowed to be. If this query is run in Append output mode (discussed later in the [Output Modes](#output-modes) section), the engine will track the current event time from the column "timestamp" and wait for an additional 10 minutes in event time before finalizing the windowed counts and adding them to the Result Table. Here is an illustration.


As shown in the illustration, the engine tracks the maximum event time seen in the data (blue line), and accordingly sets the watermark (red line) for the next trigger as `max event time - late threshold`. So, when the engine observes the data `(12:14, dog)`, it sets the watermark for the next trigger to `12:14 - 10m = 12:04`. For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system is waiting for late data. After the system sees data (e.g. `(12:21, owl)`) such that the watermark exceeds `12:10`, the partial count is finalized and appended to the Result Table. This count will not change any further, as all "too-late" data older than `12:10` will be ignored.
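The watermark arithmetic of this example can be sketched in a few lines (plain Python standing in for the engine's bookkeeping; `WatermarkTracker` is an illustrative name, not a Spark API):

```python
from datetime import datetime, timedelta

LATE = timedelta(minutes=10)  # the "10 minutes" late threshold from the example

class WatermarkTracker:
    """Toy model of the engine's event-time tracking, not Spark's implementation."""
    def __init__(self):
        self.max_event_time = None

    def observe(self, ts):
        # Track the maximum event time seen in the data (the blue line).
        if self.max_event_time is None or ts > self.max_event_time:
            self.max_event_time = ts

    @property
    def watermark(self):
        # The red line: max event time - late threshold.
        return self.max_event_time - LATE

    def can_finalize(self, window_end):
        # A window [start, end) is finalized once the watermark passes its end.
        return self.watermark > window_end

t = WatermarkTracker()
t.observe(datetime(2017, 1, 1, 12, 14))              # (12:14, dog)
wm_after_dog = t.watermark                           # 12:14 - 10m = 12:04
t.observe(datetime(2017, 1, 1, 12, 21))              # (12:21, owl)
wm_after_owl = t.watermark                           # 12:21 - 10m = 12:11
done = t.can_finalize(datetime(2017, 1, 1, 12, 10))  # 12:11 > 12:10, so finalize
```

Under these assumptions, the window ending at `12:10` is held as internal state after `(12:14, dog)` and is only finalized once `(12:21, owl)` pushes the watermark past `12:10`.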

Note that in Append output mode, the system has to wait for the "late threshold" time before it can output the aggregation of a window. This may not be ideal if data can be very late (say, 1 day) and you would like to have partial counts without waiting for a day. In the future, we will add an Update output mode, which would allow updated aggregates to be posted.

It is important to note that the following conditions must be satisfied for the watermarking to clean the data in aggregation queries (as of Spark 2.1, subject to change in the future).

- Output mode must be Append. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section for a detailed explanation of the semantics of each output mode.

- The aggregation must have either the event-time column, or a `window` on the event-time column.

- `withWatermark` must be called on the same column as the timestamp column used in the aggregate. For example, `df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid in Append output mode, as the watermark is defined on a different column from the aggregation column.

- `withWatermark` must be called before the aggregation for the watermark details to be used. For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode.
### Join Operations
returned through `Dataset.writeStream()`. You will have to specify one or more of the following.
- *Checkpoint location:* For some output sinks where end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing are discussed in more detail in the next section.
#### Output Modes
There are a few types of output modes.

- **Append mode (default)** - This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only `select`, `where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode.

- **Complete mode** - The whole Result Table will be outputted to the sink after every trigger. This requires the engine to maintain enough intermediate state that all the rows in the Result Table are returned every time.

- **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.

Different types of streaming queries support different output modes.
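As a toy illustration of how the modes differ (plain Python, not Spark; representing the Result Table as a dict from key to count is an assumption for exposition), given the Result Table before and after one trigger:

```python
def emit(mode, previous, current):
    """What each output mode sends to the sink for one trigger (sketch)."""
    if mode == "append":      # only rows that are new since the last trigger
        return {k: v for k, v in current.items() if k not in previous}
    if mode == "update":      # rows that are new or whose value changed
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "complete":    # the whole Result Table, every trigger
        return dict(current)
    raise ValueError(f"unknown output mode: {mode}")

prev = {"cat": 1}             # Result Table after the previous trigger
curr = {"cat": 2, "dog": 1}   # Result Table after this trigger
```

Here Append would emit only `{"dog": 1}`, Update would emit both changed rows, and Complete would emit the entire table. (In real Append mode, rows that were already output must never change; the sketch simply drops previously seen keys.)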