Skip to content

Commit dcda51e

Browse files
yuyue730copybara-github
authored andcommitted
Add a new skyframeGlobsExecutorService ForkJoinPool to AnalysisExecutor
In this change, I intentionally set default flag value (aka thread number) in `PackageOptions` to be 0, so this change will be a no-op for now. In the future, this FJP will be injected into `GlobsFunction` so that in-parallel `StateMachine`s are pushed to be run on this FJP concurrently. A positive default flag value (thread count) should be determined at that time. I also have considered using an existing FJP (`cpuHeavyPoolExecutorService`) instead of creating a new one. And eventually chose to create a new one. There are several reasons for this decision: 1. A new FJP with a limited number of threads only adds a very small resource usage to blaze, which is acceptable. 2. Reusing `cpuHeavyPoolExecutorService` requires heavy renaming (especially we will need to change the flag name), which should be avoided. 3. If some logic ends of blocking the threads in the same thread pool I try to add threads to, a deadlock can happen. PiperOrigin-RevId: 607732468 Change-Id: Ib6226a545f7efdc32c3213cc74425cca75f3e508
1 parent 8e0343c commit dcda51e

File tree

5 files changed

+119
-32
lines changed

5 files changed

+119
-32
lines changed

src/main/java/com/google/devtools/build/lib/concurrent/MultiExecutorQueueVisitor.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public final class MultiExecutorQueueVisitor extends AbstractQueueVisitor
3939
implements MultiThreadPoolsQuiescingExecutor {
4040
private final ExecutorService regularPoolExecutorService;
41-
private final ExecutorService cpuHeavyPoolExecutorService;
41+
@Nullable private final ExecutorService cpuHeavyPoolExecutorService;
42+
@Nullable private final ExecutorService skyframeGlobsExecutorService;
4243
@Nullable private final ExecutorService executionPhaseExecutorService;
4344

4445
// Whether execution phase tasks should be allowed to move forward.
@@ -50,7 +51,8 @@ public final class MultiExecutorQueueVisitor extends AbstractQueueVisitor
5051

5152
private MultiExecutorQueueVisitor(
5253
ExecutorService regularPoolExecutorService,
53-
ExecutorService cpuHeavyPoolExecutorService,
54+
@Nullable ExecutorService skyframeGlobsExecutorService,
55+
@Nullable ExecutorService cpuHeavyPoolExecutorService,
5456
@Nullable ExecutorService executionPhaseExecutorService,
5557
ExceptionHandlingMode exceptionHandlingMode,
5658
ErrorClassifier errorClassifier) {
@@ -60,7 +62,8 @@ private MultiExecutorQueueVisitor(
6062
exceptionHandlingMode,
6163
errorClassifier);
6264
this.regularPoolExecutorService = super.getExecutorService();
63-
this.cpuHeavyPoolExecutorService = Preconditions.checkNotNull(cpuHeavyPoolExecutorService);
65+
this.skyframeGlobsExecutorService = skyframeGlobsExecutorService;
66+
this.cpuHeavyPoolExecutorService = cpuHeavyPoolExecutorService;
6467
this.executionPhaseExecutorService = executionPhaseExecutorService;
6568
this.executionPhaseTasksGoAhead = executionPhaseExecutorService == null;
6669

@@ -71,25 +74,14 @@ private MultiExecutorQueueVisitor(
7174

7275
public static MultiExecutorQueueVisitor createWithExecutorServices(
7376
ExecutorService regularPoolExecutorService,
74-
ExecutorService cpuHeavyPoolExecutorService,
75-
ExceptionHandlingMode exceptionHandlingMode,
76-
ErrorClassifier errorClassifier) {
77-
return createWithExecutorServices(
78-
regularPoolExecutorService,
79-
cpuHeavyPoolExecutorService,
80-
/* executionPhaseExecutorService= */ null,
81-
exceptionHandlingMode,
82-
errorClassifier);
83-
}
84-
85-
public static MultiExecutorQueueVisitor createWithExecutorServices(
86-
ExecutorService regularPoolExecutorService,
87-
ExecutorService cpuHeavyPoolExecutorService,
88-
ExecutorService executionPhaseExecutorService,
77+
@Nullable ExecutorService skyframeGlobsExecutorService,
78+
@Nullable ExecutorService cpuHeavyPoolExecutorService,
79+
@Nullable ExecutorService executionPhaseExecutorService,
8980
ExceptionHandlingMode exceptionHandlingMode,
9081
ErrorClassifier errorClassifier) {
9182
return new MultiExecutorQueueVisitor(
9283
regularPoolExecutorService,
84+
skyframeGlobsExecutorService,
9385
cpuHeavyPoolExecutorService,
9486
executionPhaseExecutorService,
9587
exceptionHandlingMode,
@@ -116,7 +108,11 @@ ExecutorService getExecutorServiceByThreadPoolType(ThreadPoolType threadPoolType
116108
case REGULAR:
117109
return regularPoolExecutorService;
118110
case CPU_HEAVY:
111+
Preconditions.checkNotNull(cpuHeavyPoolExecutorService);
119112
return cpuHeavyPoolExecutorService;
113+
case SKYFRAME_GLOBS:
114+
Preconditions.checkNotNull(skyframeGlobsExecutorService);
115+
return skyframeGlobsExecutorService;
120116
case EXECUTION_PHASE:
121117
Preconditions.checkNotNull(executionPhaseExecutorService);
122118
return executionPhaseExecutorService;
@@ -130,7 +126,12 @@ protected void shutdownExecutorService(Throwable catastrophe) {
130126
Throwables.throwIfUnchecked(catastrophe);
131127
}
132128
internalShutdownExecutorService(regularPoolExecutorService);
133-
internalShutdownExecutorService(cpuHeavyPoolExecutorService);
129+
if (skyframeGlobsExecutorService != null) {
130+
internalShutdownExecutorService(skyframeGlobsExecutorService);
131+
}
132+
if (cpuHeavyPoolExecutorService != null) {
133+
internalShutdownExecutorService(cpuHeavyPoolExecutorService);
134+
}
134135
if (executionPhaseExecutorService != null) {
135136
internalShutdownExecutorService(executionPhaseExecutorService);
136137
}

src/main/java/com/google/devtools/build/lib/concurrent/MultiThreadPoolsQuiescingExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public interface MultiThreadPoolsQuiescingExecutor extends QuiescingExecutor {
1919
enum ThreadPoolType {
2020
// Suitable for CPU-heavy tasks. Ideally the number of threads is close to the machine's number
2121
// of cores.
22+
SKYFRAME_GLOBS,
2223
CPU_HEAVY,
2324
// Reserved for execution-phase tasks.
2425
EXECUTION_PHASE,

src/main/java/com/google/devtools/build/lib/pkgcache/PackageOptions.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public ParallelismConverter() throws OptionsParsingException {
149149
documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
150150
effectTags = {OptionEffectTag.UNKNOWN},
151151
help =
152-
"Number of threads to use for glob evaluation. Takes "
152+
"Number of threads to use for non-Skyframe glob evaluation. Takes "
153153
+ ResourceConverter.FLAG_SYNTAX
154154
+ ". \"auto\" means to use a reasonable value derived from the machine's hardware"
155155
+ " profile (e.g. the number of processors).")
@@ -167,6 +167,19 @@ public ParallelismConverter() throws OptionsParsingException {
167167
)
168168
public int maxDirectoriesToEagerlyVisitInGlobbing;
169169

170+
// TODO(b/290998109): Set the default value to a postive number and converter field to
171+
// ParallelismConverter.class when GlobsFunction in-parallel StateMachine work is done.
172+
@Option(
173+
name = "skyframe_globs_threads",
174+
defaultValue = "0",
175+
documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
176+
effectTags = {OptionEffectTag.UNKNOWN},
177+
help =
178+
"Number of threads to use for skyframe GlobsFunction's StateMachine in-parallel"
179+
+ " computation. GlobsFunction creates multiple independent StateMachines to compute"
180+
+ " each individual package glob expression in-parallel.")
181+
public int skyframeGlobsThreads;
182+
170183
@Option(
171184
name = "fetch",
172185
defaultValue = "true",

src/main/java/com/google/devtools/build/lib/runtime/QuiescingExecutorsImpl.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@
3636
*/
3737
public final class QuiescingExecutorsImpl implements QuiescingExecutors {
3838
private static final String SKYFRAME_EVALUATOR = "skyframe-evaluator";
39+
private static final String SKYFRAME_EVALUATOR_GLOBS = "skyframe-evaluator-globs";
3940
private static final String SKYFRAME_EVALUATOR_CPU_HEAVY = "skyframe-evaluator-cpu-heavy";
4041
private static final String SKYFRAME_EVALUATOR_EXECUTION = "skyframe-evaluator-execution";
4142

4243
private int analysisParallelism;
4344
private int executionParallelism;
4445
private int globbingParallelism;
46+
private int skyframeGlobsParallelism;
4547

4648
/**
4749
* The size of the thread pool for CPU-heavy tasks set by
@@ -58,6 +60,7 @@ public static QuiescingExecutors forTesting() {
5860
/* analysisParallelism= */ 6,
5961
/* executionParallelism= */ 6,
6062
/* globbingParallelism= */ 6,
63+
/* skyframeGlobsParallelism= */ 4,
6164
/* cpuHeavySkyKeysThreadPoolSize= */ 4);
6265
}
6366

@@ -66,17 +69,20 @@ static QuiescingExecutorsImpl createDefault() {
6669
/* analysisParallelism= */ 0,
6770
/* executionParallelism= */ 0,
6871
/* globbingParallelism= */ 0,
72+
/* skyframeGlobsParallelism= */ 0,
6973
/* cpuHeavySkyKeysThreadPoolSize= */ 0);
7074
}
7175

7276
private QuiescingExecutorsImpl(
7377
int analysisParallelism,
7478
int executionParallelism,
7579
int globbingParallelism,
80+
int skyframeGlobsParallelism,
7681
int cpuHeavySkyKeysThreadPoolSize) {
7782
this.analysisParallelism = analysisParallelism;
7883
this.executionParallelism = executionParallelism;
7984
this.globbingParallelism = globbingParallelism;
85+
this.skyframeGlobsParallelism = skyframeGlobsParallelism;
8086
this.cpuHeavySkyKeysThreadPoolSize = cpuHeavySkyKeysThreadPoolSize;
8187
}
8288

@@ -95,6 +101,8 @@ void resetParameters(OptionsProvider options) {
95101
this.executionParallelism = buildRequestOptions != null ? buildRequestOptions.jobs : 0;
96102
var packageOptions = options.getOptions(PackageOptions.class);
97103
this.globbingParallelism = packageOptions != null ? packageOptions.globbingThreads : 0;
104+
this.skyframeGlobsParallelism =
105+
packageOptions != null ? packageOptions.skyframeGlobsThreads : 0;
98106
var analysisOptions = options.getOptions(AnalysisOptions.class);
99107
this.cpuHeavySkyKeysThreadPoolSize =
100108
analysisOptions != null ? analysisOptions.cpuHeavySkyKeysThreadPoolSize : 0;
@@ -118,16 +126,24 @@ public int globbingParallelism() {
118126
@Override
119127
public QuiescingExecutor getAnalysisExecutor() {
120128
checkState(analysisParallelism > 0, "expected analysisParallelism > 0 : %s", this);
121-
if (cpuHeavySkyKeysThreadPoolSize > 0) {
129+
if (skyframeGlobsParallelism > 0 || cpuHeavySkyKeysThreadPoolSize > 0) {
122130
return MultiExecutorQueueVisitor.createWithExecutorServices(
123131
newNamedPool(SKYFRAME_EVALUATOR, analysisParallelism),
124-
AbstractQueueVisitor.createExecutorService(
125-
/* parallelism= */ cpuHeavySkyKeysThreadPoolSize, SKYFRAME_EVALUATOR_CPU_HEAVY),
132+
skyframeGlobsParallelism > 0
133+
? AbstractQueueVisitor.createExecutorService(
134+
/* parallelism= */ skyframeGlobsParallelism, SKYFRAME_EVALUATOR_GLOBS)
135+
: null,
136+
cpuHeavySkyKeysThreadPoolSize > 0
137+
? AbstractQueueVisitor.createExecutorService(
138+
/* parallelism= */ cpuHeavySkyKeysThreadPoolSize, SKYFRAME_EVALUATOR_CPU_HEAVY)
139+
: null,
140+
/* executionPhaseExecutorService= */ null,
126141
ExceptionHandlingMode.FAIL_FAST,
127142
ParallelEvaluatorErrorClassifier.instance());
143+
} else {
144+
return AbstractQueueVisitor.create(
145+
SKYFRAME_EVALUATOR, analysisParallelism(), ParallelEvaluatorErrorClassifier.instance());
128146
}
129-
return AbstractQueueVisitor.create(
130-
SKYFRAME_EVALUATOR, analysisParallelism(), ParallelEvaluatorErrorClassifier.instance());
131147
}
132148

133149
@Override
@@ -147,6 +163,10 @@ public QuiescingExecutor getMergedAnalysisAndExecutionExecutor() {
147163
cpuHeavySkyKeysThreadPoolSize > 0, "expected cpuHeavySkyKeysThreadPoolSize > 0 : %s", this);
148164
return MultiExecutorQueueVisitor.createWithExecutorServices(
149165
newNamedPool(SKYFRAME_EVALUATOR, analysisParallelism),
166+
skyframeGlobsParallelism > 0
167+
? AbstractQueueVisitor.createExecutorService(
168+
/* parallelism= */ skyframeGlobsParallelism, SKYFRAME_EVALUATOR_GLOBS)
169+
: null,
150170
AbstractQueueVisitor.createExecutorService(
151171
/* parallelism= */ cpuHeavySkyKeysThreadPoolSize, SKYFRAME_EVALUATOR_CPU_HEAVY),
152172
AbstractQueueVisitor.createExecutorService(

src/test/java/com/google/devtools/build/lib/concurrent/MultiExecutorQueueVisitorTest.java

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,55 @@ public class MultiExecutorQueueVisitorTest {
3131
@Test
3232
public void testGetExecutorServiceByThreadPoolType_regular() {
3333
ExecutorService regular = mock(ExecutorService.class);
34+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
3435
ExecutorService cpuHeavy = mock(ExecutorService.class);
3536

3637
MultiExecutorQueueVisitor queueVisitor =
3738
MultiExecutorQueueVisitor.createWithExecutorServices(
38-
regular, cpuHeavy, ExceptionHandlingMode.KEEP_GOING, ErrorClassifier.DEFAULT);
39+
regular,
40+
skyframeGlobs,
41+
cpuHeavy,
42+
/* executionPhaseExecutorService= */ null,
43+
ExceptionHandlingMode.KEEP_GOING,
44+
ErrorClassifier.DEFAULT);
3945

4046
assertThat(queueVisitor.getExecutorServiceByThreadPoolType(ThreadPoolType.REGULAR))
4147
.isEqualTo(regular);
4248
}
4349

50+
@Test
51+
public void testGetExecutorServiceByThreadPoolType_skyframeGlobs() {
52+
ExecutorService regular = mock(ExecutorService.class);
53+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
54+
ExecutorService cpuHeavy = mock(ExecutorService.class);
55+
56+
MultiExecutorQueueVisitor queueVisitor =
57+
MultiExecutorQueueVisitor.createWithExecutorServices(
58+
regular,
59+
skyframeGlobs,
60+
cpuHeavy,
61+
/* executionPhaseExecutorService= */ null,
62+
ExceptionHandlingMode.KEEP_GOING,
63+
ErrorClassifier.DEFAULT);
64+
65+
assertThat(queueVisitor.getExecutorServiceByThreadPoolType(ThreadPoolType.SKYFRAME_GLOBS))
66+
.isEqualTo(skyframeGlobs);
67+
}
68+
4469
@Test
4570
public void testGetExecutorServiceByThreadPoolType_cpuHeavy() {
4671
ExecutorService regular = mock(ExecutorService.class);
72+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
4773
ExecutorService cpuHeavy = mock(ExecutorService.class);
4874

4975
MultiExecutorQueueVisitor queueVisitor =
5076
MultiExecutorQueueVisitor.createWithExecutorServices(
51-
regular, cpuHeavy, ExceptionHandlingMode.KEEP_GOING, ErrorClassifier.DEFAULT);
77+
regular,
78+
skyframeGlobs,
79+
cpuHeavy,
80+
/* executionPhaseExecutorService= */ null,
81+
ExceptionHandlingMode.KEEP_GOING,
82+
ErrorClassifier.DEFAULT);
5283

5384
assertThat(queueVisitor.getExecutorServiceByThreadPoolType(ThreadPoolType.CPU_HEAVY))
5485
.isEqualTo(cpuHeavy);
@@ -57,43 +88,58 @@ public void testGetExecutorServiceByThreadPoolType_cpuHeavy() {
5788
@Test
5889
public void testShutDownExecutorService_noThrowables() {
5990
ExecutorService regular = mock(ExecutorService.class);
91+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
6092
ExecutorService cpuHeavy = mock(ExecutorService.class);
6193

6294
MultiExecutorQueueVisitor queueVisitor =
6395
MultiExecutorQueueVisitor.createWithExecutorServices(
64-
regular, cpuHeavy, ExceptionHandlingMode.KEEP_GOING, ErrorClassifier.DEFAULT);
65-
queueVisitor.shutdownExecutorService(/*catastrophe=*/ null);
96+
regular,
97+
skyframeGlobs,
98+
cpuHeavy,
99+
/* executionPhaseExecutorService= */ null,
100+
ExceptionHandlingMode.KEEP_GOING,
101+
ErrorClassifier.DEFAULT);
102+
queueVisitor.shutdownExecutorService(/* catastrophe= */ null);
66103

67104
verify(regular).shutdown();
105+
verify(skyframeGlobs).shutdown();
68106
verify(cpuHeavy).shutdown();
69107
}
70108

71109
@Test
72110
public void testShutDownExecutorService_withThrowable() {
73111
ExecutorService regular = mock(ExecutorService.class);
112+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
74113
ExecutorService cpuHeavy = mock(ExecutorService.class);
75114

76115
MultiExecutorQueueVisitor queueVisitor =
77116
MultiExecutorQueueVisitor.createWithExecutorServices(
78-
regular, cpuHeavy, ExceptionHandlingMode.KEEP_GOING, ErrorClassifier.DEFAULT);
117+
regular,
118+
skyframeGlobs,
119+
cpuHeavy,
120+
/* executionPhaseExecutorService= */ null,
121+
ExceptionHandlingMode.KEEP_GOING,
122+
ErrorClassifier.DEFAULT);
79123
RuntimeException toBeThrown = new RuntimeException();
80124

81125
Throwable thrown =
82126
assertThrows(
83127
Throwable.class,
84-
() -> queueVisitor.shutdownExecutorService(/*catastrophe=*/ toBeThrown));
128+
() -> queueVisitor.shutdownExecutorService(/* catastrophe= */ toBeThrown));
85129
assertThat(thrown).isEqualTo(toBeThrown);
86130
}
87131

88132
@Test
89133
public void testGetExecutorServiceByThreadPoolType_executionPhase() {
90134
ExecutorService regular = mock(ExecutorService.class);
135+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
91136
ExecutorService cpuHeavy = mock(ExecutorService.class);
92137
ExecutorService executionPhase = mock(ExecutorService.class);
93138

94139
MultiExecutorQueueVisitor queueVisitor =
95140
MultiExecutorQueueVisitor.createWithExecutorServices(
96141
regular,
142+
skyframeGlobs,
97143
cpuHeavy,
98144
executionPhase,
99145
ExceptionHandlingMode.KEEP_GOING,
@@ -106,11 +152,17 @@ public void testGetExecutorServiceByThreadPoolType_executionPhase() {
106152
@Test
107153
public void testGetExecutorServiceByThreadPoolType_executionPhaseWithoutExecutor_throwsNPE() {
108154
ExecutorService regular = mock(ExecutorService.class);
155+
ExecutorService skyframeGlobs = mock(ExecutorService.class);
109156
ExecutorService cpuHeavy = mock(ExecutorService.class);
110157

111158
MultiExecutorQueueVisitor queueVisitorWithoutExecutionPhasePool =
112159
MultiExecutorQueueVisitor.createWithExecutorServices(
113-
regular, cpuHeavy, ExceptionHandlingMode.KEEP_GOING, ErrorClassifier.DEFAULT);
160+
regular,
161+
skyframeGlobs,
162+
cpuHeavy,
163+
/* executionPhaseExecutorService= */ null,
164+
ExceptionHandlingMode.KEEP_GOING,
165+
ErrorClassifier.DEFAULT);
114166

115167
assertThrows(
116168
NullPointerException.class,

0 commit comments

Comments
 (0)