Skip to content

Commit 380d473

Browse files
scwhittle and lukecwik authored
Enforce splitting invariants by ensuring split state is reset in the same synchronized block as window index increments. (#23882)
* Enforce splitting invariants by ensuring split state is reset in the same synchronized block as window index increments. Fixes #23881.
* Add missing currentElement = null;

Co-authored-by: Lukasz Cwik <[email protected]>
1 parent 8b3fd2e commit 380d473

1 file changed

Lines changed: 116 additions & 109 deletions

File tree

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 116 additions & 109 deletions
Original file line number | Diff line number | Diff line change
@@ -266,21 +266,27 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
266266
private Object currentKey;
267267

268268
/**
269-
* Only valud during {@link
269+
* Only valid during {@link
270270
* #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
271271
* #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
272272
*/
273273
private List<BoundedWindow> currentWindows;
274274

275275
/**
276-
* Only valud during {@link
276+
* The window index at which processing should stop. The window with this index should not be
277+
* processed.
278+
*
279+
* <p>Only valid during {@link
277280
* #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
278281
* #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
279282
*/
280283
private int windowStopIndex;
281284

282285
/**
283-
* Only valud during {@link
286+
* The window index which is currently being processed. This should always be less than
287+
* windowStopIndex.
288+
*
289+
* <p>Only valid during {@link
284290
* #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
285291
* #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
286292
*/
@@ -964,51 +970,49 @@ public void onClaimFailed(PositionT position) {}
964970
private void processElementForWindowObservingTruncateRestriction(
965971
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
966972
currentElement = elem.withValue(elem.getValue().getKey().getKey());
967-
try {
968-
windowCurrentIndex = -1;
969-
windowStopIndex = currentElement.getWindows().size();
970-
currentWindows = ImmutableList.copyOf(currentElement.getWindows());
971-
while (true) {
972-
synchronized (splitLock) {
973-
windowCurrentIndex++;
974-
if (windowCurrentIndex >= windowStopIndex) {
975-
break;
976-
}
977-
currentRestriction = elem.getValue().getKey().getValue().getKey();
978-
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
979-
currentWindow = currentWindows.get(windowCurrentIndex);
980-
currentTracker =
981-
RestrictionTrackers.observe(
982-
doFnInvoker.invokeNewTracker(processContext),
983-
new ClaimObserver<PositionT>() {
984-
@Override
985-
public void onClaimed(PositionT position) {}
986-
987-
@Override
988-
public void onClaimFailed(PositionT position) {}
989-
});
990-
currentWatermarkEstimator =
991-
WatermarkEstimators.threadSafe(
992-
doFnInvoker.invokeNewWatermarkEstimator(processContext));
993-
initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
994-
}
995-
TruncateResult<OutputT> truncatedRestriction =
996-
doFnInvoker.invokeTruncateRestriction(processContext);
997-
if (truncatedRestriction != null) {
998-
processContext.output(truncatedRestriction.getTruncatedRestriction());
973+
windowCurrentIndex = -1;
974+
windowStopIndex = currentElement.getWindows().size();
975+
currentWindows = ImmutableList.copyOf(currentElement.getWindows());
976+
while (true) {
977+
synchronized (splitLock) {
978+
windowCurrentIndex++;
979+
if (windowCurrentIndex >= windowStopIndex) {
980+
// Careful to reset the split state under the same synchronized block.
981+
windowCurrentIndex = -1;
982+
windowStopIndex = 0;
983+
currentElement = null;
984+
currentWindows = null;
985+
currentRestriction = null;
986+
currentWatermarkEstimatorState = null;
987+
currentWindow = null;
988+
currentTracker = null;
989+
currentWatermarkEstimator = null;
990+
initialWatermark = null;
991+
break;
999992
}
993+
currentRestriction = elem.getValue().getKey().getValue().getKey();
994+
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
995+
currentWindow = currentWindows.get(windowCurrentIndex);
996+
currentTracker =
997+
RestrictionTrackers.observe(
998+
doFnInvoker.invokeNewTracker(processContext),
999+
new ClaimObserver<PositionT>() {
1000+
@Override
1001+
public void onClaimed(PositionT position) {}
1002+
1003+
@Override
1004+
public void onClaimFailed(PositionT position) {}
1005+
});
1006+
currentWatermarkEstimator =
1007+
WatermarkEstimators.threadSafe(doFnInvoker.invokeNewWatermarkEstimator(processContext));
1008+
initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
1009+
}
1010+
TruncateResult<OutputT> truncatedRestriction =
1011+
doFnInvoker.invokeTruncateRestriction(processContext);
1012+
if (truncatedRestriction != null) {
1013+
processContext.output(truncatedRestriction.getTruncatedRestriction());
10001014
}
1001-
} finally {
1002-
currentTracker = null;
1003-
currentElement = null;
1004-
currentRestriction = null;
1005-
currentWatermarkEstimatorState = null;
1006-
currentWatermarkEstimator = null;
1007-
currentWindow = null;
1008-
currentWindows = null;
1009-
initialWatermark = null;
10101015
}
1011-
10121016
this.stateAccessor.finalizeState();
10131017
}
10141018

@@ -1058,72 +1062,69 @@ public static SplitResultsWithStopIndex of(
10581062
private void processElementForWindowObservingSizedElementAndRestriction(
10591063
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
10601064
currentElement = elem.withValue(elem.getValue().getKey().getKey());
1061-
try {
1062-
windowCurrentIndex = -1;
1063-
windowStopIndex = currentElement.getWindows().size();
1064-
currentWindows = ImmutableList.copyOf(currentElement.getWindows());
1065-
while (true) {
1066-
synchronized (splitLock) {
1067-
windowCurrentIndex++;
1068-
if (windowCurrentIndex >= windowStopIndex) {
1069-
return;
1070-
}
1071-
currentRestriction = elem.getValue().getKey().getValue().getKey();
1072-
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
1073-
currentWindow = currentWindows.get(windowCurrentIndex);
1074-
currentTracker =
1075-
RestrictionTrackers.observe(
1076-
doFnInvoker.invokeNewTracker(processContext),
1077-
new ClaimObserver<PositionT>() {
1078-
@Override
1079-
public void onClaimed(PositionT position) {}
1080-
1081-
@Override
1082-
public void onClaimFailed(PositionT position) {}
1083-
});
1084-
currentWatermarkEstimator =
1085-
WatermarkEstimators.threadSafe(
1086-
doFnInvoker.invokeNewWatermarkEstimator(processContext));
1087-
initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
1088-
}
1089-
1090-
// It is important to ensure that {@code splitLock} is not held during #invokeProcessElement
1091-
DoFn.ProcessContinuation continuation = doFnInvoker.invokeProcessElement(processContext);
1092-
// Ensure that all the work is done if the user tells us that they don't want to
1093-
// resume processing.
1094-
if (!continuation.shouldResume()) {
1095-
currentTracker.checkDone();
1096-
continue;
1065+
windowCurrentIndex = -1;
1066+
windowStopIndex = currentElement.getWindows().size();
1067+
currentWindows = ImmutableList.copyOf(currentElement.getWindows());
1068+
while (true) {
1069+
synchronized (splitLock) {
1070+
windowCurrentIndex++;
1071+
if (windowCurrentIndex >= windowStopIndex) {
1072+
// Careful to reset the split state under the same synchronized block.
1073+
windowCurrentIndex = -1;
1074+
windowStopIndex = 0;
1075+
currentElement = null;
1076+
currentWindows = null;
1077+
currentRestriction = null;
1078+
currentWatermarkEstimatorState = null;
1079+
currentWindow = null;
1080+
currentTracker = null;
1081+
currentWatermarkEstimator = null;
1082+
initialWatermark = null;
1083+
return;
10971084
}
1085+
currentRestriction = elem.getValue().getKey().getValue().getKey();
1086+
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
1087+
currentWindow = currentWindows.get(windowCurrentIndex);
1088+
currentTracker =
1089+
RestrictionTrackers.observe(
1090+
doFnInvoker.invokeNewTracker(processContext),
1091+
new ClaimObserver<PositionT>() {
1092+
@Override
1093+
public void onClaimed(PositionT position) {}
10981094

1099-
// Attempt to checkpoint the current restriction.
1100-
HandlesSplits.SplitResult splitResult =
1101-
trySplitForElementAndRestriction(0, continuation.resumeDelay());
1102-
1103-
/**
1104-
* After the user has chosen to resume processing later, either the restriction is already
1105-
* done and the user unknowingly claimed the last element or the Runner may have stolen the
1106-
* remainder of work through a split call so the above trySplit may return null. If so, the
1107-
* current restriction must be done.
1108-
*/
1109-
if (splitResult == null) {
1110-
currentTracker.checkDone();
1111-
continue;
1112-
}
1113-
// Forward the split to the bundle level split listener.
1114-
splitListener.split(splitResult.getPrimaryRoots(), splitResult.getResidualRoots());
1115-
}
1116-
} finally {
1117-
synchronized (splitLock) {
1118-
currentElement = null;
1119-
currentRestriction = null;
1120-
currentWatermarkEstimatorState = null;
1121-
currentWindow = null;
1122-
currentTracker = null;
1123-
currentWatermarkEstimator = null;
1124-
currentWindows = null;
1125-
initialWatermark = null;
1095+
@Override
1096+
public void onClaimFailed(PositionT position) {}
1097+
});
1098+
currentWatermarkEstimator =
1099+
WatermarkEstimators.threadSafe(doFnInvoker.invokeNewWatermarkEstimator(processContext));
1100+
initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
1101+
}
1102+
1103+
// It is important to ensure that {@code splitLock} is not held during #invokeProcessElement
1104+
DoFn.ProcessContinuation continuation = doFnInvoker.invokeProcessElement(processContext);
1105+
// Ensure that all the work is done if the user tells us that they don't want to
1106+
// resume processing.
1107+
if (!continuation.shouldResume()) {
1108+
currentTracker.checkDone();
1109+
continue;
1110+
}
1111+
1112+
// Attempt to checkpoint the current restriction.
1113+
HandlesSplits.SplitResult splitResult =
1114+
trySplitForElementAndRestriction(0, continuation.resumeDelay());
1115+
1116+
/**
1117+
* After the user has chosen to resume processing later, either the restriction is already
1118+
* done and the user unknowingly claimed the last element or the Runner may have stolen the
1119+
* remainder of work through a split call so the above trySplit may return null. If so, the
1120+
* current restriction must be done.
1121+
*/
1122+
if (splitResult == null) {
1123+
currentTracker.checkDone();
1124+
continue;
11261125
}
1126+
// Forward the split to the bundle level split listener.
1127+
splitListener.split(splitResult.getPrimaryRoots(), splitResult.getResidualRoots());
11271128
}
11281129
}
11291130

@@ -1153,7 +1154,7 @@ public double getProgress() {
11531154

11541155
private Progress getProgress() {
11551156
synchronized (splitLock) {
1156-
if (currentTracker instanceof RestrictionTracker.HasProgress) {
1157+
if (currentTracker instanceof RestrictionTracker.HasProgress && currentWindow != null) {
11571158
return scaleProgress(
11581159
((HasProgress) currentTracker).getProgress(), windowCurrentIndex, windowStopIndex);
11591160
}
@@ -1175,6 +1176,12 @@ private Progress getProgressFromWindowObservingTruncate(double elementCompleted)
11751176

11761177
@VisibleForTesting
11771178
static Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {
1179+
checkArgument(
1180+
currentWindowIndex < stopWindowIndex,
1181+
"Current window index (%s) must be less than stop window index (%s)",
1182+
currentWindowIndex,
1183+
stopWindowIndex);
1184+
11781185
double totalWorkPerWindow = progress.getWorkCompleted() + progress.getWorkRemaining();
11791186
double completed = totalWorkPerWindow * currentWindowIndex + progress.getWorkCompleted();
11801187
double remaining =

0 commit comments

Comments
 (0)