Skip to content

Commit acdf0c3

Browse files
committed
Ensure BuildOperationQueue will progress without extra leases
Fixes #37613
1 parent c5f4841 commit acdf0c3

2 files changed

Lines changed: 81 additions & 22 deletions

File tree

subprojects/core/src/main/java/org/gradle/internal/operations/DefaultBuildOperationQueue.java

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,8 @@ public void run() {
242242
private void runOperations() {
243243
CurrentBuildOperationRef.instance().with(parent, () -> {
244244
try {
245-
T operation;
246-
while ((operation = waitForNextOperation()) != null) {
247-
runBatch(operation);
245+
while (waitForNextOperation()) {
246+
runBatch();
248247
}
249248
} catch (Throwable t) {
250249
addFailure(t);
@@ -254,39 +253,38 @@ private void runOperations() {
254253
});
255254
}
256255

257-
@Nullable
258-
private T waitForNextOperation() {
256+
private boolean waitForNextOperation() {
259257
lock.lock();
260258
try {
261259
// If the token was already invalidated (e.g. in runBatch), exit immediately
262260
// to avoid becoming a zombie thread stuck in await().
263261
if (token != null && !token.isValid()) {
264-
return null;
262+
return false;
265263
}
266264
while (queueState == QueueState.Working && helper.isQueueEmpty()) {
267265
if (helper.isExtraWorker()) {
268266
// We should exit, immediately invalidate our token to ensure the count goes down now.
269267
invalidateIfNeeded();
270-
return null;
268+
return false;
271269
}
272270
try {
273271
workAvailable.await();
274272
} catch (InterruptedException e) {
275273
throw UncheckedException.throwAsUncheckedException(e);
276274
}
277275
}
278-
return helper.pollWork();
276+
return !helper.isQueueEmpty();
279277
} finally {
280278
lock.unlock();
281279
}
282280
}
283281

284-
private void runBatch(final T firstOperation) {
282+
private void runBatch() {
285283
int operationsExecuted;
286284
if (context.requiresWorkerLease()) {
287-
operationsExecuted = workerLeases.runAsWorkerThread(() -> executePendingWork(firstOperation));
285+
operationsExecuted = workerLeases.runAsWorkerThread(this::executePendingWork);
288286
} else {
289-
operationsExecuted = executePendingWork(firstOperation);
287+
operationsExecuted = executePendingWork();
290288
}
291289

292290
// We need to update pending count outside of withLocks() so that we don't have a race
@@ -295,9 +293,9 @@ private void runBatch(final T firstOperation) {
295293
completeOperations(operationsExecuted);
296294
}
297295

298-
private int executePendingWork(T firstOperation) {
296+
private int executePendingWork() {
299297
if (allowAccessToProjectState) {
300-
return doRunBatch(firstOperation);
298+
return doRunBatch();
301299
} else {
302300
// Disallow this thread from making any changes to the project locks while it is running the work. This implies that this thread will not
303301
// block waiting for access to some other project, which means it can proceed even if some other thread is waiting for a project lock it
@@ -309,26 +307,22 @@ private int executePendingWork(T firstOperation) {
309307
// constraint and then gradually roll this out to other worker threads, such as task action workers.
310308
//
311309
// See {@link ProjectLeaseRegistry#whileDisallowingProjectLockChanges} for more details
312-
return workerLeases.whileDisallowingProjectLockChanges(() -> doRunBatch(firstOperation));
310+
return workerLeases.whileDisallowingProjectLockChanges(this::doRunBatch);
313311
}
314312
}
315313

316314
/**
317315
* Run as much work as possible until the queue is empty or the queue is cancelled.
318316
* Then, we return and release the worker lease while we wait for more work to be added to the queue.
319317
*/
320-
private int doRunBatch(T firstOperation) {
318+
private int doRunBatch() {
321319
int operationCount = 0;
322-
T operation = firstOperation;
323-
while (operation != null) {
320+
while (true) {
324321
if (queueState == QueueState.Cancelled) {
325-
// If an operation was pulled from the queue, but the queue was cancelled before this operation could start
326-
// (for instance, because this worker was waiting on a worker lease) discard it without running.
327-
return ++operationCount;
322+
break;
328323
}
329-
runOperation(operation);
330-
operationCount++;
331324

325+
T operation;
332326
lock.lock();
333327
try {
334328
if (helper.isExtraWorker()) {
@@ -340,6 +334,14 @@ private int doRunBatch(T firstOperation) {
340334
} finally {
341335
lock.unlock();
342336
}
337+
338+
if (operation == null) {
339+
break;
340+
}
341+
342+
runOperation(operation);
343+
operationCount++;
344+
343345
}
344346
return operationCount;
345347
}

subprojects/core/src/test/groovy/org/gradle/internal/operations/DefaultBuildOperationQueueTest.groovy

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.gradle.internal.operations
1818

1919
import org.gradle.api.GradleException
20+
import org.gradle.internal.Factory
2021
import org.gradle.internal.concurrent.ExecutorPolicy
2122
import org.gradle.internal.concurrent.ManagedExecutor
2223
import org.gradle.internal.concurrent.ManagedExecutorImpl
@@ -27,7 +28,9 @@ import org.gradle.internal.work.DefaultWorkerLimits
2728
import org.gradle.internal.work.ResourceLockStatistics
2829
import org.gradle.internal.work.WorkerLeaseRegistry
2930
import org.gradle.internal.work.WorkerLeaseService
31+
import spock.lang.Issue
3032
import spock.lang.Specification
33+
import spock.lang.Timeout
3134

3235
import java.util.concurrent.CountDownLatch
3336
import java.util.concurrent.Executors
@@ -275,6 +278,60 @@ class DefaultBuildOperationQueueTest extends Specification {
275278
5 | 10
276279
}
277280
281+
@Issue("https://github.com/gradle/gradle/issues/37613")
282+
@Timeout(value = 30, unit = TimeUnit.SECONDS)
283+
def "workers do not pull operations without a lease, and main thread can progress the queue"() {
284+
given:
285+
// Slightly modified from setupQueue to allow certain injection points.
286+
def mainThread = Thread.currentThread()
287+
def workerAboutToBlockForLease = new CountDownLatch(1)
288+
def executedByMain = new AtomicInteger()
289+
def executedByOther = new AtomicInteger()
290+
def recordingWorker = { TestBuildOperation op ->
291+
if (Thread.currentThread() === mainThread) {
292+
executedByMain.incrementAndGet()
293+
} else {
294+
executedByOther.incrementAndGet()
295+
}
296+
op.run(null)
297+
} as BuildOperationQueue.QueueWorker<TestBuildOperation>
298+
299+
coordinationService = new DefaultResourceLockCoordinationService()
300+
workerRegistry = new DefaultWorkerLeaseService(coordinationService, new DefaultWorkerLimits(1), ResourceLockStatistics.NO_OP) {
301+
@Override
302+
<T> T runAsWorkerThread(Factory<T> action) {
303+
if (Thread.currentThread() !== mainThread) {
304+
workerAboutToBlockForLease.countDown()
305+
}
306+
return super.runAsWorkerThread(action)
307+
}
308+
}
309+
workerRegistry.startProjectExecution(true)
310+
// Keep the lease on the main thread so the spawned worker starves on runAsWorkerThread.
311+
lease = workerRegistry.startWorker()
312+
def executionContext = new BuildOperationExecutionContext(
313+
new ManagedExecutorImpl(Executors.newFixedThreadPool(1), new ExecutorPolicy.CatchAndRecordFailures()),
314+
1,
315+
true
316+
)
317+
operationQueue = new DefaultBuildOperationQueue(false, workerRegistry, executionContext, recordingWorker, null)
318+
319+
when:
320+
operationQueue.add(new Success())
321+
322+
and:
323+
// Wait until the worker is about to block on the lease.
324+
// This ensures that the main thread will be needed to progress the queue.
325+
assert workerAboutToBlockForLease.await(10, TimeUnit.SECONDS)
326+
327+
and:
328+
operationQueue.waitForCompletion()
329+
330+
then:
331+
executedByMain.get() + executedByOther.get() == 1
332+
executedByMain.get() == 1
333+
}
334+
278335
static class SynchronizedBuildOperation extends TestBuildOperation {
279336
final Runnable operationAction
280337
final CountDownLatch startedLatch

0 commit comments

Comments
 (0)