Skip to content

Commit edc4766

Browse files
authored
[Dataflow Streaming Appliance] Fix per key commit size validation (apache#33597)
1 parent 71df963 commit edc4766

2 files changed

Lines changed: 74 additions & 3 deletions

File tree

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertTrue;
24+
25+
import org.apache.beam.sdk.testing.PAssert;
26+
import org.apache.beam.sdk.testing.TestPipeline;
27+
import org.apache.beam.sdk.testing.ValidatesRunner;
28+
import org.apache.beam.sdk.transforms.Create;
29+
import org.apache.beam.sdk.transforms.GroupByKey;
30+
import org.apache.beam.sdk.values.KV;
31+
import org.apache.beam.sdk.values.PCollection;
32+
import org.hamcrest.Matchers;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
import org.junit.experimental.categories.Category;
36+
import org.junit.runner.RunWith;
37+
import org.junit.runners.JUnit4;
38+
39+
@RunWith(JUnit4.class)
40+
public class LargeCommitTest {
41+
42+
@Rule public transient TestPipeline p = TestPipeline.create();
43+
44+
@Test
45+
@Category({ValidatesRunner.class})
46+
public void testLargeCommit() {
47+
// 5 50MB values shuffling to a single key
48+
String value = bigString('a', 50 << 20);
49+
KV<String, String> kv = KV.of("a", value);
50+
PCollection<KV<String, Iterable<String>>> result =
51+
p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create());
52+
53+
PAssert.that(result)
54+
.satisfies(
55+
kvs -> {
56+
assertTrue(kvs.iterator().hasNext());
57+
KV<String, Iterable<String>> outputKV = kvs.iterator().next();
58+
assertFalse(kvs.iterator().hasNext());
59+
assertEquals("a", outputKV.getKey());
60+
assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value));
61+
return null;
62+
});
63+
p.run();
64+
}
65+
66+
private static String bigString(char c, int size) {
67+
char[] buf = new char[size];
68+
for (int i = 0; i < size; i++) {
69+
buf[i] = c;
70+
}
71+
return new String(buf);
72+
}
73+
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
@Internal
2626
public abstract class OperationalLimits {
2727

28-
private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20;
29-
3028
// Maximum size of a commit from a single work item.
3129
public abstract long getMaxWorkItemCommitBytes();
3230
// Maximum size of a single output element's serialized key.
@@ -48,7 +46,7 @@ public abstract static class Builder {
4846

4947
public static OperationalLimits.Builder builder() {
5048
return new AutoValue_OperationalLimits.Builder()
51-
.setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
49+
.setMaxWorkItemCommitBytes(Long.MAX_VALUE)
5250
.setMaxOutputKeyBytes(Long.MAX_VALUE)
5351
.setMaxOutputValueBytes(Long.MAX_VALUE);
5452
}

0 commit comments

Comments
 (0)