|
22 | 22 | import com.google.common.collect.ImmutableSet; |
23 | 23 | import com.google.common.collect.Maps; |
24 | 24 | import com.google.common.collect.Table; |
| 25 | +import com.google.common.util.concurrent.ListenableFuture; |
25 | 26 | import com.google.devtools.build.lib.analysis.BlazeDirectories; |
26 | 27 | import com.google.devtools.build.lib.analysis.RuleDefinition; |
27 | 28 | import com.google.devtools.build.lib.bazel.bzlmod.NonRegistryOverride; |
@@ -143,66 +144,51 @@ public RepositoryDirectoryValue.Builder fetch( |
143 | 144 | if (!useWorkers) { |
144 | 145 | return fetchInternal(args); |
145 | 146 | } |
146 | | - var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName())); |
147 | | - if (state.workerExecutorService.isShutdown()) { |
148 | | - // If we get here and the worker executor is shut down, this can only mean that the worker |
149 | | - // future was cancelled while we (the host Skyframe thread) were inactive (as in, having |
150 | | - // returned `null` but not yet restarted). So we wait for the previous worker thread to finish |
151 | | - // first. |
152 | | - // TODO: instead of this complicated dance, consider making it legal for |
153 | | - // `SkyKeyComputeState#close()` to block. This would undo the advice added in commit 8ef0a51, |
154 | | - // but would allow us to merge `close()` and `closeAndWaitForTermination()` and avoid some |
155 | | - // headache. |
156 | | - state.closeAndWaitForTermination(); |
157 | | - } |
158 | | - boolean shouldShutDownWorkerExecutorInFinally = true; |
159 | | - try { |
160 | | - var workerFuture = state.workerFuture; |
161 | | - if (workerFuture == null) { |
162 | | - // No worker is running yet, which means we're just starting to fetch this repo. Start with |
163 | | - // a clean slate, and create the worker. |
164 | | - setupRepoRoot(outputDirectory); |
165 | | - Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env); |
166 | | - workerFuture = |
167 | | - state.startWorker( |
168 | | - () -> fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues))); |
169 | | - } else { |
170 | | - // A worker is already running. This can only mean one thing -- we just had a Skyframe |
171 | | - // restart, and need to send over a fresh Environment. |
| 147 | + // See below (the `catch CancellationException` clause) for why there's a `while` loop here. |
| 148 | + while (true) { |
| 149 | + var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName())); |
| 150 | + ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = |
| 151 | + state.getOrStartWorker( |
| 152 | + () -> { |
| 153 | + Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state); |
| 154 | + setupRepoRoot(outputDirectory); |
| 155 | + return fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues)); |
| 156 | + }); |
| 157 | + try { |
172 | 158 | state.delegateEnvQueue.put(env); |
173 | | - } |
174 | | - state.signalSemaphore.acquire(); |
175 | | - if (!workerFuture.isDone()) { |
176 | | - // This means that the worker is still running, and expecting a fresh Environment. Return |
177 | | - // null to trigger a Skyframe restart, but *don't* shut down the worker executor. |
178 | | - shouldShutDownWorkerExecutorInFinally = false; |
179 | | - return null; |
180 | | - } |
181 | | - RepositoryDirectoryValue.Builder result = workerFuture.get(); |
182 | | - recordedInputValues.putAll(state.recordedInputValues); |
183 | | - return result; |
184 | | - } catch (ExecutionException e) { |
185 | | - Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); |
186 | | - Throwables.throwIfUnchecked(e.getCause()); |
187 | | - throw new IllegalStateException("unexpected exception type: " + e.getClass(), e.getCause()); |
188 | | - } catch (CancellationException e) { |
189 | | - // This can only happen if the state object was invalidated due to memory pressure, in |
190 | | - // which case we can simply reattempt the fetch. |
191 | | - env.getListener() |
192 | | - .post( |
193 | | - RepositoryFetchProgress.ongoing( |
194 | | - RepositoryName.createUnvalidated(rule.getName()), |
195 | | - "fetch interrupted due to memory pressure; restarting.")); |
196 | | - return fetch(rule, outputDirectory, directories, env, recordedInputValues, key); |
197 | | - } finally { |
198 | | - if (shouldShutDownWorkerExecutorInFinally) { |
199 | | - // Unless we know the worker is waiting on a fresh Environment, we should *always* shut down |
200 | | - // the worker executor and reset the state by the time we finish executing (successfully or |
201 | | - // otherwise). This ensures that 1) no background work happens without our knowledge, and |
202 | | - // 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and |
203 | | - // https://github.com/bazelbuild/bazel/issues/21238), we don't have lingering state messing |
204 | | - // things up. |
205 | | - state.closeAndWaitForTermination(); |
| 159 | + state.signalSemaphore.acquire(); |
| 160 | + if (!workerFuture.isDone()) { |
| 161 | + // This means that the worker is still running, and expecting a fresh Environment. Return |
| 162 | + // null to trigger a Skyframe restart, but *don't* shut down the worker executor. |
| 163 | + return null; |
| 164 | + } |
| 165 | + RepositoryDirectoryValue.Builder result = workerFuture.get(); |
| 166 | + recordedInputValues.putAll(state.recordedInputValues); |
| 167 | + return result; |
| 168 | + } catch (ExecutionException e) { |
| 169 | + Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); |
| 170 | + Throwables.throwIfUnchecked(e.getCause()); |
| 171 | + throw new IllegalStateException( |
| 172 | + "unexpected exception type: " + e.getCause().getClass(), e.getCause()); |
| 173 | + } catch (CancellationException e) { |
| 174 | + // This can only happen if the state object was invalidated due to memory pressure, in |
| 175 | + // which case we can simply reattempt the fetch. Show a message and continue into the next |
| 176 | + // `while` iteration. |
| 177 | + env.getListener() |
| 178 | + .post( |
| 179 | + RepositoryFetchProgress.ongoing( |
| 180 | + RepositoryName.createUnvalidated(rule.getName()), |
| 181 | + "fetch interrupted due to memory pressure; restarting.")); |
| 182 | + } finally { |
| 183 | + if (workerFuture.isDone()) { |
| 184 | + // Unless we know the worker is waiting on a fresh Environment, we should *always* shut |
| 185 | + // down the worker executor by the time we finish executing (successfully or otherwise). |
| 186 | + // This ensures that 1) no background work happens without our knowledge, and 2) if the |
| 187 | + // SkyFunction is re-entered for any reason (for example b/330892334 and |
| 188 | + // https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new |
| 189 | + // worker from scratch. |
| 190 | + state.close(); |
| 191 | + } |
206 | 192 | } |
207 | 193 | } |
208 | 194 | } |
|
0 commit comments