Skip to content

Commit de4d519

Browse files
Wyverald and copybara-github
authored and committed
Block in repo fetching state's close() method
This greatly simplifies the code flow. Instead of using `volatile` and resorting to some very unsavory workarounds, we can simply make sure only one thread is changing `state.workerFuture` using plain old synchronization, and on memory pressure, make absolutely sure that the state object is cleaned up after we remove it from the central state cache. This goes against the advice introduced in 8ef0a51; the wording for `SkyKeyComputeState#close()` has been updated. Also changed the "retry on cancellation" logic from using recursion to using a `while`-loop for better clarity around nested `finally` blocks. Fixes #22393. PiperOrigin-RevId: 637975501 Change-Id: Ied43f0310ec8953f4ff1c2712fe07b8ccbd6c184
1 parent fa9c648 commit de4d519

File tree

4 files changed

+88
-110
lines changed

4 files changed

+88
-110
lines changed

src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java

Lines changed: 38 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.Executors;
3232
import java.util.concurrent.Semaphore;
3333
import javax.annotation.Nullable;
34+
import javax.annotation.concurrent.GuardedBy;
3435

3536
/**
3637
* Captures state that persists across different invocations of {@link
@@ -66,16 +67,16 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
6667
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
6768
* to tell whether a worker thread is already running.
6869
*/
69-
// This is volatile since we set it to null to indicate the worker thread isn't running, and this
70-
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
71-
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
72-
// we might block in `close()`, which is potentially bad (see its javadoc).
73-
@Nullable volatile ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;
70+
@GuardedBy("this")
71+
@Nullable
72+
private ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;
7473

7574
/** The executor service that manages the worker thread. */
7675
// We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
7776
// sure the worker thread has shut down (with its blocking `close()` method).
78-
ListeningExecutorService workerExecutorService;
77+
@GuardedBy("this")
78+
@Nullable
79+
private ListeningExecutorService workerExecutorService = null;
7980

8081
private final String repoName;
8182

@@ -89,19 +90,6 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
8990

9091
RepoFetchingSkyKeyComputeState(String repoName) {
9192
this.repoName = repoName;
92-
reset();
93-
}
94-
95-
// This may only be called from the host Skyframe thread, *and* only when no worker thread is
96-
// running.
97-
private void reset() {
98-
workerExecutorService =
99-
MoreExecutors.listeningDecorator(
100-
Executors.newThreadPerTaskExecutor(
101-
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
102-
signalSemaphore.drainPermits();
103-
delegateEnvQueue.clear();
104-
recordedInputValues.clear();
10593
}
10694

10795
/**
@@ -114,44 +102,48 @@ SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
114102
}
115103

116104
/**
117-
* Starts a worker thread running the given callable. This sets the {@code workerFuture} field,
118-
* and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes,
119-
* successfully or otherwise. Returns the worker future. This may only be called from the host
105+
* Returns the worker future, or if a worker is not already running, starts a worker thread
106+
* running the given callable. This makes sure to release a permit on the {@code signalSemaphore}
107+
* when the worker finishes, successfully or otherwise. This may only be called from the host
120108
* Skyframe thread.
121109
*/
122-
ListenableFuture<RepositoryDirectoryValue.Builder> startWorker(
110+
synchronized ListenableFuture<RepositoryDirectoryValue.Builder> getOrStartWorker(
123111
Callable<RepositoryDirectoryValue.Builder> c) {
124-
var workerFuture = workerExecutorService.submit(c);
125-
this.workerFuture = workerFuture;
112+
if (workerFuture != null) {
113+
return workerFuture;
114+
}
115+
// We reset the state object back to its very initial state, since the host SkyFunction may have
116+
// been re-entered (for example b/330892334 and
117+
// https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may have
118+
// been interrupted while the host SkyFunction was inactive.
119+
workerExecutorService =
120+
MoreExecutors.listeningDecorator(
121+
Executors.newThreadPerTaskExecutor(
122+
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
123+
signalSemaphore.drainPermits();
124+
delegateEnvQueue.clear();
125+
recordedInputValues.clear();
126+
127+
// Start the worker.
128+
workerFuture = workerExecutorService.submit(c);
126129
workerFuture.addListener(signalSemaphore::release, directExecutor());
127130
return workerFuture;
128131
}
129132

133+
/**
134+
* Closes the state object, and blocks until all pending async work is finished. The state object
135+
* will reset to a clean slate after this method finishes.
136+
*/
130137
// This may be called from any thread, including the host Skyframe thread and the
131138
// high-memory-pressure listener thread.
132139
@Override
133-
public void close() {
134-
var myWorkerFuture = workerFuture;
135-
workerFuture = null;
136-
if (myWorkerFuture != null) {
137-
myWorkerFuture.cancel(true);
140+
public synchronized void close() {
141+
if (workerFuture != null) {
142+
workerFuture.cancel(true);
138143
}
139-
workerExecutorService.shutdownNow();
140-
}
141-
142-
/**
143-
* Closes the state object, and blocks until all pending async work is finished. The state object
144-
* will reset to a clean slate after this method finishes. This may only be called from the host
145-
* Skyframe thread.
146-
*/
147-
public void closeAndWaitForTermination() throws InterruptedException {
148-
close();
149-
workerExecutorService.close(); // This blocks
150-
// We reset the state object back to its very initial state, since the host SkyFunction may be
151-
// re-entered (for example b/330892334 and https://github.com/bazelbuild/bazel/issues/21238).
152-
reset();
153-
if (Thread.interrupted()) {
154-
throw new InterruptedException();
144+
workerFuture = null;
145+
if (workerExecutorService != null) {
146+
workerExecutorService.close(); // This blocks
155147
}
156148
}
157149
}

src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -44,10 +44,10 @@ class RepoFetchingWorkerSkyFunctionEnvironment
4444
private final RepoFetchingSkyKeyComputeState state;
4545
private SkyFunction.Environment delegate;
4646

47-
RepoFetchingWorkerSkyFunctionEnvironment(
48-
RepoFetchingSkyKeyComputeState state, SkyFunction.Environment delegate) {
47+
RepoFetchingWorkerSkyFunctionEnvironment(RepoFetchingSkyKeyComputeState state)
48+
throws InterruptedException {
4949
this.state = state;
50-
this.delegate = delegate;
50+
this.delegate = state.delegateEnvQueue.take();
5151
}
5252

5353
@Override

src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java

Lines changed: 45 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.collect.ImmutableSet;
2323
import com.google.common.collect.Maps;
2424
import com.google.common.collect.Table;
25+
import com.google.common.util.concurrent.ListenableFuture;
2526
import com.google.devtools.build.lib.analysis.BlazeDirectories;
2627
import com.google.devtools.build.lib.analysis.RuleDefinition;
2728
import com.google.devtools.build.lib.bazel.bzlmod.NonRegistryOverride;
@@ -143,66 +144,51 @@ public RepositoryDirectoryValue.Builder fetch(
143144
if (!useWorkers) {
144145
return fetchInternal(args);
145146
}
146-
var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName()));
147-
if (state.workerExecutorService.isShutdown()) {
148-
// If we get here and the worker executor is shut down, this can only mean that the worker
149-
// future was cancelled while we (the host Skyframe thread) were inactive (as in, having
150-
// returned `null` but not yet restarted). So we wait for the previous worker thread to finish
151-
// first.
152-
// TODO: instead of this complicated dance, consider making it legal for
153-
// `SkyKeyComputeState#close()` to block. This would undo the advice added in commit 8ef0a51,
154-
// but would allow us to merge `close()` and `closeAndWaitForTermination()` and avoid some
155-
// headache.
156-
state.closeAndWaitForTermination();
157-
}
158-
boolean shouldShutDownWorkerExecutorInFinally = true;
159-
try {
160-
var workerFuture = state.workerFuture;
161-
if (workerFuture == null) {
162-
// No worker is running yet, which means we're just starting to fetch this repo. Start with
163-
// a clean slate, and create the worker.
164-
setupRepoRoot(outputDirectory);
165-
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
166-
workerFuture =
167-
state.startWorker(
168-
() -> fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues)));
169-
} else {
170-
// A worker is already running. This can only mean one thing -- we just had a Skyframe
171-
// restart, and need to send over a fresh Environment.
147+
// See below (the `catch CancellationException` clause) for why there's a `while` loop here.
148+
while (true) {
149+
var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName()));
150+
ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture =
151+
state.getOrStartWorker(
152+
() -> {
153+
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state);
154+
setupRepoRoot(outputDirectory);
155+
return fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues));
156+
});
157+
try {
172158
state.delegateEnvQueue.put(env);
173-
}
174-
state.signalSemaphore.acquire();
175-
if (!workerFuture.isDone()) {
176-
// This means that the worker is still running, and expecting a fresh Environment. Return
177-
// null to trigger a Skyframe restart, but *don't* shut down the worker executor.
178-
shouldShutDownWorkerExecutorInFinally = false;
179-
return null;
180-
}
181-
RepositoryDirectoryValue.Builder result = workerFuture.get();
182-
recordedInputValues.putAll(state.recordedInputValues);
183-
return result;
184-
} catch (ExecutionException e) {
185-
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
186-
Throwables.throwIfUnchecked(e.getCause());
187-
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e.getCause());
188-
} catch (CancellationException e) {
189-
// This can only happen if the state object was invalidated due to memory pressure, in
190-
// which case we can simply reattempt the fetch.
191-
env.getListener()
192-
.post(
193-
RepositoryFetchProgress.ongoing(
194-
RepositoryName.createUnvalidated(rule.getName()),
195-
"fetch interrupted due to memory pressure; restarting."));
196-
return fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
197-
} finally {
198-
if (shouldShutDownWorkerExecutorInFinally) {
199-
// Unless we know the worker is waiting on a fresh Environment, we should *always* shut down
200-
// the worker executor and reset the state by the time we finish executing (successfully or
201-
// otherwise). This ensures that 1) no background work happens without our knowledge, and
202-
// 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and
203-
// https://github.com/bazelbuild/bazel/issues/21238), we don't have lingering state messing
204-
// things up.
205-
state.closeAndWaitForTermination();
159+
state.signalSemaphore.acquire();
160+
if (!workerFuture.isDone()) {
161+
// This means that the worker is still running, and expecting a fresh Environment. Return
162+
// null to trigger a Skyframe restart, but *don't* shut down the worker executor.
163+
return null;
164+
}
165+
RepositoryDirectoryValue.Builder result = workerFuture.get();
166+
recordedInputValues.putAll(state.recordedInputValues);
167+
return result;
168+
} catch (ExecutionException e) {
169+
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
170+
Throwables.throwIfUnchecked(e.getCause());
171+
throw new IllegalStateException(
172+
"unexpected exception type: " + e.getCause().getClass(), e.getCause());
173+
} catch (CancellationException e) {
174+
// This can only happen if the state object was invalidated due to memory pressure, in
175+
// which case we can simply reattempt the fetch. Show a message and continue into the next
176+
// `while` iteration.
177+
env.getListener()
178+
.post(
179+
RepositoryFetchProgress.ongoing(
180+
RepositoryName.createUnvalidated(rule.getName()),
181+
"fetch interrupted due to memory pressure; restarting."));
182+
} finally {
183+
if (workerFuture.isDone()) {
184+
// Unless we know the worker is waiting on a fresh Environment, we should *always* shut
185+
// down the worker executor by the time we finish executing (successfully or otherwise).
186+
// This ensures that 1) no background work happens without our knowledge, and 2) if the
187+
// SkyFunction is re-entered for any reason (for example b/330892334 and
188+
// https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new
189+
// worker from scratch.
190+
state.close();
191+
}
206192
}
207193
}
208194
}

src/main/java/com/google/devtools/build/skyframe/SkyFunction.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -491,8 +491,8 @@ interface SkyKeyComputeState extends AutoCloseable {
491491
*
492492
* <p>Implementations <strong>MUST</strong> be idempotent.
493493
*
494-
* <p>Note also that this method should not perform any heavy work (especially blocking
495-
* operations).
494+
* <p>Note also that this method could be invoked from arbitrary threads, so avoid heavy
495+
* operations if possible.
496496
*/
497497
@Override
498498
default void close() {}

0 commit comments

Comments
 (0)