Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit c777e23

Browse files
docs(tutorials): Call append asynchronously instead of blocking (#1542)
1 parent 433eb70 commit c777e23

2 files changed

Lines changed: 29 additions & 15 deletions

File tree

tutorials/JsonWriterDefaultStream/src/main/java/com/example/JsonWriterDefaultStream.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.example.bigquerystorage;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
2022
import com.google.cloud.bigquery.BigQuery;
2123
import com.google.cloud.bigquery.BigQueryOptions;
2224
import com.google.cloud.bigquery.Field;
@@ -31,11 +33,11 @@
3133
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
3234
import com.google.cloud.bigquery.storage.v1.TableName;
3335
import com.google.cloud.bigquery.storage.v1.TableSchema;
36+
import com.google.common.util.concurrent.MoreExecutors;
3437
import com.google.protobuf.Descriptors.DescriptorValidationException;
3538
import java.io.BufferedReader;
3639
import java.io.FileReader;
3740
import java.io.IOException;
38-
import java.util.concurrent.ExecutionException;
3941
import org.json.JSONArray;
4042
import org.json.JSONObject;
4143

@@ -118,20 +120,32 @@ public static void writeToDefaultStream(
118120
}
119121
} // batch
120122
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
121-
AppendRowsResponse response = future.get();
122-
if (response.hasUpdatedSchema()) {
123-
// The destination table schema has changed. The client library automatically
124-
// reestablishes a connection to the backend using the new schema, so we can continue
125-
// to send data without interruption.
126-
System.out.println("Table schema changed.");
127-
}
123+
// The append method is asynchronous. Rather than waiting for the method to complete,
124+
// which can hurt performance, register a completion callback and continue streaming.
125+
ApiFutures.addCallback(
126+
future, new AppendCompleteCallback(), MoreExecutors.directExecutor());
128127
}
129-
System.out.println("Appended records successfully.");
130-
} catch (ExecutionException e) {
131-
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
132-
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
133-
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
134-
System.out.println("Failed to append records. \n" + e.toString());
135128
}
136129
}
137130
}
131+
132+
class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
133+
134+
private static int batchCount = 0;
135+
private static final Object lock = new Object();
136+
137+
public void onSuccess(AppendRowsResponse response) {
138+
synchronized (lock) {
139+
if (response.hasError()) {
140+
System.out.format("Error: %s\n", response.getError().toString());
141+
} else {
142+
++batchCount;
143+
System.out.format("Wrote batch %d\n", batchCount);
144+
}
145+
}
146+
}
147+
148+
public void onFailure(Throwable throwable) {
149+
System.out.format("Error: %s\n", throwable.toString());
150+
}
151+
}

tutorials/JsonWriterDefaultStream/src/test/java/com/example/JsonWriterDefaultStreamIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testJsonWriterDefaultStream() throws Exception {
6868
System.out.println(dataFilePath.toString());
6969
String[] args = {GOOGLE_CLOUD_PROJECT, datasetName, "github", dataFilePath.toString()};
7070
JsonWriterDefaultStream.main(args);
71-
assertThat(bout.toString()).contains("Appended records successfully.");
71+
assertThat(bout.toString()).contains("Wrote batch");
7272
}
7373

7474
@After

0 commit comments

Comments
 (0)