@@ -266,21 +266,27 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
266266 private Object currentKey ;
267267
268268 /**
269- * Only valud during {@link
269+ * Only valid during {@link
270270 * #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
271271 * #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
272272 */
273273 private List <BoundedWindow > currentWindows ;
274274
275275 /**
276- * Only valud during {@link
276+ * The window index at which processing should stop. The window with this index should not be
277+ * processed.
278+ *
279+ * <p>Only valid during {@link
277280 * #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
278281 * #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
279282 */
280283 private int windowStopIndex ;
281284
282285 /**
283- * Only valud during {@link
286+ * The window index which is currently being processed. This should always be less than
287+ * windowStopIndex.
288+ *
289+ * <p>Only valid during {@link
284290 * #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
285291 * #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
286292 */
@@ -964,51 +970,49 @@ public void onClaimFailed(PositionT position) {}
964970 private void processElementForWindowObservingTruncateRestriction (
965971 WindowedValue <KV <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>, Double >> elem ) {
966972 currentElement = elem .withValue (elem .getValue ().getKey ().getKey ());
967- try {
968- windowCurrentIndex = -1 ;
969- windowStopIndex = currentElement .getWindows ().size ();
970- currentWindows = ImmutableList .copyOf (currentElement .getWindows ());
971- while (true ) {
972- synchronized (splitLock ) {
973- windowCurrentIndex ++;
974- if (windowCurrentIndex >= windowStopIndex ) {
975- break ;
976- }
977- currentRestriction = elem .getValue ().getKey ().getValue ().getKey ();
978- currentWatermarkEstimatorState = elem .getValue ().getKey ().getValue ().getValue ();
979- currentWindow = currentWindows .get (windowCurrentIndex );
980- currentTracker =
981- RestrictionTrackers .observe (
982- doFnInvoker .invokeNewTracker (processContext ),
983- new ClaimObserver <PositionT >() {
984- @ Override
985- public void onClaimed (PositionT position ) {}
986-
987- @ Override
988- public void onClaimFailed (PositionT position ) {}
989- });
990- currentWatermarkEstimator =
991- WatermarkEstimators .threadSafe (
992- doFnInvoker .invokeNewWatermarkEstimator (processContext ));
993- initialWatermark = currentWatermarkEstimator .getWatermarkAndState ().getKey ();
994- }
995- TruncateResult <OutputT > truncatedRestriction =
996- doFnInvoker .invokeTruncateRestriction (processContext );
997- if (truncatedRestriction != null ) {
998- processContext .output (truncatedRestriction .getTruncatedRestriction ());
973+ windowCurrentIndex = -1 ;
974+ windowStopIndex = currentElement .getWindows ().size ();
975+ currentWindows = ImmutableList .copyOf (currentElement .getWindows ());
976+ while (true ) {
977+ synchronized (splitLock ) {
978+ windowCurrentIndex ++;
979+ if (windowCurrentIndex >= windowStopIndex ) {
980+ // Careful to reset the split state under the same synchronized block.
981+ windowCurrentIndex = -1 ;
982+ windowStopIndex = 0 ;
983+ currentElement = null ;
984+ currentWindows = null ;
985+ currentRestriction = null ;
986+ currentWatermarkEstimatorState = null ;
987+ currentWindow = null ;
988+ currentTracker = null ;
989+ currentWatermarkEstimator = null ;
990+ initialWatermark = null ;
991+ break ;
999992 }
993+ currentRestriction = elem .getValue ().getKey ().getValue ().getKey ();
994+ currentWatermarkEstimatorState = elem .getValue ().getKey ().getValue ().getValue ();
995+ currentWindow = currentWindows .get (windowCurrentIndex );
996+ currentTracker =
997+ RestrictionTrackers .observe (
998+ doFnInvoker .invokeNewTracker (processContext ),
999+ new ClaimObserver <PositionT >() {
1000+ @ Override
1001+ public void onClaimed (PositionT position ) {}
1002+
1003+ @ Override
1004+ public void onClaimFailed (PositionT position ) {}
1005+ });
1006+ currentWatermarkEstimator =
1007+ WatermarkEstimators .threadSafe (doFnInvoker .invokeNewWatermarkEstimator (processContext ));
1008+ initialWatermark = currentWatermarkEstimator .getWatermarkAndState ().getKey ();
1009+ }
1010+ TruncateResult <OutputT > truncatedRestriction =
1011+ doFnInvoker .invokeTruncateRestriction (processContext );
1012+ if (truncatedRestriction != null ) {
1013+ processContext .output (truncatedRestriction .getTruncatedRestriction ());
10001014 }
1001- } finally {
1002- currentTracker = null ;
1003- currentElement = null ;
1004- currentRestriction = null ;
1005- currentWatermarkEstimatorState = null ;
1006- currentWatermarkEstimator = null ;
1007- currentWindow = null ;
1008- currentWindows = null ;
1009- initialWatermark = null ;
10101015 }
1011-
10121016 this .stateAccessor .finalizeState ();
10131017 }
10141018
@@ -1058,72 +1062,69 @@ public static SplitResultsWithStopIndex of(
10581062 private void processElementForWindowObservingSizedElementAndRestriction (
10591063 WindowedValue <KV <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>, Double >> elem ) {
10601064 currentElement = elem .withValue (elem .getValue ().getKey ().getKey ());
1061- try {
1062- windowCurrentIndex = -1 ;
1063- windowStopIndex = currentElement .getWindows ().size ();
1064- currentWindows = ImmutableList .copyOf (currentElement .getWindows ());
1065- while (true ) {
1066- synchronized (splitLock ) {
1067- windowCurrentIndex ++;
1068- if (windowCurrentIndex >= windowStopIndex ) {
1069- return ;
1070- }
1071- currentRestriction = elem .getValue ().getKey ().getValue ().getKey ();
1072- currentWatermarkEstimatorState = elem .getValue ().getKey ().getValue ().getValue ();
1073- currentWindow = currentWindows .get (windowCurrentIndex );
1074- currentTracker =
1075- RestrictionTrackers .observe (
1076- doFnInvoker .invokeNewTracker (processContext ),
1077- new ClaimObserver <PositionT >() {
1078- @ Override
1079- public void onClaimed (PositionT position ) {}
1080-
1081- @ Override
1082- public void onClaimFailed (PositionT position ) {}
1083- });
1084- currentWatermarkEstimator =
1085- WatermarkEstimators .threadSafe (
1086- doFnInvoker .invokeNewWatermarkEstimator (processContext ));
1087- initialWatermark = currentWatermarkEstimator .getWatermarkAndState ().getKey ();
1088- }
1089-
1090- // It is important to ensure that {@code splitLock} is not held during #invokeProcessElement
1091- DoFn .ProcessContinuation continuation = doFnInvoker .invokeProcessElement (processContext );
1092- // Ensure that all the work is done if the user tells us that they don't want to
1093- // resume processing.
1094- if (!continuation .shouldResume ()) {
1095- currentTracker .checkDone ();
1096- continue ;
1065+ windowCurrentIndex = -1 ;
1066+ windowStopIndex = currentElement .getWindows ().size ();
1067+ currentWindows = ImmutableList .copyOf (currentElement .getWindows ());
1068+ while (true ) {
1069+ synchronized (splitLock ) {
1070+ windowCurrentIndex ++;
1071+ if (windowCurrentIndex >= windowStopIndex ) {
1072+ // Careful to reset the split state under the same synchronized block.
1073+ windowCurrentIndex = -1 ;
1074+ windowStopIndex = 0 ;
1075+ currentElement = null ;
1076+ currentWindows = null ;
1077+ currentRestriction = null ;
1078+ currentWatermarkEstimatorState = null ;
1079+ currentWindow = null ;
1080+ currentTracker = null ;
1081+ currentWatermarkEstimator = null ;
1082+ initialWatermark = null ;
1083+ return ;
10971084 }
1085+ currentRestriction = elem .getValue ().getKey ().getValue ().getKey ();
1086+ currentWatermarkEstimatorState = elem .getValue ().getKey ().getValue ().getValue ();
1087+ currentWindow = currentWindows .get (windowCurrentIndex );
1088+ currentTracker =
1089+ RestrictionTrackers .observe (
1090+ doFnInvoker .invokeNewTracker (processContext ),
1091+ new ClaimObserver <PositionT >() {
1092+ @ Override
1093+ public void onClaimed (PositionT position ) {}
10981094
1099- // Attempt to checkpoint the current restriction.
1100- HandlesSplits .SplitResult splitResult =
1101- trySplitForElementAndRestriction (0 , continuation .resumeDelay ());
1102-
1103- /**
1104- * After the user has chosen to resume processing later, either the restriction is already
1105- * done and the user unknowingly claimed the last element or the Runner may have stolen the
1106- * remainder of work through a split call so the above trySplit may return null. If so, the
1107- * current restriction must be done.
1108- */
1109- if (splitResult == null ) {
1110- currentTracker .checkDone ();
1111- continue ;
1112- }
1113- // Forward the split to the bundle level split listener.
1114- splitListener .split (splitResult .getPrimaryRoots (), splitResult .getResidualRoots ());
1115- }
1116- } finally {
1117- synchronized (splitLock ) {
1118- currentElement = null ;
1119- currentRestriction = null ;
1120- currentWatermarkEstimatorState = null ;
1121- currentWindow = null ;
1122- currentTracker = null ;
1123- currentWatermarkEstimator = null ;
1124- currentWindows = null ;
1125- initialWatermark = null ;
1095+ @ Override
1096+ public void onClaimFailed (PositionT position ) {}
1097+ });
1098+ currentWatermarkEstimator =
1099+ WatermarkEstimators .threadSafe (doFnInvoker .invokeNewWatermarkEstimator (processContext ));
1100+ initialWatermark = currentWatermarkEstimator .getWatermarkAndState ().getKey ();
1101+ }
1102+
1103+ // It is important to ensure that {@code splitLock} is not held during #invokeProcessElement
1104+ DoFn .ProcessContinuation continuation = doFnInvoker .invokeProcessElement (processContext );
1105+ // Ensure that all the work is done if the user tells us that they don't want to
1106+ // resume processing.
1107+ if (!continuation .shouldResume ()) {
1108+ currentTracker .checkDone ();
1109+ continue ;
1110+ }
1111+
1112+ // Attempt to checkpoint the current restriction.
1113+ HandlesSplits .SplitResult splitResult =
1114+ trySplitForElementAndRestriction (0 , continuation .resumeDelay ());
1115+
1116+ /**
1117+ * After the user has chosen to resume processing later, either the restriction is already
1118+ * done and the user unknowingly claimed the last element or the Runner may have stolen the
1119+ * remainder of work through a split call so the above trySplit may return null. If so, the
1120+ * current restriction must be done.
1121+ */
1122+ if (splitResult == null ) {
1123+ currentTracker .checkDone ();
1124+ continue ;
11261125 }
1126+ // Forward the split to the bundle level split listener.
1127+ splitListener .split (splitResult .getPrimaryRoots (), splitResult .getResidualRoots ());
11271128 }
11281129 }
11291130
@@ -1153,7 +1154,7 @@ public double getProgress() {
11531154
11541155 private Progress getProgress () {
11551156 synchronized (splitLock ) {
1156- if (currentTracker instanceof RestrictionTracker .HasProgress ) {
1157+ if (currentTracker instanceof RestrictionTracker .HasProgress && currentWindow != null ) {
11571158 return scaleProgress (
11581159 ((HasProgress ) currentTracker ).getProgress (), windowCurrentIndex , windowStopIndex );
11591160 }
@@ -1175,6 +1176,12 @@ private Progress getProgressFromWindowObservingTruncate(double elementCompleted)
11751176
11761177 @ VisibleForTesting
11771178 static Progress scaleProgress (Progress progress , int currentWindowIndex , int stopWindowIndex ) {
1179+ checkArgument (
1180+ currentWindowIndex < stopWindowIndex ,
1181+ "Current window index (%s) must be less than stop window index (%s)" ,
1182+ currentWindowIndex ,
1183+ stopWindowIndex );
1184+
11781185 double totalWorkPerWindow = progress .getWorkCompleted () + progress .getWorkRemaining ();
11791186 double completed = totalWorkPerWindow * currentWindowIndex + progress .getWorkCompleted ();
11801187 double remaining =
0 commit comments