2323import com .google .common .util .concurrent .Uninterruptibles ;
2424import io .grpc .Status ;
2525import io .grpc .StatusRuntimeException ;
26- import java .time .Duration ;
2726import java .util .Deque ;
2827import java .util .LinkedList ;
2928import java .util .concurrent .TimeUnit ;
5352public class StreamWriterV2 implements AutoCloseable {
5453 private static final Logger log = Logger .getLogger (StreamWriterV2 .class .getName ());
5554
56- private static final Duration DONE_CALLBACK_WAIT_TIMEOUT = Duration .ofMinutes (10 );
57-
5855 private Lock lock ;
5956 private Condition hasMessageInWaitingQueue ;
6057
@@ -104,23 +101,28 @@ private StreamWriterV2(Builder builder) {
104101 this .waitingRequestQueue = new LinkedList <AppendRequestAndResponse >();
105102 this .inflightRequestQueue = new LinkedList <AppendRequestAndResponse >();
106103 this .streamConnection =
107- new StreamConnection (builder .client , new RequestCallback () {
108- @ Override
109- public void run (AppendRowsResponse response ) {
110- requestCallback (response );
111- }
112- }, new DoneCallback () {
113- @ Override
114- public void run (Throwable finalStatus ) {
115- doneCallback (finalStatus );
116- }
117- });
118- this .appendThread = new Thread (new Runnable () {
119- @ Override
120- public void run () {
121- appendLoop ();
122- }
123- });
104+ new StreamConnection (
105+ builder .client ,
106+ new RequestCallback () {
107+ @ Override
108+ public void run (AppendRowsResponse response ) {
109+ requestCallback (response );
110+ }
111+ },
112+ new DoneCallback () {
113+ @ Override
114+ public void run (Throwable finalStatus ) {
115+ doneCallback (finalStatus );
116+ }
117+ });
118+ this .appendThread =
119+ new Thread (
120+ new Runnable () {
121+ @ Override
122+ public void run () {
123+ appendLoop ();
124+ }
125+ });
124126 this .appendThread .start ();
125127 }
126128
@@ -210,10 +212,16 @@ private void appendLoop() {
210212 try {
211213 hasMessageInWaitingQueue .await (100 , TimeUnit .MILLISECONDS );
212214 while (!this .waitingRequestQueue .isEmpty ()) {
213- localQueue .addLast (this .waitingRequestQueue .pollFirst ());
215+ AppendRequestAndResponse requestWrapper = this .waitingRequestQueue .pollFirst ();
216+ this .inflightRequestQueue .addLast (requestWrapper );
217+ localQueue .addLast (requestWrapper );
214218 }
215219 } catch (InterruptedException e ) {
216- log .warning ("Interrupted while waiting for message. Error: " + e .toString ());
220+ log .warning (
221+ "Interrupted while waiting for message. Stream: "
222+ + streamName
223+ + " Error: "
224+ + e .toString ());
217225 } finally {
218226 this .lock .unlock ();
219227 }
@@ -223,43 +231,16 @@ private void appendLoop() {
223231 }
224232
225233 // TODO: Add reconnection here.
226-
227- this .lock .lock ();
228- try {
229- while (!localQueue .isEmpty ()) {
230- AppendRequestAndResponse requestWrapper = localQueue .pollFirst ();
231- this .inflightRequestQueue .addLast (requestWrapper );
232- this .streamConnection .send (requestWrapper .message );
233- }
234- } finally {
235- this .lock .unlock ();
234+ while (!localQueue .isEmpty ()) {
235+ this .streamConnection .send (localQueue .pollFirst ().message );
236236 }
237237 }
238238
239239 log .info ("Cleanup starts. Stream: " + streamName );
240240 // At this point, the waiting queue is drained, so no more requests.
241241 // We can close the stream connection and handle the remaining inflight requests.
242242 this .streamConnection .close ();
243-
244- log .info ("Waiting for done callback from stream connection. Stream: " + streamName );
245- long waitDeadlineMs = System .currentTimeMillis () + DONE_CALLBACK_WAIT_TIMEOUT .toMillis ();
246- while (true ) {
247- if (System .currentTimeMillis () > waitDeadlineMs ) {
248- log .warning (
249- "Timeout waiting for done wallback. Skip inflight cleanup. Stream: " + streamName );
250- return ;
251- }
252- this .lock .lock ();
253- try {
254- if (connectionFinalStatus != null ) {
255- // Done callback is received, break.
256- break ;
257- }
258- } finally {
259- this .lock .unlock ();
260- }
261- Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
262- }
243+ waitForDoneCallback ();
263244
264245 // At this point, there cannot be more callback. It is safe to clean up all inflight requests.
265246 log .info (
@@ -284,6 +265,22 @@ private boolean waitingQueueDrained() {
284265 }
285266 }
286267
268+ private void waitForDoneCallback () {
269+ log .info ("Waiting for done callback from stream connection. Stream: " + streamName );
270+ while (true ) {
271+ this .lock .lock ();
272+ try {
273+ if (connectionFinalStatus != null ) {
274+ // Done callback is received, return.
275+ return ;
276+ }
277+ } finally {
278+ this .lock .unlock ();
279+ }
280+ Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
281+ }
282+ }
283+
287284 private void cleanupInflightRequests () {
288285 Throwable finalStatus ;
289286 Deque <AppendRequestAndResponse > localQueue = new LinkedList <AppendRequestAndResponse >();
0 commit comments