Skip to content

Commit 812bcf1

Browse files
fix: improve ConnectionWorker fine logging (#1972)
* fix: improving ConnectionWorker fine logging * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent b05fff5 commit 812bcf1

2 files changed

Lines changed: 46 additions & 23 deletions

File tree

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
4949
If you are using Gradle 5.x or later, add this to your dependencies:
5050

5151
```Groovy
52-
implementation platform('com.google.cloud:libraries-bom:26.5.0')
52+
implementation platform('com.google.cloud:libraries-bom:26.6.0')
5353
5454
implementation 'com.google.cloud:google-cloud-bigquerystorage'
5555
```
5656
If you are using Gradle without BOM, add this to your dependencies:
5757

5858
```Groovy
59-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
59+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.29.0'
6060
```
6161

6262
If you are using SBT, add this to your dependencies:
6363

6464
```Scala
65-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
65+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.29.0"
6666
```
6767

6868
## Authentication

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public void run() {
242242
}
243243

244244
private void resetConnection() {
245-
log.info("Reconnecting for stream:" + streamName);
245+
log.info("Reconnecting for stream:" + streamName + " id: " + writerId);
246246
this.streamConnection =
247247
new StreamConnection(
248248
this.client,
@@ -258,6 +258,7 @@ public void run(Throwable finalStatus) {
258258
doneCallback(finalStatus);
259259
}
260260
});
261+
log.info("Reconnect done for stream:" + streamName + " id: " + writerId);
261262
}
262263

263264
/** Schedules the writing of rows at given offset. */
@@ -392,13 +393,18 @@ public void close() {
392393
} finally {
393394
this.lock.unlock();
394395
}
395-
log.fine("Waiting for append thread to finish. Stream: " + streamName);
396+
log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
396397
try {
397398
appendThread.join();
398399
} catch (InterruptedException e) {
399400
// Unexpected. Just swallow the exception with logging.
400401
log.warning(
401-
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
402+
"Append handler join is interrupted. Stream: "
403+
+ streamName
404+
+ " id: "
405+
+ writerId
406+
+ " Error: "
407+
+ e.toString());
402408
}
403409
this.client.close();
404410
try {
@@ -408,14 +414,20 @@ public void close() {
408414
}
409415

410416
try {
411-
log.fine("Begin shutting down user callback thread pool for stream " + streamName);
417+
log.fine(
418+
"Begin shutting down user callback thread pool for stream "
419+
+ streamName
420+
+ " id: "
421+
+ writerId);
412422
threadPool.shutdown();
413423
threadPool.awaitTermination(3, TimeUnit.MINUTES);
414424
} catch (InterruptedException e) {
415425
// Unexpected. Just swallow the exception with logging.
416426
log.warning(
417427
"Close on thread pool for "
418428
+ streamName
429+
+ " id: "
430+
+ writerId
419431
+ " is interrupted with exception: "
420432
+ e.toString());
421433
throw new IllegalStateException(
@@ -464,6 +476,8 @@ private void appendLoop() {
464476
log.warning(
465477
"Interrupted while waiting for message. Stream: "
466478
+ streamName
479+
+ " id: "
480+
+ writerId
467481
+ " Error: "
468482
+ e.toString());
469483
} finally {
@@ -539,17 +553,11 @@ private void appendLoop() {
539553
// TODO: Handle NOT_ENOUGH_QUOTA.
540554
// In the close case, the request is in the inflight queue, and will either be returned
541555
// to the user with an error, or will be resent.
542-
log.fine(
543-
"Sending "
544-
+ originalRequestBuilder.getProtoRows().getRows().getSerializedRowsCount()
545-
+ " rows to stream '"
546-
+ originalRequestBuilder.getWriteStream()
547-
+ "'");
548556
this.streamConnection.send(originalRequestBuilder.build());
549557
}
550558
}
551559

552-
log.fine("Cleanup starts. Stream: " + streamName);
560+
log.fine("Cleanup starts. Stream: " + streamName + " id: " + writerId);
553561
// At this point, the waiting queue is drained, so no more requests.
554562
// We can close the stream connection and handle the remaining inflight requests.
555563
if (streamConnection != null) {
@@ -559,9 +567,12 @@ private void appendLoop() {
559567

560568
// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
561569
log.fine(
562-
"Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName);
570+
"Stream connection is fully closed. Cleaning up inflight requests. Stream: "
571+
+ streamName
572+
+ " id: "
573+
+ writerId);
563574
cleanupInflightRequests();
564-
log.fine("Append thread is done. Stream: " + streamName);
575+
log.fine("Append thread is done. Stream: " + streamName + " id: " + writerId);
565576
}
566577

567578
/*
@@ -581,7 +592,11 @@ private boolean waitingQueueDrained() {
581592
}
582593

583594
private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
584-
log.fine("Waiting for done callback from stream connection. Stream: " + streamName);
595+
log.fine(
596+
"Waiting for done callback from stream connection. Stream: "
597+
+ streamName
598+
+ " id: "
599+
+ writerId);
585600
long deadline = System.nanoTime() + timeUnit.toNanos(duration);
586601
while (System.nanoTime() <= deadline) {
587602
this.lock.lock();
@@ -630,23 +645,29 @@ private void cleanupInflightRequests() {
630645
} finally {
631646
this.lock.unlock();
632647
}
633-
log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus);
648+
log.fine(
649+
"Cleaning "
650+
+ localQueue.size()
651+
+ " inflight requests with error: "
652+
+ finalStatus
653+
+ " for Stream "
654+
+ streamName
655+
+ " id: "
656+
+ writerId);
634657
while (!localQueue.isEmpty()) {
635658
localQueue.pollFirst().appendResult.setException(finalStatus);
636659
}
637660
}
638661

639662
private void requestCallback(AppendRowsResponse response) {
640-
if (!response.hasUpdatedSchema()) {
641-
log.fine(String.format("Got response on stream %s", response.toString()));
642-
} else {
663+
if (response.hasUpdatedSchema()) {
643664
AppendRowsResponse responseWithUpdatedSchemaRemoved =
644665
response.toBuilder().clearUpdatedSchema().build();
645666

646667
log.fine(
647668
String.format(
648-
"Got response with schema updated (omitting updated schema in response here): %s",
649-
responseWithUpdatedSchemaRemoved.toString()));
669+
"Got response with schema updated (omitting updated schema in response here): %s writer id %s",
670+
responseWithUpdatedSchemaRemoved.toString(), writerId));
650671
}
651672

652673
AppendRequestAndResponse requestWrapper;
@@ -737,6 +758,8 @@ private void doneCallback(Throwable finalStatus) {
737758
log.fine(
738759
"Received done callback. Stream: "
739760
+ streamName
761+
+ " worker id: "
762+
+ writerId
740763
+ " Final status: "
741764
+ finalStatus.toString());
742765
this.lock.lock();

0 commit comments

Comments
 (0)