Skip to content

Commit ea477dd

Browse files
damccormclmccart
andauthored
getProcessingTimesByStep returns a modifiable map (#29786) (#29799)
* getProcessingTimesByStep returns a modifiable map * return copy in getProcessingTimesByStep and update name accordingly * Spotless --------- Co-authored-by: clmccart <[email protected]> Co-authored-by: Claire McCarthy <[email protected]>
1 parent 667027d commit ea477dd

File tree

4 files changed

+12
-10
lines changed

4 files changed

+12
-10
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,9 @@ public Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
341341
return Optional.ofNullable(activeMessageMetadata);
342342
}
343343

344-
public Map<String, IntSummaryStatistics> getProcessingTimesByStep() {
345-
return Collections.unmodifiableMap(processingTimesByStep);
344+
public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
345+
Map<String, IntSummaryStatistics> processingTimesCopy = processingTimesByStep;
346+
return processingTimesCopy;
346347
}
347348

348349
/**

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void removeTracker(ExecutionStateTracker tracker) {
8686
return;
8787
}
8888
DataflowExecutionStateTracker dfTracker = (DataflowExecutionStateTracker) tracker;
89-
completedProcessingMetrics.put(dfTracker.getWorkItemId(), dfTracker.getProcessingTimesByStep());
89+
completedProcessingMetrics.put(
90+
dfTracker.getWorkItemId(), dfTracker.getProcessingTimesByStepCopy());
9091
activeTrackersByWorkId.remove(dfTracker.getWorkItemId());
9192

9293
// Attribute any remaining time since the last sampling while removing the tracker.
@@ -126,7 +127,7 @@ public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(Str
126127
DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
127128
return mergeStepStatsMaps(
128129
completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
129-
tracker.getProcessingTimesByStep());
130+
tracker.getProcessingTimesByStepCopy());
130131
}
131132

132133
public void resetForWorkId(String workId) {

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes()
176176
tracker.enterState(newState);
177177

178178
// The first completed state should be recorded and the new state should be active.
179-
Map<String, IntSummaryStatistics> gotProcessingTimes = tracker.getProcessingTimesByStep();
179+
Map<String, IntSummaryStatistics> gotProcessingTimes = tracker.getProcessingTimesByStepCopy();
180180
Assert.assertEquals(1, gotProcessingTimes.size());
181181
Assert.assertEquals(
182182
new HashSet<>(Arrays.asList(NameContextsForTests.nameContextForTest().userName())),

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testRemoveTrackerCompletedProcessingTimesGetsUpdated() {
7676
Map<String, IntSummaryStatistics> testCompletedProcessingTimes = new HashMap<>();
7777
testCompletedProcessingTimes.put("some-step", new IntSummaryStatistics());
7878
DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
79-
when(trackerMock.getProcessingTimesByStep()).thenReturn(testCompletedProcessingTimes);
79+
when(trackerMock.getProcessingTimesByStepCopy()).thenReturn(testCompletedProcessingTimes);
8080

8181
sampler.addTracker(trackerMock);
8282
sampler.removeTracker(trackerMock);
@@ -98,7 +98,7 @@ public void testGetCompletedProcessingTimesAndActiveMessageFromActiveTracker() {
9898
ActiveMessageMetadata.create(step1act1.getStepName().userName(), clock.getMillis());
9999
DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
100100
when(trackerMock.getActiveMessageMetadata()).thenReturn(Optional.of(testMetadata));
101-
when(trackerMock.getProcessingTimesByStep()).thenReturn(testCompletedProcessingTimes);
101+
when(trackerMock.getProcessingTimesByStepCopy()).thenReturn(testCompletedProcessingTimes);
102102

103103
sampler.addTracker(trackerMock);
104104

@@ -122,21 +122,21 @@ public void testResetForWorkIdClearsMaps() {
122122
equalTo(tracker1Mock.getActiveMessageMetadata()));
123123
assertThat(
124124
sampler.getProcessingDistributionsForWorkId(workId1),
125-
equalTo(tracker1Mock.getProcessingTimesByStep()));
125+
equalTo(tracker1Mock.getProcessingTimesByStepCopy()));
126126
assertThat(
127127
sampler.getActiveMessageMetadataForWorkId(workId2),
128128
equalTo(tracker2Mock.getActiveMessageMetadata()));
129129
assertThat(
130130
sampler.getProcessingDistributionsForWorkId(workId2),
131-
equalTo(tracker2Mock.getProcessingTimesByStep()));
131+
equalTo(tracker2Mock.getProcessingTimesByStepCopy()));
132132

133133
sampler.removeTracker(tracker1Mock);
134134
sampler.removeTracker(tracker2Mock);
135135
sampler.resetForWorkId(workId2);
136136

137137
assertThat(
138138
sampler.getProcessingDistributionsForWorkId(workId1),
139-
equalTo(tracker1Mock.getProcessingTimesByStep()));
139+
equalTo(tracker1Mock.getProcessingTimesByStepCopy()));
140140
Assert.assertTrue(sampler.getProcessingDistributionsForWorkId(workId2).isEmpty());
141141
}
142142

0 commit comments

Comments
 (0)