Skip to content

Commit 1b17096

Browse files
committed
Self review
1 parent fbd0d51 commit 1b17096

2 files changed

Lines changed: 28 additions & 29 deletions

File tree

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
4747
protected final AtomicBoolean hb;
4848
protected final AtomicLong idleHeartbeatSettingMillis;
4949
protected final AtomicLong alarmPeriodSettingNanos;
50-
protected final AtomicReference<ScheduledTask> heartbeatTask;
50+
protected final AtomicReference<ScheduledTask> heartbeatTaskRef;
5151

5252
protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
5353
stateChangeLock = new ReentrantLock();
@@ -62,7 +62,7 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
6262
idleHeartbeatSettingMillis = new AtomicLong();
6363
alarmPeriodSettingNanos = new AtomicLong();
6464
lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
65-
heartbeatTask = new AtomicReference<>();
65+
heartbeatTaskRef = new AtomicReference<>();
6666
}
6767

6868
protected boolean isSyncMode() { return syncMode; }
@@ -139,33 +139,26 @@ protected void updateLastMessageReceived() {
139139
protected void initOrResetHeartbeatTimer() {
140140
stateChangeLock.lock();
141141
try {
142-
ScheduledTask hbTask = heartbeatTask.get();
142+
ScheduledTask hbTask = heartbeatTaskRef.get();
143143
if (hbTask != null) {
144-
// Same settings, just reuse the existing scheduled task
145-
if (hbTask.getPeriodNanos() == alarmPeriodSettingNanos.get()) {
146-
updateLastMessageReceived();
147-
return;
148-
}
149-
150-
// Replace timer since settings have changed
144+
// we always want a fresh schedule because it will have the initial delay
151145
hbTask.shutdown();
152146
}
153147

154-
// so the alarm doesn't trigger too soon
148+
// set the ref with a new ScheduledTask
149+
// reminder that ScheduledTask schedules itself, which is why we pass the executor
150+
heartbeatTaskRef.set(
151+
new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
152+
() -> {
153+
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
154+
if (sinceLast > alarmPeriodSettingNanos.get()) {
155+
handleHeartbeatError();
156+
}
157+
})
158+
);
159+
160+
// since we just scheduled, reset this otherwise it may alarm too soon
155161
updateLastMessageReceived();
156-
157-
// replacement or new comes here
158-
heartbeatTask.set(new ScheduledTask(
159-
conn.getScheduledExecutor(),
160-
alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
161-
() -> {
162-
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
163-
if (sinceLast > alarmPeriodSettingNanos.get()) {
164-
updateLastMessageReceived(); // allow the system time to re-sub before alarming
165-
handleHeartbeatError();
166-
}
167-
}));
168-
169162
}
170163
finally {
171164
stateChangeLock.unlock();
@@ -175,10 +168,10 @@ protected void initOrResetHeartbeatTimer() {
175168
protected void shutdownHeartbeatTimer() {
176169
stateChangeLock.lock();
177170
try {
178-
ScheduledTask hbTask = heartbeatTask.get();
171+
ScheduledTask hbTask = heartbeatTaskRef.get();
179172
if (hbTask != null) {
180173
hbTask.shutdown();
181-
heartbeatTask.set(null);
174+
heartbeatTaskRef.set(null);
182175
}
183176
}
184177
finally {

src/test/java/io/nats/client/support/ScheduledTaskTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ public void testScheduledTask() throws InterruptedException {
2020
AtomicInteger counter400 = new AtomicInteger();
2121
SttRunnable sttr400 = new SttRunnable(100, counter400);
2222
ScheduledTask task400 = new ScheduledTask(stpe, 0, 400, TimeUnit.MILLISECONDS, sttr400);
23-
assertEquals(TimeUnit.MILLISECONDS.toNanos(400), task400.getPeriodNanos());
23+
validateTaskPeriods(task400, 0, 400);
2424

2525
AtomicInteger counter200 = new AtomicInteger();
2626
SttRunnable sttr200 = new SttRunnable(300, counter200);
2727
ScheduledTask task200 = new ScheduledTask(stpe, 0, 200, TimeUnit.MILLISECONDS, sttr200);
28-
assertEquals(TimeUnit.MILLISECONDS.toNanos(200), task200.getPeriodNanos());
28+
validateTaskPeriods(task200, 0, 200);
2929

3030
AtomicInteger counter100 = new AtomicInteger();
3131
SttRunnable sttr100 = new SttRunnable(400, counter100);
3232
ScheduledTask task100 = new ScheduledTask(stpe, 0, 100, TimeUnit.MILLISECONDS, sttr100);
33-
assertEquals(TimeUnit.MILLISECONDS.toNanos(100), task100.getPeriodNanos());
33+
validateTaskPeriods(task100, 0, 100);
3434

3535
validateState(task400, false, false, null);
3636
validateState(task200, false, false, null);
@@ -56,6 +56,12 @@ public void testScheduledTask() throws InterruptedException {
5656
assertTrue(counter100.get() >= 3);
5757
}
5858

59+
@SuppressWarnings("SameParameterValue")
60+
private static void validateTaskPeriods(ScheduledTask task, long expectedDelay, long expectedPeriod) {
61+
assertEquals(TimeUnit.MILLISECONDS.toNanos(expectedDelay), task.getInitialDelayNanos());
62+
assertEquals(TimeUnit.MILLISECONDS.toNanos(expectedPeriod), task.getPeriodNanos());
63+
}
64+
5965
static void validateState(ScheduledTask task, boolean shutdown, boolean done, Boolean executing) {
6066
assertEquals(shutdown, task.isShutdown());
6167
assertEquals(done, task.isDone());

0 commit comments

Comments
 (0)