@@ -242,7 +242,7 @@ public void run() {
242242 }
243243
244244 private void resetConnection () {
245- log .info ("Reconnecting for stream:" + streamName );
245+ log .info ("Reconnecting for stream:" + streamName + " id: " + writerId );
246246 this .streamConnection =
247247 new StreamConnection (
248248 this .client ,
@@ -258,6 +258,7 @@ public void run(Throwable finalStatus) {
258258 doneCallback (finalStatus );
259259 }
260260 });
261+ log .info ("Reconnect done for stream:" + streamName + " id: " + writerId );
261262 }
262263
263264 /** Schedules the writing of rows at given offset. */
@@ -392,13 +393,18 @@ public void close() {
392393 } finally {
393394 this .lock .unlock ();
394395 }
395- log .fine ("Waiting for append thread to finish. Stream: " + streamName );
396+ log .fine ("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId );
396397 try {
397398 appendThread .join ();
398399 } catch (InterruptedException e ) {
399400 // Unexpected. Just swallow the exception with logging.
400401 log .warning (
401- "Append handler join is interrupted. Stream: " + streamName + " Error: " + e .toString ());
402+ "Append handler join is interrupted. Stream: "
403+ + streamName
404+ + " id: "
405+ + writerId
406+ + " Error: "
407+ + e .toString ());
402408 }
403409 this .client .close ();
404410 try {
@@ -408,14 +414,20 @@ public void close() {
408414 }
409415
410416 try {
411- log .fine ("Begin shutting down user callback thread pool for stream " + streamName );
417+ log .fine (
418+ "Begin shutting down user callback thread pool for stream "
419+ + streamName
420+ + " id: "
421+ + writerId );
412422 threadPool .shutdown ();
413423 threadPool .awaitTermination (3 , TimeUnit .MINUTES );
414424 } catch (InterruptedException e ) {
415425 // Unexpected. Just swallow the exception with logging.
416426 log .warning (
417427 "Close on thread pool for "
418428 + streamName
429+ + " id: "
430+ + writerId
419431 + " is interrupted with exception: "
420432 + e .toString ());
421433 throw new IllegalStateException (
@@ -464,6 +476,8 @@ private void appendLoop() {
464476 log .warning (
465477 "Interrupted while waiting for message. Stream: "
466478 + streamName
479+ + " id: "
480+ + writerId
467481 + " Error: "
468482 + e .toString ());
469483 } finally {
@@ -539,17 +553,11 @@ private void appendLoop() {
539553 // TODO: Handle NOT_ENOUGH_QUOTA.
540554 // In the close case, the request is in the inflight queue, and will either be returned
541555 // to the user with an error, or will be resent.
542- log .fine (
543- "Sending "
544- + originalRequestBuilder .getProtoRows ().getRows ().getSerializedRowsCount ()
545- + " rows to stream '"
546- + originalRequestBuilder .getWriteStream ()
547- + "'" );
548556 this .streamConnection .send (originalRequestBuilder .build ());
549557 }
550558 }
551559
552- log .fine ("Cleanup starts. Stream: " + streamName );
560+ log .fine ("Cleanup starts. Stream: " + streamName + " id: " + writerId );
553561 // At this point, the waiting queue is drained, so no more requests.
554562 // We can close the stream connection and handle the remaining inflight requests.
555563 if (streamConnection != null ) {
@@ -559,9 +567,12 @@ private void appendLoop() {
559567
560568 // At this point, there cannot be more callback. It is safe to clean up all inflight requests.
561569 log .fine (
562- "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName );
570+ "Stream connection is fully closed. Cleaning up inflight requests. Stream: "
571+ + streamName
572+ + " id: "
573+ + writerId );
563574 cleanupInflightRequests ();
564- log .fine ("Append thread is done. Stream: " + streamName );
575+ log .fine ("Append thread is done. Stream: " + streamName + " id: " + writerId );
565576 }
566577
567578 /*
@@ -581,7 +592,11 @@ private boolean waitingQueueDrained() {
581592 }
582593
583594 private void waitForDoneCallback (long duration , TimeUnit timeUnit ) {
584- log .fine ("Waiting for done callback from stream connection. Stream: " + streamName );
595+ log .fine (
596+ "Waiting for done callback from stream connection. Stream: "
597+ + streamName
598+ + " id: "
599+ + writerId );
585600 long deadline = System .nanoTime () + timeUnit .toNanos (duration );
586601 while (System .nanoTime () <= deadline ) {
587602 this .lock .lock ();
@@ -630,23 +645,29 @@ private void cleanupInflightRequests() {
630645 } finally {
631646 this .lock .unlock ();
632647 }
633- log .fine ("Cleaning " + localQueue .size () + " inflight requests with error: " + finalStatus );
648+ log .fine (
649+ "Cleaning "
650+ + localQueue .size ()
651+ + " inflight requests with error: "
652+ + finalStatus
653+ + " for Stream "
654+ + streamName
655+ + " id: "
656+ + writerId );
634657 while (!localQueue .isEmpty ()) {
635658 localQueue .pollFirst ().appendResult .setException (finalStatus );
636659 }
637660 }
638661
639662 private void requestCallback (AppendRowsResponse response ) {
640- if (!response .hasUpdatedSchema ()) {
641- log .fine (String .format ("Got response on stream %s" , response .toString ()));
642- } else {
663+ if (response .hasUpdatedSchema ()) {
643664 AppendRowsResponse responseWithUpdatedSchemaRemoved =
644665 response .toBuilder ().clearUpdatedSchema ().build ();
645666
646667 log .fine (
647668 String .format (
648- "Got response with schema updated (omitting updated schema in response here): %s" ,
649- responseWithUpdatedSchemaRemoved .toString ()));
669+ "Got response with schema updated (omitting updated schema in response here): %s writer id %s " ,
670+ responseWithUpdatedSchemaRemoved .toString (), writerId ));
650671 }
651672
652673 AppendRequestAndResponse requestWrapper ;
@@ -737,6 +758,8 @@ private void doneCallback(Throwable finalStatus) {
737758 log .fine (
738759 "Received done callback. Stream: "
739760 + streamName
761+ + " worker id: "
762+ + writerId
740763 + " Final status: "
741764 + finalStatus .toString ());
742765 this .lock .lock ();
0 commit comments