Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit b93679c

Browse files
authored
Merge cdbeaf3 into fc16183
2 parents fc16183 + cdbeaf3 commit b93679c

2 files changed

Lines changed: 291 additions & 11 deletions

File tree

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16+
17+
package com.example.bigquerystorage;
18+
19+
// [START bigquerystorage_jsonstreamwriter_parallelcommitted]
20+
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
39+
40+
public class ParallelWriteCommittedStream {
41+
42+
private static final Logger LOG = Logger.getLogger(ParallelWriteCommittedStream.class.getName());
43+
44+
// Total amount of test time.
45+
private static final Duration TEST_TIME = Duration.ofSeconds(10);
46+
47+
// How often to publish append stats.
48+
private static final Duration METRICS_GAP = Duration.ofSeconds(5);
49+
50+
// Size of each row to append.
51+
private static final int ROW_SIZE = 1024;
52+
53+
// The number of rows in each append request.
54+
private static final long BATCH_SIZE = 10;
55+
56+
// If true, switch to a new stream when append fails.
57+
// If false, do not switch to a new stream.
58+
private static final boolean SUPPORT_STREAM_SWITCH = false;
59+
60+
@GuardedBy("this")
61+
private long inflightCount = 0;
62+
63+
@GuardedBy("this")
64+
private long successCount = 0;
65+
66+
@GuardedBy("this")
67+
private long failureCount = 0;
68+
69+
@GuardedBy("this")
70+
private Throwable error = null;
71+
72+
@GuardedBy("this")
73+
private long lastMetricsTimeMillis = 0;
74+
75+
@GuardedBy("this")
76+
private long lastMetricsSuccessCount = 0;
77+
78+
@GuardedBy("this")
79+
private long lastMetricsFailureCount = 0;
80+
81+
public void writeLoop(
82+
String projectId, String datasetName, String tableName, BigQueryWriteClient client) {
83+
LOG.info("Start writeLoop");
84+
long streamSwitchCount = 0;
85+
long successRowCount = 0;
86+
long failureRowCount = 0;
87+
Throwable loggedError = null;
88+
long deadlineMillis = System.currentTimeMillis() + TEST_TIME.toMillis();
89+
while (System.currentTimeMillis() < deadlineMillis) {
90+
try {
91+
WriteStream writeStream = createStream(projectId, datasetName, tableName, client);
92+
writeToStream(client, writeStream, deadlineMillis);
93+
} catch (Throwable e) {
94+
LOG.warning("Unexpected error writing to stream: " + e.toString());
95+
}
96+
waitForInflightToReachZero(Duration.ofMinutes(1));
97+
synchronized (this) {
98+
successRowCount += successCount * BATCH_SIZE;
99+
failureRowCount += failureCount * BATCH_SIZE;
100+
if (loggedError == null) {
101+
loggedError = error;
102+
}
103+
}
104+
if (!SUPPORT_STREAM_SWITCH) {
105+
// If stream switch is disabled, break.
106+
break;
107+
}
108+
LOG.info("Sleeping before switching stream.");
109+
sleepIgnoringInterruption(Duration.ofMinutes(1));
110+
streamSwitchCount++;
111+
}
112+
LOG.info(
113+
"Finish writeLoop. Success row count: "
114+
+ successRowCount
115+
+ " Failure row count: "
116+
+ failureRowCount
117+
+ " Logged error: "
118+
+ loggedError
119+
+ " Stream switch count: "
120+
+ streamSwitchCount);
121+
if (successRowCount > 0 && failureRowCount == 0 && loggedError == null) {
122+
System.out.println("All records are appended successfully.");
123+
}
124+
}
125+
126+
private WriteStream createStream(
127+
String projectId, String datasetName, String tableName, BigQueryWriteClient client) {
128+
LOG.info("Creating a new stream");
129+
// Initialize a write stream for the specified table.
130+
// For more information on WriteStream.Type, see:
131+
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
132+
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
133+
TableName parentTable = TableName.of(projectId, datasetName, tableName);
134+
CreateWriteStreamRequest createWriteStreamRequest =
135+
CreateWriteStreamRequest.newBuilder()
136+
.setParent(parentTable.toString())
137+
.setWriteStream(stream)
138+
.build();
139+
return client.createWriteStream(createWriteStreamRequest);
140+
}
141+
142+
private void writeToStream(
143+
BigQueryWriteClient client, WriteStream writeStream, long deadlineMillis) throws Throwable {
144+
LOG.info("Start writing to new stream:" + writeStream.getName());
145+
synchronized (this) {
146+
inflightCount = 0;
147+
successCount = 0;
148+
failureCount = 0;
149+
error = null;
150+
lastMetricsTimeMillis = System.currentTimeMillis();
151+
lastMetricsSuccessCount = 0;
152+
lastMetricsFailureCount = 0;
153+
}
154+
// Use the JSON stream writer to send records in JSON format.
155+
// For more information about JsonStreamWriter, see:
156+
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
157+
try (JsonStreamWriter writer =
158+
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
159+
.build()) {
160+
while (System.currentTimeMillis() < deadlineMillis) {
161+
synchronized (this) {
162+
if (error != null) {
163+
// Stop writing once we get an error.
164+
throw error;
165+
}
166+
}
167+
ApiFuture<AppendRowsResponse> future = writer.append(createPayload(), -1);
168+
synchronized (this) {
169+
inflightCount++;
170+
}
171+
ApiFutures.addCallback(
172+
future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
173+
}
174+
}
175+
}
176+
177+
private void waitForInflightToReachZero(Duration timeout) {
178+
LOG.info("Waiting for inflight count to reach zero.");
179+
long deadlineMillis = System.currentTimeMillis() + timeout.toMillis();
180+
while (System.currentTimeMillis() < deadlineMillis) {
181+
synchronized (this) {
182+
if (inflightCount == 0) {
183+
LOG.info("Inflight count has reached zero.");
184+
return;
185+
}
186+
}
187+
sleepIgnoringInterruption(Duration.ofSeconds(1));
188+
}
189+
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
190+
}
191+
192+
private JSONArray createPayload() {
193+
// Create a JSON object that is compatible with the table schema.
194+
JSONArray jsonArr = new JSONArray();
195+
for (int i = 0; i < BATCH_SIZE; i++) {
196+
byte[] payload = new byte[ROW_SIZE];
197+
ThreadLocalRandom.current().nextBytes(payload);
198+
JSONObject record = new JSONObject();
199+
record.put("col1", new String(payload));
200+
jsonArr.put(record);
201+
}
202+
return jsonArr;
203+
}
204+
205+
private void sleepIgnoringInterruption(Duration duration) {
206+
try {
207+
Thread.sleep(duration.toMillis());
208+
} catch (InterruptedException e) {
209+
LOG.warning("Sleep is interrupted.");
210+
}
211+
}
212+
213+
/*
214+
* Callback when Append request is completed.
215+
*
216+
* It keeps track of count.
217+
*/
218+
private class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
219+
220+
private final ParallelWriteCommittedStream parent;
221+
222+
AppendCompleteCallback(ParallelWriteCommittedStream parent) {
223+
this.parent = parent;
224+
}
225+
226+
@Override
227+
public void onSuccess(@Nullable AppendRowsResponse response) {
228+
synchronized (parent) {
229+
parent.inflightCount--;
230+
if (!response.hasError()) {
231+
parent.successCount++;
232+
} else {
233+
parent.failureCount++;
234+
}
235+
long nowMillis = System.currentTimeMillis();
236+
if (nowMillis >= parent.lastMetricsTimeMillis + METRICS_GAP.toMillis()) {
237+
long successCountInIteration = parent.successCount - parent.lastMetricsSuccessCount;
238+
long failureCountInIteration = parent.failureCount - parent.lastMetricsFailureCount;
239+
long metricsTimeMillis = nowMillis - parent.lastMetricsTimeMillis;
240+
LOG.info(
241+
"Success append: "
242+
+ successCountInIteration
243+
+ " failure append: "
244+
+ failureCountInIteration
245+
+ " in "
246+
+ metricsTimeMillis
247+
+ "ms. Successful MB Per Second: "
248+
+ (double) (successCountInIteration * BATCH_SIZE * ROW_SIZE)
249+
/ metricsTimeMillis
250+
/ 1000
251+
+ " Current inflight: "
252+
+ parent.inflightCount);
253+
parent.lastMetricsTimeMillis = System.currentTimeMillis();
254+
parent.lastMetricsSuccessCount = parent.successCount;
255+
parent.lastMetricsFailureCount = parent.failureCount;
256+
}
257+
}
258+
}
259+
260+
@Override
261+
public void onFailure(Throwable throwable) {
262+
synchronized (parent) {
263+
parent.inflightCount--;
264+
parent.error = throwable;
265+
LOG.warning("Found failure: " + throwable.toString());
266+
}
267+
}
268+
}
269+
270+
public static void writeCommittedStream(String projectId, String datasetName, String tableName)
271+
throws DescriptorValidationException, InterruptedException, IOException {
272+
try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
273+
new ParallelWriteCommittedStream().writeLoop(projectId, datasetName, tableName, client);
274+
} catch (Exception e) {
275+
System.out.println("Failed to append records. \n" + e.toString());
276+
}
277+
}
278+
}
279+
// [END bigquerystorage_jsonstreamwriter_parallelcommitted]

samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java renamed to samples/snippets/src/test/java/com/example/bigquerystorage/ParallelWriteCommittedStreamIT.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import org.junit.runners.JUnit4;
4242

4343
@RunWith(JUnit4.class)
44-
public class WriteToDefaultStreamIT {
44+
public class ParallelWriteCommittedStreamIT {
4545

4646
private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
4747

@@ -64,33 +64,34 @@ public static void checkRequirements() {
6464

6565
@Before
6666
public void setUp() {
67-
bout = new ByteArrayOutputStream();
68-
out = new PrintStream(bout);
69-
System.setOut(out);
70-
7167
bigquery = BigQueryOptions.getDefaultInstance().getService();
7268

7369
// Create a new dataset and table for each test.
74-
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
75-
tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
70+
datasetName = "PARALLEL_WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
71+
tableName = "PARALLEL_WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
7672
Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING));
7773
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
7874
TableInfo tableInfo =
7975
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
8076
.build();
8177
bigquery.create(tableInfo);
78+
79+
bout = new ByteArrayOutputStream();
80+
out = new PrintStream(bout);
81+
System.setOut(out);
8282
}
8383

8484
@After
8585
public void tearDown() {
86+
System.setOut(null);
87+
bigquery.delete(TableId.of(GOOGLE_CLOUD_PROJECT, datasetName, tableName));
8688
bigquery.delete(
8789
DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents());
88-
System.setOut(null);
8990
}
9091

9192
@Test
92-
public void testWriteToDefaultStream() throws Exception {
93-
WriteToDefaultStream.writeToDefaultStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName);
94-
assertThat(bout.toString()).contains("Appended records successfully.");
93+
public void testParallelWriteCommittedStream() throws Exception {
94+
ParallelWriteCommittedStream.writeCommittedStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName);
95+
assertThat(bout.toString()).contains("All records are appended successfully.");
9596
}
9697
}

0 commit comments

Comments
 (0)