|
17 | 17 | package com.example.bigquerystorage; |
18 | 18 |
|
19 | 19 | import com.google.api.core.ApiFuture; |
| 20 | +import com.google.api.core.ApiFutureCallback; |
| 21 | +import com.google.api.core.ApiFutures; |
20 | 22 | import com.google.cloud.bigquery.BigQuery; |
21 | 23 | import com.google.cloud.bigquery.BigQueryOptions; |
22 | 24 | import com.google.cloud.bigquery.Field; |
|
31 | 33 | import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; |
32 | 34 | import com.google.cloud.bigquery.storage.v1.TableName; |
33 | 35 | import com.google.cloud.bigquery.storage.v1.TableSchema; |
| 36 | +import com.google.common.util.concurrent.MoreExecutors; |
34 | 37 | import com.google.protobuf.Descriptors.DescriptorValidationException; |
35 | 38 | import java.io.BufferedReader; |
36 | 39 | import java.io.FileReader; |
37 | 40 | import java.io.IOException; |
38 | | -import java.util.concurrent.ExecutionException; |
39 | 41 | import org.json.JSONArray; |
40 | 42 | import org.json.JSONObject; |
41 | 43 |
|
@@ -118,20 +120,32 @@ public static void writeToDefaultStream( |
118 | 120 | } |
119 | 121 | } // batch |
120 | 122 | ApiFuture<AppendRowsResponse> future = writer.append(jsonArr); |
121 | | - AppendRowsResponse response = future.get(); |
122 | | - if (response.hasUpdatedSchema()) { |
123 | | - // The destination table schema has changed. The client library automatically |
124 | | - // reestablishes a connection to the backend using the new schema, so we can continue |
125 | | - // to send data without interruption. |
126 | | - System.out.println("Table schema changed."); |
127 | | - } |
| 123 | + // The append method is asynchronous. Rather than waiting for the method to complete, |
| 124 | + // which can hurt performance, register a completion callback and continue streaming. |
| 125 | + ApiFutures.addCallback( |
| 126 | + future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); |
128 | 127 | } |
129 | | - System.out.println("Appended records successfully."); |
130 | | - } catch (ExecutionException e) { |
131 | | - // If the wrapped exception is a StatusRuntimeException, check the state of the operation. |
132 | | - // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: |
133 | | - // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html |
134 | | - System.out.println("Failed to append records. \n" + e.toString()); |
135 | 128 | } |
136 | 129 | } |
137 | 130 | } |
| 131 | + |
| 132 | +class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> { |
| 133 | + |
| 134 | + private static int batchCount = 0; |
| 135 | + private static final Object lock = new Object(); |
| 136 | + |
| 137 | + public void onSuccess(AppendRowsResponse response) { |
| 138 | + synchronized (lock) { |
| 139 | + if (response.hasError()) { |
| 140 | + System.out.format("Error: %s\n", response.getError().toString()); |
| 141 | + } else { |
| 142 | + ++batchCount; |
| 143 | + System.out.format("Wrote batch %d\n", batchCount); |
| 144 | + } |
| 145 | + } |
| 146 | + } |
| 147 | + |
| 148 | + public void onFailure(Throwable throwable) { |
| 149 | + System.out.format("Error: %s\n", throwable.toString()); |
| 150 | + } |
| 151 | +} |
0 commit comments