Skip to content

Commit 450dd85

Browse files
swegnerbchambers
authored andcommitted
Add display data to windowing transforms
Expose NeverTrigger as package-private since it is necessary for display data
1 parent c8ed398 commit 450dd85

32 files changed

Lines changed: 501 additions & 13 deletions

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
2222
import org.apache.beam.sdk.util.ExecutableTrigger;
2323

24+
import com.google.common.base.Joiner;
2425
import com.google.common.base.Preconditions;
2526

2627
import org.joda.time.Instant;
@@ -112,4 +113,13 @@ public void onOnlyFiring(TriggerContext context) throws Exception {
112113
subtrigger.invokeOnFire(context);
113114
}
114115
}
116+
117+
@Override
118+
public String toString() {
119+
StringBuilder builder = new StringBuilder("AfterAll.of(");
120+
Joiner.on(", ").appendTo(builder, subTriggers);
121+
builder.append(")");
122+
123+
return builder.toString();
124+
}
115125
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@
3636

3737
import org.joda.time.Duration;
3838
import org.joda.time.Instant;
39+
import org.joda.time.format.PeriodFormat;
40+
import org.joda.time.format.PeriodFormatter;
3941

4042
import java.util.List;
43+
import java.util.Locale;
4144
import java.util.Objects;
42-
4345
import javax.annotation.Nullable;
4446

4547
/**
@@ -59,6 +61,8 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
5961
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
6062
"delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
6163

64+
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
65+
6266
/**
6367
* To complete an implementation, return the desired time from the TriggerContext.
6468
*/
@@ -276,6 +280,11 @@ public boolean equals(Object object) {
276280
public int hashCode() {
277281
return Objects.hash(delay);
278282
}
283+
284+
@Override
285+
public String toString() {
286+
return PERIOD_FORMATTER.print(delay.toPeriod());
287+
}
279288
}
280289

281290
/**

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.beam.sdk.annotations.Experimental;
2323
import org.apache.beam.sdk.util.ExecutableTrigger;
2424

25+
import com.google.common.base.Joiner;
2526
import org.joda.time.Instant;
2627

2728
import java.util.Arrays;
@@ -127,6 +128,15 @@ public void onFire(Trigger.TriggerContext context) throws Exception {
127128
updateFinishedState(context);
128129
}
129130

131+
@Override
132+
public String toString() {
133+
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
134+
Joiner.on(", ").appendTo(builder, subTriggers);
135+
builder.append(")");
136+
137+
return builder.toString();
138+
}
139+
130140
private void updateFinishedState(TriggerContext context) {
131141
context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
132142
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
2222
import org.apache.beam.sdk.util.ExecutableTrigger;
2323

24+
import com.google.common.base.Joiner;
2425
import com.google.common.base.Preconditions;
2526

2627
import org.joda.time.Instant;
@@ -108,6 +109,15 @@ protected void onOnlyFiring(TriggerContext context) throws Exception {
108109
}
109110
}
110111

112+
@Override
113+
public String toString() {
114+
StringBuilder builder = new StringBuilder("AfterFirst.of(");
115+
Joiner.on(", ").appendTo(builder, subTriggers);
116+
builder.append(")");
117+
118+
return builder.toString();
119+
}
120+
111121
private void updateFinishedStatus(TriggerContext c) {
112122
boolean anyFinished = false;
113123
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.util.List;
2727
import java.util.Objects;
28-
2928
import javax.annotation.Nullable;
3029

3130
/**
@@ -74,7 +73,15 @@ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
7473

7574
@Override
7675
public String toString() {
77-
return "AfterProcessingTime.pastFirstElementInPane(" + timestampMappers + ")";
76+
StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
77+
for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
78+
builder
79+
.append(".plusDelayOf(")
80+
.append(delayFn)
81+
.append(")");
82+
}
83+
84+
return builder.toString();
7885
}
7986

8087
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
@Experimental(Experimental.Kind.TRIGGER)
6464
public class AfterWatermark {
6565

66+
private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
67+
6668
// Static factory class.
6769
private AfterWatermark() {}
6870

@@ -220,6 +222,26 @@ public void onFire(Trigger.TriggerContext context) throws Exception {
220222
}
221223
}
222224

225+
@Override
226+
public String toString() {
227+
StringBuilder builder = new StringBuilder(TO_STRING);
228+
229+
Trigger earlyTrigger = subTriggers.get(EARLY_INDEX);
230+
if (!(earlyTrigger instanceof Never.NeverTrigger)) {
231+
builder
232+
.append(".withEarlyFirings(")
233+
.append(earlyTrigger)
234+
.append(")");
235+
}
236+
237+
builder
238+
.append(".withLateFirings(")
239+
.append(subTriggers.get(LATE_INDEX))
240+
.append(")");
241+
242+
return builder.toString();
243+
}
244+
223245
private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
224246
// We have not yet transitioned to late firings.
225247
ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
@@ -328,7 +350,7 @@ public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers
328350

329351
@Override
330352
public String toString() {
331-
return "AfterWatermark.pastEndOfWindow()";
353+
return TO_STRING;
332354
}
333355

334356
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.transforms.windowing;
1919

2020
import org.apache.beam.sdk.coders.Coder;
21+
import org.apache.beam.sdk.transforms.display.DisplayData;
2122

2223
import org.joda.time.DateTime;
2324
import org.joda.time.DateTimeZone;
@@ -35,14 +36,16 @@
3536
*/
3637
public class CalendarWindows {
3738

39+
private static final DateTime DEFAULT_START_DATE = new DateTime(0, DateTimeZone.UTC);
40+
3841
/**
3942
* Returns a {@link WindowFn} that windows elements into periods measured by days.
4043
*
4144
* <p>For example, {@code CalendarWindows.days(1)} will window elements into
4245
* separate windows for each day.
4346
*/
4447
public static DaysWindows days(int number) {
45-
return new DaysWindows(number, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
48+
return new DaysWindows(number, DEFAULT_START_DATE, DateTimeZone.UTC);
4649
}
4750

4851
/**
@@ -54,7 +57,7 @@ public static DaysWindows days(int number) {
5457
public static DaysWindows weeks(int number, int startDayOfWeek) {
5558
return new DaysWindows(
5659
7 * number,
57-
new DateTime(0, DateTimeZone.UTC).withDayOfWeek(startDayOfWeek),
60+
DEFAULT_START_DATE.withDayOfWeek(startDayOfWeek),
5861
DateTimeZone.UTC);
5962
}
6063

@@ -67,7 +70,7 @@ public static DaysWindows weeks(int number, int startDayOfWeek) {
6770
* and the first window begins in January 2014.
6871
*/
6972
public static MonthsWindows months(int number) {
70-
return new MonthsWindows(number, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
73+
return new MonthsWindows(number, 1, DEFAULT_START_DATE, DateTimeZone.UTC);
7174
}
7275

7376
/**
@@ -79,7 +82,7 @@ public static MonthsWindows months(int number) {
7982
* America/Los_Angeles time zone.
8083
*/
8184
public static YearsWindows years(int number) {
82-
return new YearsWindows(number, 1, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
85+
return new YearsWindows(number, 1, 1, DEFAULT_START_DATE, DateTimeZone.UTC);
8386
}
8487

8588
/**
@@ -142,6 +145,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
142145
&& timeZone == that.timeZone;
143146
}
144147

148+
@Override
149+
public void populateDisplayData(DisplayData.Builder builder) {
150+
builder
151+
.add("numDays", number)
152+
.addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(),
153+
new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant());
154+
}
155+
145156
public int getNumber() {
146157
return number;
147158
}
@@ -229,6 +240,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
229240
&& timeZone == that.timeZone;
230241
}
231242

243+
@Override
244+
public void populateDisplayData(DisplayData.Builder builder) {
245+
builder
246+
.add("numMonths", number)
247+
.addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(),
248+
new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant());
249+
}
250+
232251
public int getNumber() {
233252
return number;
234253
}
@@ -325,6 +344,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
325344
&& timeZone == that.timeZone;
326345
}
327346

347+
@Override
348+
public void populateDisplayData(DisplayData.Builder builder) {
349+
builder
350+
.add("numYears", number)
351+
.addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(),
352+
new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant());
353+
}
354+
328355
public DateTimeZone getTimeZone() {
329356
return timeZone;
330357
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.transforms.windowing;
1919

2020
import org.apache.beam.sdk.coders.Coder;
21+
import org.apache.beam.sdk.transforms.display.DisplayData;
2122

2223
import org.joda.time.Duration;
2324
import org.joda.time.Instant;
@@ -82,6 +83,13 @@ public IntervalWindow assignWindow(Instant timestamp) {
8283
return new IntervalWindow(new Instant(start), size);
8384
}
8485

86+
@Override
87+
public void populateDisplayData(DisplayData.Builder builder) {
88+
builder
89+
.add("size", size)
90+
.addIfNotDefault("offset", offset, Duration.ZERO);
91+
}
92+
8593
@Override
8694
public Coder<IntervalWindow> windowCoder() {
8795
return IntervalWindow.getCoder();

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public static OnceTrigger ever() {
4141
return new NeverTrigger();
4242
}
4343

44-
private static class NeverTrigger extends OnceTrigger {
44+
// package-private in order to check identity for string formatting.
45+
static class NeverTrigger extends OnceTrigger {
4546
protected NeverTrigger() {
4647
super(null);
4748
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public void onFire(Trigger.TriggerContext context) throws Exception {
9393
updateFinishedState(context);
9494
}
9595

96+
@Override
97+
public String toString() {
98+
return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
99+
}
100+
96101
private void updateFinishedState(TriggerContext c) throws Exception {
97102
boolean anyStillFinished = false;
98103
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {

0 commit comments

Comments
 (0)