Skip to content

Commit c4e10b9

Browse files
authored
feat: add client library debug logging
* feat: add keepalive feature to tear down streams in their absence * fix: reset timer each time stream is opened * fix: update timings for server monitor to meet stream close time policy * fix: update initial delay vs period in fake scheduled executor and fix test cases * fix merging errors * feat: add client library debug logging * fix: formatting to previous version * fix: revert subscriber to original state * fix: create Logger objects for each sub-system tag * fix: ran formatter * fix: update LoggingUtil to house all subsytems * fix: typo, renaming, and using throwable on failure for logging util
1 parent 11c3e7b commit c4e10b9

File tree

4 files changed

+242
-10
lines changed

4 files changed

+242
-10
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub.v1;
18+
19+
import com.google.pubsub.v1.PubsubMessage;
20+
import java.util.logging.Level;
21+
import java.util.logging.Logger;
22+
23+
public final class LoggingUtil {
24+
// Instantiate all loggers as static final fields to maintain strong references
25+
26+
private static final Logger slowAckLogger = Logger.getLogger("slow-ack");
27+
private static final Logger callbackDeliveryLogger = Logger.getLogger("callback-delivery");
28+
private static final Logger expiryLogger = Logger.getLogger("expiry");
29+
private static final Logger callbackExceptionsLogger = Logger.getLogger("callback-exceptions");
30+
private static final Logger ackBatchLogger = Logger.getLogger("ack-batch");
31+
private static final Logger subscriberFlowControlLogger =
32+
Logger.getLogger("subscriber-flow-control");
33+
private static final Logger ackNackLogger = Logger.getLogger("ack-nack");
34+
private static final Logger publishBatchLogger = Logger.getLogger("publish-batch");
35+
private static final Logger subscriberStreamsLogger = Logger.getLogger("subscriber-streams");
36+
37+
public enum SubSystem {
38+
SLOW_ACK(slowAckLogger),
39+
CALLBACK_DELIVERY(callbackDeliveryLogger),
40+
EXPIRY(expiryLogger),
41+
CALLBACK_EXCEPTIONS(callbackExceptionsLogger),
42+
ACK_BATCH(ackBatchLogger),
43+
SUBSCRIBER_FLOW_CONTROL(subscriberFlowControlLogger),
44+
ACK_NACK(ackNackLogger),
45+
PUBLISH_BATCH(publishBatchLogger),
46+
SUBSCRIBER_STREAMS(subscriberStreamsLogger);
47+
48+
private final Logger logger;
49+
50+
SubSystem(Logger logger) {
51+
this.logger = logger;
52+
}
53+
54+
public Logger getLogger() {
55+
return logger;
56+
}
57+
}
58+
59+
public LoggingUtil() {}
60+
61+
private String getSubscriptionLogPrefix(
62+
PubsubMessageWrapper messageWrapper, String ackId, boolean exactlyOnceDeliveryEnabled) {
63+
if (messageWrapper == null || messageWrapper.getPubsubMessage() == null) {
64+
return " Ack ID: "
65+
+ ackId
66+
+ ", Exactly Once Delivery: "
67+
+ exactlyOnceDeliveryEnabled
68+
+ " (Message details not available)";
69+
}
70+
71+
PubsubMessage message = messageWrapper.getPubsubMessage();
72+
String messageId = message.getMessageId();
73+
String orderingKey = message.getOrderingKey();
74+
75+
StringBuilder sb = new StringBuilder();
76+
sb.append("Message ID: ").append(messageId);
77+
sb.append(", Ack ID: ").append(ackId);
78+
if (orderingKey != null && !orderingKey.isEmpty()) {
79+
sb.append(", Ordering Key: ").append(orderingKey);
80+
}
81+
sb.append(", Exactly Once Delivery: ").append(exactlyOnceDeliveryEnabled);
82+
return sb.toString();
83+
}
84+
85+
private String getPublisherLogPrefix(PubsubMessageWrapper messageWrapper) {
86+
if (messageWrapper == null || messageWrapper.getPubsubMessage() == null) {
87+
return " (Message details not available)";
88+
}
89+
90+
PubsubMessage message = messageWrapper.getPubsubMessage();
91+
String messageId = message.getMessageId();
92+
String orderingKey = message.getOrderingKey();
93+
94+
StringBuilder sb = new StringBuilder();
95+
sb.append("Message ID: ").append(messageId);
96+
if (orderingKey != null && !orderingKey.isEmpty()) {
97+
sb.append(", Ordering Key: ").append(orderingKey);
98+
}
99+
return sb.toString();
100+
}
101+
102+
public void logSubscriber(
103+
SubSystem subSystem,
104+
Level level,
105+
String msg,
106+
PubsubMessageWrapper messageWrapper,
107+
String ackId,
108+
boolean exactlyOnceDeliveryEnabled) {
109+
Logger logger = subSystem.getLogger();
110+
if (logger.isLoggable(level)) {
111+
String prefix = getSubscriptionLogPrefix(messageWrapper, ackId, exactlyOnceDeliveryEnabled);
112+
logger.log(level, prefix + " - " + msg);
113+
}
114+
}
115+
116+
public void logSubscriberWithThrowable(
117+
SubSystem subSystem,
118+
Level level,
119+
String msg,
120+
PubsubMessageWrapper messageWrapper,
121+
String ackId,
122+
boolean exactlyOnceDeliveryEnabled,
123+
Throwable throwable) {
124+
Logger logger = subSystem.getLogger();
125+
if (logger.isLoggable(level)) {
126+
String prefix = getSubscriptionLogPrefix(messageWrapper, ackId, exactlyOnceDeliveryEnabled);
127+
logger.log(level, prefix + " - " + msg, throwable);
128+
}
129+
}
130+
131+
public void logPublisher(
132+
SubSystem subSystem, Level level, String msg, PubsubMessageWrapper messageWrapper) {
133+
Logger logger = subSystem.getLogger();
134+
if (logger.isLoggable(level)) {
135+
String prefix = getPublisherLogPrefix(messageWrapper);
136+
logger.log(level, prefix + " - " + msg);
137+
}
138+
}
139+
140+
public void logEvent(SubSystem subSystem, Level level, String msg, Object... params) {
141+
Logger logger = subSystem.getLogger();
142+
if (logger.isLoggable(level)) {
143+
logger.log(level, msg, params);
144+
}
145+
}
146+
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
*/
6060
class MessageDispatcher {
6161
private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
62+
private LoggingUtil loggingUtil = new LoggingUtil();
6263

6364
@InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
6465
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
@@ -112,6 +113,8 @@ class MessageDispatcher {
112113
private final SubscriberShutdownSettings subscriberShutdownSettings;
113114
private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean(false);
114115

116+
private final double slowAckPercentile = 99.0;
117+
115118
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
116119
public enum AckReply {
117120
ACK,
@@ -157,11 +160,13 @@ private void forget() {
157160

158161
@Override
159162
public void onFailure(Throwable t) {
160-
logger.log(
163+
loggingUtil.logSubscriberWithThrowable(
164+
LoggingUtil.SubSystem.CALLBACK_EXCEPTIONS,
161165
Level.WARNING,
162-
"MessageReceiver failed to process ack ID: "
163-
+ this.ackRequestData.getAckId()
164-
+ ", the message will be nacked.",
166+
"MessageReceiver exception.",
167+
this.ackRequestData.getMessageWrapper(),
168+
this.ackRequestData.getAckId(),
169+
exactlyOnceDeliveryEnabled.get(),
165170
t);
166171
this.ackRequestData.setResponse(AckResponse.OTHER, false);
167172
pendingNacks.add(this.ackRequestData);
@@ -171,6 +176,19 @@ public void onFailure(Throwable t) {
171176

172177
@Override
173178
public void onSuccess(AckReply reply) {
179+
int ackLatency =
180+
Ints.saturatedCast((long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D));
181+
if (ackLatency >= ackLatencyDistribution.getPercentile(slowAckPercentile)) {
182+
loggingUtil.logSubscriber(
183+
LoggingUtil.SubSystem.SLOW_ACK,
184+
Level.FINE,
185+
String.format(
186+
"Message ack duration of %d is higher than the p99 ack duration", ackLatency),
187+
this.ackRequestData.getMessageWrapper(),
188+
this.ackRequestData.getAckId(),
189+
exactlyOnceDeliveryEnabled.get());
190+
}
191+
174192
switch (reply) {
175193
case ACK:
176194
if (nackImmediatelyShutdownInProgress.get() && exactlyOnceDeliveryEnabled.get()) {
@@ -180,15 +198,27 @@ public void onSuccess(AckReply reply) {
180198
} else {
181199
pendingAcks.add(this.ackRequestData);
182200
// Record the latency rounded to the next closest integer.
183-
ackLatencyDistribution.record(
184-
Ints.saturatedCast(
185-
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
201+
ackLatencyDistribution.record(ackLatency);
186202
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
187203
}
204+
loggingUtil.logSubscriber(
205+
LoggingUtil.SubSystem.ACK_NACK,
206+
Level.FINE,
207+
"Ack called on message.",
208+
this.ackRequestData.getMessageWrapper(),
209+
this.ackRequestData.getAckId(),
210+
exactlyOnceDeliveryEnabled.get());
188211
break;
189212
case NACK:
190213
pendingNacks.add(this.ackRequestData);
191214
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
215+
loggingUtil.logSubscriber(
216+
LoggingUtil.SubSystem.ACK_NACK,
217+
Level.FINE,
218+
"Nack called on message.",
219+
this.ackRequestData.getMessageWrapper(),
220+
this.ackRequestData.getAckId(),
221+
exactlyOnceDeliveryEnabled.get());
192222
break;
193223
default:
194224
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
@@ -568,10 +598,32 @@ private void processBatch(List<OutstandingMessage> batch) {
568598
// shutdown will block on processing of all these messages anyway.
569599
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
570600
try {
601+
loggingUtil.logSubscriber(
602+
LoggingUtil.SubSystem.SUBSCRIBER_FLOW_CONTROL,
603+
Level.FINE,
604+
"Flow controller is blocking.",
605+
message.messageWrapper(),
606+
message.messageWrapper().getAckId(),
607+
exactlyOnceDeliveryEnabled.get());
571608
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
609+
loggingUtil.logSubscriber(
610+
LoggingUtil.SubSystem.SUBSCRIBER_FLOW_CONTROL,
611+
Level.FINE,
612+
"Flow controller is done blocking.",
613+
message.messageWrapper(),
614+
message.messageWrapper().getAckId(),
615+
exactlyOnceDeliveryEnabled.get());
572616
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
573617
} catch (FlowControlException unexpectedException) {
574618
// This should be a blocking flow controller and never throw an exception.
619+
loggingUtil.logSubscriberWithThrowable(
620+
LoggingUtil.SubSystem.SUBSCRIBER_FLOW_CONTROL,
621+
Level.FINE,
622+
"Flow controller unexpected exception.",
623+
message.messageWrapper(),
624+
message.messageWrapper().getAckId(),
625+
exactlyOnceDeliveryEnabled.get(),
626+
unexpectedException);
575627
tracer.setSubscribeConcurrencyControlSpanException(
576628
message.messageWrapper(), unexpectedException);
577629
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
@@ -619,9 +671,23 @@ public void run() {
619671
// Don't nack it either, because we'd be nacking someone else's message.
620672
ackHandler.forget();
621673
tracer.setSubscriberSpanExpirationResult(messageWrapper);
674+
loggingUtil.logSubscriber(
675+
LoggingUtil.SubSystem.EXPIRY,
676+
Level.FINE,
677+
"Message expired.",
678+
messageWrapper,
679+
ackHandler.ackRequestData.getAckId(),
680+
exactlyOnceDeliveryEnabled.get());
622681
return;
623682
}
624683
tracer.startSubscribeProcessSpan(messageWrapper);
684+
loggingUtil.logSubscriber(
685+
LoggingUtil.SubSystem.CALLBACK_DELIVERY,
686+
Level.FINE,
687+
"Message delivered.",
688+
messageWrapper,
689+
ackHandler.ackRequestData.getAckId(),
690+
exactlyOnceDeliveryEnabled.get());
625691
if (shouldSetMessageFuture()) {
626692
// This is the message future that is propagated to the user
627693
SettableApiFuture<AckResponse> messageFuture =
@@ -725,7 +791,6 @@ void processOutstandingOperations() {
725791
if (!nackRequestDataList.isEmpty()) {
726792
modackRequestData.add(new ModackRequestData(0, nackRequestDataList));
727793
}
728-
logger.log(Level.FINER, "Sending {0} nacks", nackRequestDataList.size());
729794

730795
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
731796
pendingReceipts.drainTo(ackRequestDataReceipts);
@@ -735,13 +800,21 @@ void processOutstandingOperations() {
735800
receiptModack.setIsReceiptModack(true);
736801
modackRequestData.add(receiptModack);
737802
}
738-
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());
739803

740804
ackProcessor.sendModackOperations(modackRequestData);
741805

742806
List<AckRequestData> ackRequestDataList = new ArrayList<AckRequestData>();
743807
pendingAcks.drainTo(ackRequestDataList);
744-
logger.log(Level.FINER, "Sending {0} acks", ackRequestDataList.size());
808+
loggingUtil.logEvent(
809+
LoggingUtil.SubSystem.ACK_BATCH,
810+
Level.FINE,
811+
"Sending {0} ACKs, {1} NACKs, {2} receipts. Exactly Once Delivery: {3}",
812+
new Object[] {
813+
ackRequestDataList.size(),
814+
nackRequestDataList.size(),
815+
ackRequestDataReceipts.size(),
816+
exactlyOnceDeliveryEnabled.get()
817+
});
745818

746819
ackProcessor.sendAckOperations(ackRequestDataList);
747820
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
*/
9595
public class Publisher implements PublisherInterface {
9696
private static final Logger logger = Logger.getLogger(Publisher.class.getName());
97+
private LoggingUtil loggingUtil = new LoggingUtil();
9798

9899
private static final String GZIP_COMPRESSION = "gzip";
99100

@@ -509,6 +510,13 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
509510
logger.log(Level.WARNING, "Attempted to publish batch with zero messages.");
510511
return;
511512
}
513+
514+
loggingUtil.logPublisher(
515+
LoggingUtil.SubSystem.PUBLISH_BATCH,
516+
Level.FINE,
517+
String.format("Attempting to batch publish %d messages", outstandingBatch.size()),
518+
outstandingBatch.getMessageWrappers().get(0));
519+
512520
final ApiFutureCallback<PublishResponse> futureCallback =
513521
new ApiFutureCallback<PublishResponse>() {
514522
@Override

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor {
7878
private static final Logger logger =
7979
Logger.getLogger(StreamingSubscriberConnection.class.getName());
80+
private LoggingUtil loggingUtil = new LoggingUtil();
8081

8182
private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
8283
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
@@ -222,13 +223,17 @@ public boolean getExactlyOnceDeliveryEnabled() {
222223
@Override
223224
protected void doStart() {
224225
logger.config("Starting subscriber.");
226+
loggingUtil.logEvent(
227+
LoggingUtil.SubSystem.SUBSCRIBER_STREAMS, Level.FINE, "Opening stream.", "");
225228
messageDispatcher.start();
226229
initialize();
227230
notifyStarted();
228231
}
229232

230233
@Override
231234
protected void doStop() {
235+
loggingUtil.logEvent(
236+
LoggingUtil.SubSystem.SUBSCRIBER_STREAMS, Level.FINE, "Closing stream.", "");
232237
lock.lock();
233238
try {
234239
clientStream.closeSendWithError(Status.CANCELLED.asException());

0 commit comments

Comments
 (0)