Skip to content

Commit 30a48f0

Browse files
authored
Improve the performance of TextSource by reducing how many byte[]s are copied (fixes #23193) (#23196)
* Improve the performance of TextSource by reducing how many byte[]s are copied (fixes #23193)

  This makes TextSource use about 2.3x fewer CPU resources during decoding.

  Before this change:
  ```
  TextSourceBenchmark.benchmarkTextSource thrpt 5 0.248 ± 0.029 ops/s
  ```
  After this change:
  ```
  TextSourceBenchmark.benchmarkHadoopLineReader thrpt 5 0.465 ± 0.064 ops/s
  TextSourceBenchmark.benchmarkTextSource thrpt 5 0.575 ± 0.059 ops/s
  ```
* Write file in pieces instead of pre-allocating entire buffer
* Address PR comments
1 parent 94405e6 commit 30a48f0

6 files changed

Lines changed: 435 additions & 133 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
## I/Os
6060

6161
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
62+
* Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)).
6263

6364
## New Features / Improvements
6465

sdks/java/core/jmh/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java"
2828

2929
dependencies {
3030
implementation project(path: ":sdks:java:core", configuration: "shadow")
31+
implementation project(path: ":sdks:java:core", configuration: "shadowTest")
3132
implementation library.java.joda_time
3233
implementation library.java.vendored_grpc_1_48_1
3334
implementation library.java.vendored_guava_26_0_jre
35+
implementation library.java.hadoop_common
3436
runtimeOnly library.java.slf4j_jdk14
3537
testImplementation library.java.junit
3638
testImplementation library.java.hamcrest
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.jmh.io;
19+
20+
import java.io.BufferedWriter;
21+
import java.io.FileInputStream;
22+
import java.nio.charset.StandardCharsets;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.util.Arrays;
26+
import java.util.concurrent.ThreadLocalRandom;
27+
import org.apache.beam.sdk.io.FileBasedSource;
28+
import org.apache.beam.sdk.io.Source;
29+
import org.apache.beam.sdk.io.TextIOReadTest;
30+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
31+
import org.apache.hadoop.io.Text;
32+
import org.apache.hadoop.util.LineReader;
33+
import org.openjdk.jmh.annotations.Benchmark;
34+
import org.openjdk.jmh.annotations.Scope;
35+
import org.openjdk.jmh.annotations.Setup;
36+
import org.openjdk.jmh.annotations.State;
37+
import org.openjdk.jmh.annotations.TearDown;
38+
39+
public class TextSourceBenchmark {
40+
private static final int NUM_LINES = 10_000_000;
41+
private static char[] data = new char[120];
42+
43+
static {
44+
Arrays.fill(data, 'a');
45+
}
46+
47+
@State(Scope.Benchmark)
48+
public static class Data {
49+
public Path path;
50+
public String pathString;
51+
public int length;
52+
53+
/** Generates a random file with {@code NUM_LINES} between 60 and 120 characters each. */
54+
@Setup
55+
public void createFile() throws Exception {
56+
path = Files.createTempFile("benchmark", null).toAbsolutePath();
57+
pathString = path.toString();
58+
BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
59+
for (int i = 0; i < NUM_LINES; ++i) {
60+
String valueToAppend =
61+
String.valueOf(data, 0, ThreadLocalRandom.current().nextInt(60, 120));
62+
length += valueToAppend.length();
63+
writer.write(valueToAppend);
64+
writer.write('\n');
65+
}
66+
writer.close();
67+
}
68+
69+
@TearDown
70+
public void deleteFile() throws Exception {
71+
Files.deleteIfExists(path);
72+
}
73+
}
74+
75+
@Benchmark
76+
public void benchmarkTextSource(Data data) throws Exception {
77+
Source.Reader<String> reader =
78+
((FileBasedSource<String>) TextIOReadTest.getTextSource(data.pathString, null))
79+
.createReader(PipelineOptionsFactory.create());
80+
int length = 0;
81+
int linesRead = 0;
82+
if (reader.start()) {
83+
linesRead += 1;
84+
length += reader.getCurrent().length();
85+
}
86+
while (reader.advance()) {
87+
linesRead += 1;
88+
length += reader.getCurrent().length();
89+
}
90+
if (linesRead != NUM_LINES) {
91+
throw new IllegalStateException();
92+
}
93+
if (length != data.length) {
94+
throw new IllegalStateException();
95+
}
96+
reader.close();
97+
}
98+
99+
@Benchmark
100+
public void benchmarkHadoopLineReader(Data data) throws Exception {
101+
LineReader reader = new LineReader(new FileInputStream(data.pathString));
102+
int length = 0;
103+
int linesRead = 0;
104+
do {
105+
Text text = new Text();
106+
reader.readLine(text);
107+
// It is important to convert toString() here so that we force the decoding to UTF8 otherwise
108+
// Text keeps the encoded byte[] version in memory.
109+
length += text.toString().length();
110+
linesRead += 1;
111+
} while (length < data.length);
112+
if (linesRead != NUM_LINES) {
113+
throw new IllegalStateException();
114+
}
115+
reader.close();
116+
}
117+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/** Benchmarks for IO. */
20+
package org.apache.beam.sdk.jmh.io;

0 commit comments

Comments
 (0)