Skip to content

Commit c13227a

Browse files
committed
timer / scheduler improvements: reduce scheduled events, correct lag time calculations if timer event is before previous
1 parent f7828d4 commit c13227a

File tree

8 files changed

+303
-35
lines changed

8 files changed

+303
-35
lines changed

cache2k-core/src/main/java/org/cache2k/core/timing/DefaultTimer.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@
3131
import java.util.concurrent.locks.ReentrantLock;
3232

3333
/**
34-
* Standard timer implementation. Due timer tasks are executed via a scheduler
35-
* that runs at most every second (lag time, configurable). There is always only
36-
* one pending scheduler job per timer.
34+
* Standard timer implementation. Timer tasks are executed via a scheduler
35+
* that fires at more approximately at second intervals (lag time, configurable).
36+
* Typically, there is only one scheduler task per timer. In the case that a timer
37+
* task is scheduled more than one second before the last an earlier scheduler
38+
* event is inserted. The later scheduler event is not needed any more, but we
39+
* do not delete scheduler events in this case.
3740
*
3841
* @author Jens Wilke
3942
*/
@@ -54,11 +57,11 @@ public class DefaultTimer implements Timer {
5457
private final TimeReference clock;
5558
private final Scheduler scheduler;
5659
private final TimerStructure structure;
57-
private long nextScheduled = Long.MAX_VALUE;
5860
/**
5961
* Lag time to gather timer tasks for more efficient execution.
6062
*/
6163
private final long lagTicks;
64+
private long nextScheduled = Long.MAX_VALUE;
6265

6366
private final Runnable timerAction = new Runnable() {
6467
@Override
@@ -100,8 +103,9 @@ public void schedule(TimerTask task, long time) {
100103
}
101104
lock.lock();
102105
try {
103-
if (structure.schedule(task, time)) {
104-
rescheduleEventually(time + lagTicks);
106+
long slotTime = structure.schedule(task, time);
107+
if (slotTime != 0) {
108+
rescheduleEventually(slotTime);
105109
return;
106110
}
107111
executeImmediately(task);
@@ -200,12 +204,11 @@ private void timeReachedEvent(long currentTime) {
200204
*
201205
* @param now the current time for calculations
202206
* @param time requested time for processing, or MAX_VALUE if nothing needs to be scheduled
203-
* @throw CacheClosedException
207+
* @throws CacheClosedException if cache was closed concurrently
204208
*/
205209
private void schedule(long now, long time) {
206210
if (time != Long.MAX_VALUE) {
207-
long earliestTime = now + lagTicks;
208-
scheduleNext(Math.max(earliestTime, time));
211+
scheduleNext(time);
209212
} else {
210213
nextScheduled = Long.MAX_VALUE;
211214
}
@@ -217,7 +220,7 @@ private void schedule(long now, long time) {
217220
* We don't cancel a scheduled task. The additional event does not hurt.
218221
*/
219222
void rescheduleEventually(long time) {
220-
if (time >= nextScheduled - lagTicks) {
223+
if (time >= nextScheduled) {
221224
return;
222225
}
223226
scheduleNext(time);

cache2k-core/src/main/java/org/cache2k/core/timing/TimerStructure.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ public interface TimerStructure {
3838
* Insert task. Scheduling might be not possible if tasks for the requested
3939
* time have already be run.
4040
*
41-
* @return true if scheduled successfully, false if scheduling was not possible
42-
* because the target time slot would be in the past
41+
* @return 0, if time is already in the past. Or a positive value
42+
* indicating the execution time of the timer slot
4343
*/
44-
boolean schedule(TimerTask task, long time);
44+
long schedule(TimerTask task, long time);
4545

4646
/**
4747
* Cancel all tasks

cache2k-core/src/main/java/org/cache2k/core/timing/TimerWheels.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,21 @@
3232
public class TimerWheels implements TimerStructure {
3333

3434
private final Wheel wheel;
35+
private final long delta;
3536

3637
public TimerWheels(long startTime, long delta, int slots) {
38+
this.delta = delta;
3739
wheel = new Wheel(startTime, delta, slots);
3840
}
3941

40-
public boolean schedule(TimerTask task, long time) {
42+
public long schedule(TimerTask task, long time) {
4143
task.time = time;
42-
return wheel.schedule(task);
44+
if (wheel.schedule(task)) {
45+
long slotTime = time + (delta - ((time - wheel.noon) % delta)) - 1;
46+
if (slotTime < 0) { return Long.MAX_VALUE - 1; }
47+
return slotTime;
48+
}
49+
return 0;
4350
}
4451

4552
public void cancelAll() {
@@ -55,17 +62,17 @@ public long nextRun() {
5562
return wheel.nextToRun();
5663
}
5764

58-
static class Wheel {
65+
class Wheel {
5966

6067
private Wheel up;
6168
private long noon;
6269
private long nextNoon;
63-
private final long delta;
70+
private final long slotDelta;
6471
private final TimerTask[] slots;
6572
private int index;
6673

67-
Wheel(long time, long delta, int slotCount) {
68-
this.delta = delta;
74+
Wheel(long time, long slotDelta, int slotCount) {
75+
this.slotDelta = slotDelta;
6976
slots = new TimerTask[slotCount];
7077
initArray();
7178
atNoon(time);
@@ -85,7 +92,7 @@ private void initArray() {
8592
private void atNoon(long time) {
8693
index = 0;
8794
noon = time;
88-
nextNoon = time + delta * slots.length;
95+
nextNoon = time + slotDelta * slots.length;
8996
if (nextNoon < 0) {
9097
nextNoon = Long.MAX_VALUE;
9198
}
@@ -101,16 +108,19 @@ private void cancel() {
101108

102109
/**
103110
* Time, when all tasks for the given slot index can be executed.
111+
* Or, if this is not the lowest level wheel, the time when potentially
112+
* this slot might have tasks to execute. We add the global delta, so
113+
* this works for all clock levels.
104114
*/
105115
long executionTime(int i) {
106-
return noon + delta * i + delta - 1;
116+
return noon + slotDelta * i + delta - 1;
107117
}
108118

109119
/**
110-
* Search for non empty time slot and return the time, when
111-
* the slot can be executed. For simplicity we don't recurse
112-
* into higher hierarchies, so this method is only called
113-
* at the lowest hierarchy.
120+
* Search for occupied time slot and return the time, when
121+
* the slot can be executed. If the lowest level does not have
122+
* tasks to execute, recurse up and return the time when the slot
123+
* at an upper level needs to split up.
114124
*/
115125
long nextToRun() {
116126
for (int i = index; i < slots.length; i++) {
@@ -121,7 +131,8 @@ long nextToRun() {
121131
if (up == null) {
122132
return Long.MAX_VALUE;
123133
}
124-
return executionTime(slots.length);
134+
long time = up.nextToRun();
135+
return time;
125136
}
126137

127138
/**
@@ -140,7 +151,7 @@ public TimerTask removeNextToRun(long time) {
140151
t.remove();
141152
return t;
142153
}
143-
hand = hand + delta;
154+
hand = hand + slotDelta;
144155
if (time >= hand) {
145156
moveHand();
146157
continue;
@@ -191,7 +202,7 @@ private boolean schedule(TimerTask t) {
191202
return true;
192203
} else {
193204
if (up == null) {
194-
up = new Wheel(nextNoon, delta * slots.length, slots.length);
205+
up = new Wheel(nextNoon, slotDelta * slots.length, slots.length);
195206
}
196207
return up.schedule(t);
197208
}
@@ -201,7 +212,7 @@ private boolean schedule(TimerTask t) {
201212
* Insert into the proper time slot.
202213
*/
203214
private void insert(TimerTask t) {
204-
int idx = (int) ((t.time - noon) / delta);
215+
int idx = (int) ((t.time - noon) / slotDelta);
205216
slots[idx].insert(t);
206217
}
207218

cache2k-pinpoint/src/main/java/org/cache2k/pinpoint/ExceptionCollector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ public void exception(Throwable t) {
4545
}
4646
}
4747

48+
/**
49+
* Execute the runnable and collect exception if thrown
50+
*/
51+
public void runAndCatch(Runnable r) {
52+
try {
53+
r.run();
54+
} catch (Throwable t) {
55+
exception(t);
56+
}
57+
}
58+
4859
public void assertNoException() {
4960
if (firstException.get() != null) {
5061
throw new AssertionError(

cache2k-testing/src/main/java/org/cache2k/testing/SimulatedClock.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ public void sleep(long ticks) throws InterruptedException {
184184
sleepInExecutor(ticks);
185185
return;
186186
}
187+
if (ticks < 0) {
188+
waitForTaskExecution();
189+
return;
190+
}
187191
if (ticks == 0) {
188192
sleep0();
189193
return;
@@ -210,11 +214,7 @@ private void sleepInExecutor(long ticks) throws InterruptedException {
210214
*/
211215
private void sleep0() throws InterruptedException {
212216
if (tasksWaitingForExecution.get() > 0) {
213-
while (tasksWaitingForExecution.get() > 0) {
214-
synchronized (parallelExecutionsWaiter) {
215-
parallelExecutionsWaiter.wait(5);
216-
}
217-
}
217+
waitForTaskExecution();
218218
return;
219219
}
220220
long nextTime = progressAndRunEvents(-1);
@@ -225,6 +225,14 @@ private void sleep0() throws InterruptedException {
225225
moveForward(rawTicks() + 1);
226226
}
227227

228+
private void waitForTaskExecution() throws InterruptedException {
229+
while (tasksWaitingForExecution.get() > 0) {
230+
synchronized (parallelExecutionsWaiter) {
231+
parallelExecutionsWaiter.wait(5);
232+
}
233+
}
234+
}
235+
228236
public Executor wrapExecutor(Executor ex) {
229237
return new WrappedExecutor(ex);
230238
}

0 commit comments

Comments
 (0)