5959 */
6060class MessageDispatcher {
6161 private static final Logger logger = Logger .getLogger (MessageDispatcher .class .getName ());
62+ private LoggingUtil loggingUtil = new LoggingUtil ();
6263
6364 @ InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9 ;
6465 @ InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration .ofMillis (100 );
@@ -112,6 +113,8 @@ class MessageDispatcher {
112113 private final SubscriberShutdownSettings subscriberShutdownSettings ;
113114 private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean (false );
114115
116+ private final double slowAckPercentile = 99.0 ;
117+
115118 /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
116119 public enum AckReply {
117120 ACK ,
@@ -157,11 +160,13 @@ private void forget() {
157160
158161 @ Override
159162 public void onFailure (Throwable t ) {
160- logger .log (
163+ loggingUtil .logSubscriberWithThrowable (
164+ LoggingUtil .SubSystem .CALLBACK_EXCEPTIONS ,
161165 Level .WARNING ,
162- "MessageReceiver failed to process ack ID: "
163- + this .ackRequestData .getAckId ()
164- + ", the message will be nacked." ,
166+ "MessageReceiver exception." ,
167+ this .ackRequestData .getMessageWrapper (),
168+ this .ackRequestData .getAckId (),
169+ exactlyOnceDeliveryEnabled .get (),
165170 t );
166171 this .ackRequestData .setResponse (AckResponse .OTHER , false );
167172 pendingNacks .add (this .ackRequestData );
@@ -171,6 +176,19 @@ public void onFailure(Throwable t) {
171176
172177 @ Override
173178 public void onSuccess (AckReply reply ) {
179+ int ackLatency =
180+ Ints .saturatedCast ((long ) Math .ceil ((clock .millisTime () - receivedTimeMillis ) / 1000D ));
181+ if (ackLatency >= ackLatencyDistribution .getPercentile (slowAckPercentile )) {
182+ loggingUtil .logSubscriber (
183+ LoggingUtil .SubSystem .SLOW_ACK ,
184+ Level .FINE ,
185+ String .format (
186+ "Message ack duration of %d is higher than the p99 ack duration" , ackLatency ),
187+ this .ackRequestData .getMessageWrapper (),
188+ this .ackRequestData .getAckId (),
189+ exactlyOnceDeliveryEnabled .get ());
190+ }
191+
174192 switch (reply ) {
175193 case ACK :
176194 if (nackImmediatelyShutdownInProgress .get () && exactlyOnceDeliveryEnabled .get ()) {
@@ -180,15 +198,27 @@ public void onSuccess(AckReply reply) {
180198 } else {
181199 pendingAcks .add (this .ackRequestData );
182200 // Record the latency rounded to the next closest integer.
183- ackLatencyDistribution .record (
184- Ints .saturatedCast (
185- (long ) Math .ceil ((clock .millisTime () - receivedTimeMillis ) / 1000D )));
201+ ackLatencyDistribution .record (ackLatency );
186202 tracer .endSubscribeProcessSpan (this .ackRequestData .getMessageWrapper (), "ack" );
187203 }
204+ loggingUtil .logSubscriber (
205+ LoggingUtil .SubSystem .ACK_NACK ,
206+ Level .FINE ,
207+ "Ack called on message." ,
208+ this .ackRequestData .getMessageWrapper (),
209+ this .ackRequestData .getAckId (),
210+ exactlyOnceDeliveryEnabled .get ());
188211 break ;
189212 case NACK :
190213 pendingNacks .add (this .ackRequestData );
191214 tracer .endSubscribeProcessSpan (this .ackRequestData .getMessageWrapper (), "nack" );
215+ loggingUtil .logSubscriber (
216+ LoggingUtil .SubSystem .ACK_NACK ,
217+ Level .FINE ,
218+ "Nack called on message." ,
219+ this .ackRequestData .getMessageWrapper (),
220+ this .ackRequestData .getAckId (),
221+ exactlyOnceDeliveryEnabled .get ());
192222 break ;
193223 default :
194224 throw new IllegalArgumentException (String .format ("AckReply: %s not supported" , reply ));
@@ -568,10 +598,32 @@ private void processBatch(List<OutstandingMessage> batch) {
568598 // shutdown will block on processing of all these messages anyway.
569599 tracer .startSubscribeConcurrencyControlSpan (message .messageWrapper ());
570600 try {
601+ loggingUtil .logSubscriber (
602+ LoggingUtil .SubSystem .SUBSCRIBER_FLOW_CONTROL ,
603+ Level .FINE ,
604+ "Flow controller is blocking." ,
605+ message .messageWrapper (),
606+ message .messageWrapper ().getAckId (),
607+ exactlyOnceDeliveryEnabled .get ());
571608 flowController .reserve (1 , message .messageWrapper ().getPubsubMessage ().getSerializedSize ());
609+ loggingUtil .logSubscriber (
610+ LoggingUtil .SubSystem .SUBSCRIBER_FLOW_CONTROL ,
611+ Level .FINE ,
612+ "Flow controller is done blocking." ,
613+ message .messageWrapper (),
614+ message .messageWrapper ().getAckId (),
615+ exactlyOnceDeliveryEnabled .get ());
572616 tracer .endSubscribeConcurrencyControlSpan (message .messageWrapper ());
573617 } catch (FlowControlException unexpectedException ) {
574618 // This should be a blocking flow controller and never throw an exception.
619+ loggingUtil .logSubscriberWithThrowable (
620+ LoggingUtil .SubSystem .SUBSCRIBER_FLOW_CONTROL ,
621+ Level .FINE ,
622+ "Flow controller unexpected exception." ,
623+ message .messageWrapper (),
624+ message .messageWrapper ().getAckId (),
625+ exactlyOnceDeliveryEnabled .get (),
626+ unexpectedException );
575627 tracer .setSubscribeConcurrencyControlSpanException (
576628 message .messageWrapper (), unexpectedException );
577629 throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
@@ -619,9 +671,23 @@ public void run() {
619671 // Don't nack it either, because we'd be nacking someone else's message.
620672 ackHandler .forget ();
621673 tracer .setSubscriberSpanExpirationResult (messageWrapper );
674+ loggingUtil .logSubscriber (
675+ LoggingUtil .SubSystem .EXPIRY ,
676+ Level .FINE ,
677+ "Message expired." ,
678+ messageWrapper ,
679+ ackHandler .ackRequestData .getAckId (),
680+ exactlyOnceDeliveryEnabled .get ());
622681 return ;
623682 }
624683 tracer .startSubscribeProcessSpan (messageWrapper );
684+ loggingUtil .logSubscriber (
685+ LoggingUtil .SubSystem .CALLBACK_DELIVERY ,
686+ Level .FINE ,
687+ "Message delivered." ,
688+ messageWrapper ,
689+ ackHandler .ackRequestData .getAckId (),
690+ exactlyOnceDeliveryEnabled .get ());
625691 if (shouldSetMessageFuture ()) {
626692 // This is the message future that is propagated to the user
627693 SettableApiFuture <AckResponse > messageFuture =
@@ -725,7 +791,6 @@ void processOutstandingOperations() {
725791 if (!nackRequestDataList .isEmpty ()) {
726792 modackRequestData .add (new ModackRequestData (0 , nackRequestDataList ));
727793 }
728- logger .log (Level .FINER , "Sending {0} nacks" , nackRequestDataList .size ());
729794
730795 List <AckRequestData > ackRequestDataReceipts = new ArrayList <AckRequestData >();
731796 pendingReceipts .drainTo (ackRequestDataReceipts );
@@ -735,13 +800,21 @@ void processOutstandingOperations() {
735800 receiptModack .setIsReceiptModack (true );
736801 modackRequestData .add (receiptModack );
737802 }
738- logger .log (Level .FINER , "Sending {0} receipts" , ackRequestDataReceipts .size ());
739803
740804 ackProcessor .sendModackOperations (modackRequestData );
741805
742806 List <AckRequestData > ackRequestDataList = new ArrayList <AckRequestData >();
743807 pendingAcks .drainTo (ackRequestDataList );
744- logger .log (Level .FINER , "Sending {0} acks" , ackRequestDataList .size ());
808+ loggingUtil .logEvent (
809+ LoggingUtil .SubSystem .ACK_BATCH ,
810+ Level .FINE ,
811+ "Sending {0} ACKs, {1} NACKs, {2} receipts. Exactly Once Delivery: {3}" ,
812+ new Object [] {
813+ ackRequestDataList .size (),
814+ nackRequestDataList .size (),
815+ ackRequestDataReceipts .size (),
816+ exactlyOnceDeliveryEnabled .get ()
817+ });
745818
746819 ackProcessor .sendAckOperations (ackRequestDataList );
747820 }
0 commit comments