@@ -47,7 +47,7 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
4747 protected final AtomicBoolean hb ;
4848 protected final AtomicLong idleHeartbeatSettingMillis ;
4949 protected final AtomicLong alarmPeriodSettingNanos ;
50- protected final AtomicReference <ScheduledTask > heartbeatTask ;
50+ protected final AtomicReference <ScheduledTask > heartbeatTaskRef ;
5151
5252 protected MessageManager (NatsConnection conn , SubscribeOptions so , boolean syncMode ) {
5353 stateChangeLock = new ReentrantLock ();
@@ -62,7 +62,7 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
6262 idleHeartbeatSettingMillis = new AtomicLong ();
6363 alarmPeriodSettingNanos = new AtomicLong ();
6464 lastMsgReceivedNanoTime = new AtomicLong (NatsSystemClock .nanoTime ());
65- heartbeatTask = new AtomicReference <>();
65+ heartbeatTaskRef = new AtomicReference <>();
6666 }
6767
6868 protected boolean isSyncMode () { return syncMode ; }
@@ -139,33 +139,26 @@ protected void updateLastMessageReceived() {
139139 protected void initOrResetHeartbeatTimer () {
140140 stateChangeLock .lock ();
141141 try {
142- ScheduledTask hbTask = heartbeatTask .get ();
142+ ScheduledTask hbTask = heartbeatTaskRef .get ();
143143 if (hbTask != null ) {
144- // Same settings, just reuse the existing scheduled task
145- if (hbTask .getPeriodNanos () == alarmPeriodSettingNanos .get ()) {
146- updateLastMessageReceived ();
147- return ;
148- }
149-
150- // Replace timer since settings have changed
144+ // we always want a fresh schedule because it will have the initial delay
151145 hbTask .shutdown ();
152146 }
153147
154- // so the alarm doesn't trigger too soon
148+ // set the ref with a new ScheduledTask
149+ // reminder that ScheduledTask schedules itself, which is why we pass the executor
150+ heartbeatTaskRef .set (
151+ new ScheduledTask (conn .getScheduledExecutor (), alarmPeriodSettingNanos .get (), TimeUnit .NANOSECONDS ,
152+ () -> {
153+ long sinceLast = NatsSystemClock .nanoTime () - lastMsgReceivedNanoTime .get ();
154+ if (sinceLast > alarmPeriodSettingNanos .get ()) {
155+ handleHeartbeatError ();
156+ }
157+ })
158+ );
159+
160+ // since we just scheduled, reset this otherwise it may alarm too soon
155161 updateLastMessageReceived ();
156-
157- // replacement or new comes here
158- heartbeatTask .set (new ScheduledTask (
159- conn .getScheduledExecutor (),
160- alarmPeriodSettingNanos .get (), TimeUnit .NANOSECONDS ,
161- () -> {
162- long sinceLast = NatsSystemClock .nanoTime () - lastMsgReceivedNanoTime .get ();
163- if (sinceLast > alarmPeriodSettingNanos .get ()) {
164- updateLastMessageReceived (); // allow the system time to re-sub before alarming
165- handleHeartbeatError ();
166- }
167- }));
168-
169162 }
170163 finally {
171164 stateChangeLock .unlock ();
@@ -175,10 +168,10 @@ protected void initOrResetHeartbeatTimer() {
175168 protected void shutdownHeartbeatTimer () {
176169 stateChangeLock .lock ();
177170 try {
178- ScheduledTask hbTask = heartbeatTask .get ();
171+ ScheduledTask hbTask = heartbeatTaskRef .get ();
179172 if (hbTask != null ) {
180173 hbTask .shutdown ();
181- heartbeatTask .set (null );
174+ heartbeatTaskRef .set (null );
182175 }
183176 }
184177 finally {
0 commit comments