@@ -79,32 +79,41 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
7979 Duration .millis (unit .toMillis (initialDelay )), command , PendingCallableType .FIXED_DELAY ));
8080 }
8181
82- public void tick (long time , TimeUnit unit ) {
83- advanceTime (Duration .millis (unit .toMillis (time )));
84- }
85-
8682 /**
8783 * This will advance the reference time of the executor and execute (in the same thread) any
8884 * outstanding callable which execution time has passed.
8985 */
9086 public void advanceTime (Duration toAdvance ) {
9187 clock .advance (toAdvance .getMillis (), TimeUnit .MILLISECONDS );
88+ work ();
89+ }
90+
91+ private void work () {
9292 DateTime cmpTime = new DateTime (clock .millis ());
9393
94- synchronized (pendingCallables ) {
95- while (!pendingCallables .isEmpty ()
96- && pendingCallables .peek ().getScheduledTime ().compareTo (cmpTime ) <= 0 ) {
97- try {
98- pendingCallables .poll ().call ();
99- if (shutdown .get () && pendingCallables .isEmpty ()) {
100- pendingCallables .notifyAll ();
101- }
94+ for (;;) {
95+ PendingCallable <?> callable = null ;
96+ synchronized (pendingCallables ) {
97+ if (pendingCallables .isEmpty () || pendingCallables .peek ().getScheduledTime ().isAfter (cmpTime )) {
98+ break ;
99+ }
100+ callable = pendingCallables .poll ();
101+ }
102+ if (callable != null ) {
103+ try {
104+ callable .call ();
102105 } catch (Exception e ) {
103106 // We ignore any callable exception, which should be set to the future but not relevant to
104107 // advanceTime.
105108 }
106109 }
107110 }
111+
112+ synchronized (pendingCallables ) {
113+ if (shutdown .get () && pendingCallables .isEmpty ()) {
114+ pendingCallables .notifyAll ();
115+ }
116+ }
108117 }
109118
110119 @ Override
@@ -172,6 +181,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
172181 synchronized (pendingCallables ) {
173182 pendingCallables .add (callable );
174183 }
184+ work ();
175185 return callable .getScheduledFuture ();
176186 }
177187
0 commit comments