1616
1717package com .google .cloud .pubsublite .internal ;
1818
19+ import com .google .common .util .concurrent .Monitor .Guard ;
1920import java .util .ArrayDeque ;
2021import java .util .Queue ;
2122import java .util .concurrent .Executor ;
22- import java .util .concurrent .atomic .AtomicBoolean ;
2323import javax .annotation .concurrent .GuardedBy ;
2424
2525/** An executor that runs tasks sequentially. */
2626public final class SerialExecutor implements AutoCloseable , Executor {
2727 private final Executor executor ;
28- private final AtomicBoolean isShutdown = new AtomicBoolean (false );
2928
30- @ GuardedBy ("this" )
29+ private final CloseableMonitor monitor = new CloseableMonitor ();
30+ private final Guard isInactive =
31+ new Guard (monitor .monitor ) {
32+ @ Override
33+ public boolean isSatisfied () {
34+ return !isTaskActive ;
35+ }
36+ };
37+
38+ @ GuardedBy ("monitor.monitor" )
3139 private final Queue <Runnable > tasks ;
3240
33- @ GuardedBy ("this " )
41+ @ GuardedBy ("monitor.monitor " )
3442 private boolean isTaskActive ;
3543
44+ @ GuardedBy ("monitor.monitor" )
45+ private boolean isShutdown ;
46+
3647 public SerialExecutor (Executor executor ) {
3748 this .executor = executor ;
3849 this .tasks = new ArrayDeque <>();
3950 this .isTaskActive = false ;
51+ this .isShutdown = false ;
52+ }
53+
54+ /** Waits until there are no active tasks. */
55+ public void waitUntilInactive () {
56+ try (CloseableMonitor .Hold h = monitor .enterWhenUninterruptibly (isInactive )) {}
4057 }
4158
4259 /**
@@ -45,34 +62,45 @@ public SerialExecutor(Executor executor) {
4562 */
4663 @ Override
4764 public void close () {
48- isShutdown .set (true );
65+ try (CloseableMonitor .Hold h = monitor .enter ()) {
66+ isShutdown = true ;
67+ }
4968 }
5069
5170 @ Override
52- public synchronized void execute (Runnable r ) {
53- if (isShutdown .get ()) {
54- return ;
71+ public void execute (Runnable r ) {
72+ try (CloseableMonitor .Hold h = monitor .enter ()) {
73+ if (isShutdown ) {
74+ return ;
75+ }
76+ tasks .add (
77+ () -> {
78+ try {
79+ if (shouldExecuteTask ()) {
80+ r .run ();
81+ }
82+ } finally {
83+ scheduleNextTask ();
84+ }
85+ });
86+ if (!isTaskActive ) {
87+ scheduleNextTask ();
88+ }
5589 }
56- tasks .add (
57- () -> {
58- if (isShutdown .get ()) {
59- return ;
60- }
61- try {
62- r .run ();
63- } finally {
64- scheduleNextTask ();
65- }
66- });
67- if (!isTaskActive ) {
68- scheduleNextTask ();
90+ }
91+
92+ private boolean shouldExecuteTask () {
93+ try (CloseableMonitor .Hold h = monitor .enter ()) {
94+ return !isShutdown ;
6995 }
7096 }
7197
72- private synchronized void scheduleNextTask () {
73- isTaskActive = !tasks .isEmpty ();
74- if (isTaskActive ) {
75- executor .execute (tasks .poll ());
98+ private void scheduleNextTask () {
99+ try (CloseableMonitor .Hold h = monitor .enter ()) {
100+ isTaskActive = !tasks .isEmpty () && !isShutdown ;
101+ if (isTaskActive ) {
102+ executor .execute (tasks .poll ());
103+ }
76104 }
77105 }
78106}
0 commit comments