4949import org .apache .zookeeper .server .quorum .Leader .Proposal ;
5050import org .apache .zookeeper .server .quorum .QuorumPeer .LearnerType ;
5151import org .apache .zookeeper .server .quorum .auth .QuorumAuthServer ;
52+ import org .apache .zookeeper .server .util .MessageTracker ;
5253import org .apache .zookeeper .server .util .SerializeUtils ;
5354import org .apache .zookeeper .server .util .ZxidUtils ;
5455import org .apache .zookeeper .txn .TxnHeader ;
@@ -220,6 +221,8 @@ public boolean equals(Object o) {
220221 private final BufferedInputStream bufferedInput ;
221222 private BufferedOutputStream bufferedOutput ;
222223
224+ protected final MessageTracker messageTracker ;
225+
223226 // for test only
224227 protected void setOutputArchive (BinaryOutputArchive oa ) {
225228 this .oa = oa ;
@@ -280,6 +283,8 @@ protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
280283 }
281284 throw new SaslException ("Authentication failure: " + e .getMessage ());
282285 }
286+
287+ this .messageTracker = new MessageTracker (MessageTracker .BUFFERED_MESSAGE_SIZE );
283288 }
284289
285290 @ Override
@@ -349,6 +354,7 @@ private void sendPackets() throws InterruptedException {
349354 }
350355 oa .writeRecord (p , "packet" );
351356 packetsSent .incrementAndGet ();
357+ messageTracker .trackSent (p .getType ());
352358 } catch (IOException e ) {
353359 if (!sock .isClosed ()) {
354360 LOG .warn ("Unexpected exception at " + this , e );
@@ -464,8 +470,11 @@ public void run() {
464470
465471 QuorumPacket qp = new QuorumPacket ();
466472 ia .readRecord (qp , "packet" );
473+
474+ messageTracker .trackReceived (qp .getType ());
467475 if (qp .getType () != Leader .FOLLOWERINFO && qp .getType () != Leader .OBSERVERINFO ) {
468476 LOG .error ("First packet " + qp .toString () + " is not FOLLOWERINFO or OBSERVERINFO!" );
477+
469478 return ;
470479 }
471480
@@ -526,9 +535,11 @@ public void run() {
526535 ByteBuffer .wrap (ver ).putInt (0x10000 );
527536 QuorumPacket newEpochPacket = new QuorumPacket (Leader .LEADERINFO , newLeaderZxid , ver , null );
528537 oa .writeRecord (newEpochPacket , "packet" );
538+ messageTracker .trackSent (Leader .LEADERINFO );
529539 bufferedOutput .flush ();
530540 QuorumPacket ackEpochPacket = new QuorumPacket ();
531541 ia .readRecord (ackEpochPacket , "packet" );
542+ messageTracker .trackReceived (ackEpochPacket .getType ());
532543 if (ackEpochPacket .getType () != Leader .ACKEPOCH ) {
533544 LOG .error (ackEpochPacket .toString () + " is not ACKEPOCH" );
534545 return ;
@@ -554,6 +565,7 @@ public void run() {
554565 try {
555566 long zxidToSend = learnerMaster .getZKDatabase ().getDataTreeLastProcessedZxid ();
556567 oa .writeRecord (new QuorumPacket (Leader .SNAP , zxidToSend , null , null ), "packet" );
568+ messageTracker .trackSent (Leader .SNAP );
557569 bufferedOutput .flush ();
558570
559571 LOG .info ("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
@@ -600,6 +612,8 @@ public void run() {
600612 */
601613 qp = new QuorumPacket ();
602614 ia .readRecord (qp , "packet" );
615+
616+ messageTracker .trackReceived (qp .getType ());
603617 if (qp .getType () != Leader .ACK ) {
604618 LOG .error ("Next packet was supposed to be an ACK," + " but received packet: {}" , packetToString (qp ));
605619 return ;
@@ -632,6 +646,7 @@ public void run() {
632646 while (true ) {
633647 qp = new QuorumPacket ();
634648 ia .readRecord (qp , "packet" );
649+ messageTracker .trackReceived (qp .getType ());
635650
636651 long traceMask = ZooTrace .SERVER_PACKET_TRACE_MASK ;
637652 if (qp .getType () == Leader .PING ) {
@@ -716,7 +731,9 @@ public void run() {
716731 syncThrottler .endSync ();
717732 syncThrottler = null ;
718733 }
719- LOG .warn ("******* GOODBYE {} ********" , getRemoteAddress ());
734+ String remoteAddr = getRemoteAddress ();
735+ LOG .warn ("******* GOODBYE {} ********" , remoteAddr );
736+ messageTracker .dumpToLog (remoteAddr );
720737 shutdown ();
721738 }
722739 }
0 commit comments