Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit b98d6be

Browse files
committed
Do not hold lock while sending requests, and some minor refactoring.
1 parent 3312402 commit b98d6be

1 file changed

Lines changed: 49 additions & 52 deletions

File tree

  • google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 49 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.common.util.concurrent.Uninterruptibles;
2424
import io.grpc.Status;
2525
import io.grpc.StatusRuntimeException;
26-
import java.time.Duration;
2726
import java.util.Deque;
2827
import java.util.LinkedList;
2928
import java.util.concurrent.TimeUnit;
@@ -53,8 +52,6 @@
5352
public class StreamWriterV2 implements AutoCloseable {
5453
private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName());
5554

56-
private static final Duration DONE_CALLBACK_WAIT_TIMEOUT = Duration.ofMinutes(10);
57-
5855
private Lock lock;
5956
private Condition hasMessageInWaitingQueue;
6057

@@ -104,23 +101,28 @@ private StreamWriterV2(Builder builder) {
104101
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
105102
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
106103
this.streamConnection =
107-
new StreamConnection(builder.client, new RequestCallback() {
108-
@Override
109-
public void run(AppendRowsResponse response) {
110-
requestCallback(response);
111-
}
112-
}, new DoneCallback() {
113-
@Override
114-
public void run(Throwable finalStatus) {
115-
doneCallback(finalStatus);
116-
}
117-
});
118-
this.appendThread = new Thread(new Runnable() {
119-
@Override
120-
public void run() {
121-
appendLoop();
122-
}
123-
});
104+
new StreamConnection(
105+
builder.client,
106+
new RequestCallback() {
107+
@Override
108+
public void run(AppendRowsResponse response) {
109+
requestCallback(response);
110+
}
111+
},
112+
new DoneCallback() {
113+
@Override
114+
public void run(Throwable finalStatus) {
115+
doneCallback(finalStatus);
116+
}
117+
});
118+
this.appendThread =
119+
new Thread(
120+
new Runnable() {
121+
@Override
122+
public void run() {
123+
appendLoop();
124+
}
125+
});
124126
this.appendThread.start();
125127
}
126128

@@ -210,10 +212,16 @@ private void appendLoop() {
210212
try {
211213
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
212214
while (!this.waitingRequestQueue.isEmpty()) {
213-
localQueue.addLast(this.waitingRequestQueue.pollFirst());
215+
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
216+
this.inflightRequestQueue.addLast(requestWrapper);
217+
localQueue.addLast(requestWrapper);
214218
}
215219
} catch (InterruptedException e) {
216-
log.warning("Interrupted while waiting for message. Error: " + e.toString());
220+
log.warning(
221+
"Interrupted while waiting for message. Stream: "
222+
+ streamName
223+
+ " Error: "
224+
+ e.toString());
217225
} finally {
218226
this.lock.unlock();
219227
}
@@ -223,43 +231,16 @@ private void appendLoop() {
223231
}
224232

225233
// TODO: Add reconnection here.
226-
227-
this.lock.lock();
228-
try {
229-
while (!localQueue.isEmpty()) {
230-
AppendRequestAndResponse requestWrapper = localQueue.pollFirst();
231-
this.inflightRequestQueue.addLast(requestWrapper);
232-
this.streamConnection.send(requestWrapper.message);
233-
}
234-
} finally {
235-
this.lock.unlock();
234+
while (!localQueue.isEmpty()) {
235+
this.streamConnection.send(localQueue.pollFirst().message);
236236
}
237237
}
238238

239239
log.info("Cleanup starts. Stream: " + streamName);
240240
// At this point, the waiting queue is drained, so no more requests.
241241
// We can close the stream connection and handle the remaining inflight requests.
242242
this.streamConnection.close();
243-
244-
log.info("Waiting for done callback from stream connection. Stream: " + streamName);
245-
long waitDeadlineMs = System.currentTimeMillis() + DONE_CALLBACK_WAIT_TIMEOUT.toMillis();
246-
while (true) {
247-
if (System.currentTimeMillis() > waitDeadlineMs) {
248-
log.warning(
249-
"Timeout waiting for done wallback. Skip inflight cleanup. Stream: " + streamName);
250-
return;
251-
}
252-
this.lock.lock();
253-
try {
254-
if (connectionFinalStatus != null) {
255-
// Done callback is received, break.
256-
break;
257-
}
258-
} finally {
259-
this.lock.unlock();
260-
}
261-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
262-
}
243+
waitForDoneCallback();
263244

264245
// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
265246
log.info(
@@ -284,6 +265,22 @@ private boolean waitingQueueDrained() {
284265
}
285266
}
286267

268+
private void waitForDoneCallback() {
269+
log.info("Waiting for done callback from stream connection. Stream: " + streamName);
270+
while (true) {
271+
this.lock.lock();
272+
try {
273+
if (connectionFinalStatus != null) {
274+
// Done callback is received, return.
275+
return;
276+
}
277+
} finally {
278+
this.lock.unlock();
279+
}
280+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
281+
}
282+
}
283+
287284
private void cleanupInflightRequests() {
288285
Throwable finalStatus;
289286
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();

0 commit comments

Comments
 (0)