Skip to content

Commit 581c81a

Browse files
coeuvrecopybara-github
authored andcommitted
Remote: Async upload (Part 9)
Update remote module to share a thread pool (with max size equals to --jobs) for gRPC and background uploads. Part of #13655. Closes #13655. PiperOrigin-RevId: 395157789
1 parent 9cb5936 commit 581c81a

File tree

11 files changed

+124
-53
lines changed

11 files changed

+124
-53
lines changed

src/main/java/com/google/devtools/build/lib/authandtls/GoogleAuthUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.io.IOException;
4242
import java.io.InputStream;
4343
import java.util.List;
44+
import java.util.concurrent.Executor;
4445
import java.util.concurrent.TimeUnit;
4546
import javax.annotation.Nullable;
4647

@@ -53,6 +54,7 @@ public final class GoogleAuthUtils {
5354
* @throws IOException in case the channel can't be constructed.
5455
*/
5556
public static ManagedChannel newChannel(
57+
@Nullable Executor executor,
5658
String target,
5759
String proxy,
5860
AuthAndTLSOptions options,
@@ -71,6 +73,7 @@ public static ManagedChannel newChannel(
7173
try {
7274
NettyChannelBuilder builder =
7375
newNettyChannelBuilder(targetUrl, proxy)
76+
.executor(executor)
7477
.negotiationType(
7578
isTlsEnabled(target) ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
7679
if (options.grpcKeepaliveTime != null) {

src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ static Metadata makeGrpcMetadata(BackendConfig config) {
105105
@VisibleForTesting
106106
protected ManagedChannel newGrpcChannel(BackendConfig config) throws IOException {
107107
return GoogleAuthUtils.newChannel(
108+
/*executor=*/ null,
108109
config.besBackend(),
109110
config.besProxy(),
110111
config.authAndTLSOptions(),

src/main/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ java_library(
4242
":ExecutionStatusException",
4343
":ReferenceCountedChannel",
4444
":Retrier",
45+
"//src/main/java/com/google/devtools/build/lib:build-request-options",
4546
"//src/main/java/com/google/devtools/build/lib:runtime",
4647
"//src/main/java/com/google/devtools/build/lib/actions",
4748
"//src/main/java/com/google/devtools/build/lib/actions:action_input_helper",

src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.google.devtools.build.lib.remote;
1515

1616
import static com.google.common.base.Preconditions.checkNotNull;
17+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
1718

1819
import com.google.common.base.Preconditions;
1920
import com.google.common.collect.ImmutableSet;
@@ -32,30 +33,34 @@
3233
import com.google.devtools.build.lib.remote.util.DigestUtil;
3334
import com.google.devtools.build.lib.runtime.CommandEnvironment;
3435
import com.google.devtools.build.lib.vfs.Path;
36+
import java.util.concurrent.Executor;
3537
import javax.annotation.Nullable;
3638

3739
/** Provides a remote execution context. */
3840
final class RemoteActionContextProvider {
3941

42+
private final Executor executor;
4043
private final CommandEnvironment env;
41-
@Nullable private final RemoteCache cache;
42-
@Nullable private final RemoteExecutionClient executor;
44+
@Nullable private final RemoteCache remoteCache;
45+
@Nullable private final RemoteExecutionClient remoteExecutor;
4346
@Nullable private final ListeningScheduledExecutorService retryScheduler;
4447
private final DigestUtil digestUtil;
4548
@Nullable private final Path logDir;
4649
private ImmutableSet<ActionInput> filesToDownload = ImmutableSet.of();
4750
private RemoteExecutionService remoteExecutionService;
4851

4952
private RemoteActionContextProvider(
53+
Executor executor,
5054
CommandEnvironment env,
51-
@Nullable RemoteCache cache,
52-
@Nullable RemoteExecutionClient executor,
55+
@Nullable RemoteCache remoteCache,
56+
@Nullable RemoteExecutionClient remoteExecutor,
5357
@Nullable ListeningScheduledExecutorService retryScheduler,
5458
DigestUtil digestUtil,
5559
@Nullable Path logDir) {
56-
this.env = Preconditions.checkNotNull(env, "env");
57-
this.cache = cache;
5860
this.executor = executor;
61+
this.env = Preconditions.checkNotNull(env, "env");
62+
this.remoteCache = remoteCache;
63+
this.remoteExecutor = remoteExecutor;
5964
this.retryScheduler = retryScheduler;
6065
this.digestUtil = digestUtil;
6166
this.logDir = logDir;
@@ -66,27 +71,41 @@ public static RemoteActionContextProvider createForPlaceholder(
6671
ListeningScheduledExecutorService retryScheduler,
6772
DigestUtil digestUtil) {
6873
return new RemoteActionContextProvider(
69-
env, /*cache=*/ null, /*executor=*/ null, retryScheduler, digestUtil, /*logDir=*/ null);
74+
directExecutor(),
75+
env,
76+
/*remoteCache=*/ null,
77+
/*remoteExecutor=*/ null,
78+
retryScheduler,
79+
digestUtil,
80+
/*logDir=*/ null);
7081
}
7182

7283
public static RemoteActionContextProvider createForRemoteCaching(
84+
Executor executor,
7385
CommandEnvironment env,
74-
RemoteCache cache,
86+
RemoteCache remoteCache,
7587
ListeningScheduledExecutorService retryScheduler,
7688
DigestUtil digestUtil) {
7789
return new RemoteActionContextProvider(
78-
env, cache, /*executor=*/ null, retryScheduler, digestUtil, /*logDir=*/ null);
90+
executor,
91+
env,
92+
remoteCache,
93+
/*remoteExecutor=*/ null,
94+
retryScheduler,
95+
digestUtil,
96+
/*logDir=*/ null);
7997
}
8098

8199
public static RemoteActionContextProvider createForRemoteExecution(
100+
Executor executor,
82101
CommandEnvironment env,
83-
RemoteExecutionCache cache,
84-
RemoteExecutionClient executor,
102+
RemoteExecutionCache remoteCache,
103+
RemoteExecutionClient remoteExecutor,
85104
ListeningScheduledExecutorService retryScheduler,
86105
DigestUtil digestUtil,
87106
Path logDir) {
88107
return new RemoteActionContextProvider(
89-
env, cache, executor, retryScheduler, digestUtil, logDir);
108+
executor, env, remoteCache, remoteExecutor, retryScheduler, digestUtil, logDir);
90109
}
91110

92111
private RemotePathResolver createRemotePathResolver() {
@@ -120,6 +139,7 @@ private RemoteExecutionService getRemoteExecutionService() {
120139
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class)).verboseFailures;
121140
remoteExecutionService =
122141
new RemoteExecutionService(
142+
executor,
123143
env.getReporter(),
124144
verboseFailures,
125145
env.getExecRoot(),
@@ -128,8 +148,8 @@ private RemoteExecutionService getRemoteExecutionService() {
128148
env.getCommandId().toString(),
129149
digestUtil,
130150
checkNotNull(env.getOptions().getOptions(RemoteOptions.class)),
131-
cache,
132-
executor,
151+
remoteCache,
152+
remoteExecutor,
133153
filesToDownload,
134154
captureCorruptedOutputsDir);
135155
env.getEventBus().register(remoteExecutionService);
@@ -179,11 +199,11 @@ public void registerSpawnCache(ModuleActionContextRegistry.Builder registryBuild
179199

180200
/** Returns the remote cache. */
181201
RemoteCache getRemoteCache() {
182-
return cache;
202+
return remoteCache;
183203
}
184204

185205
RemoteExecutionClient getRemoteExecutionClient() {
186-
return executor;
206+
return remoteExecutor;
187207
}
188208

189209
void setFilesToDownload(ImmutableSet<ActionInput> topLevelOutputs) {
@@ -194,11 +214,11 @@ public void afterCommand() {
194214
if (remoteExecutionService != null) {
195215
remoteExecutionService.shutdown();
196216
} else {
197-
if (cache != null) {
198-
cache.release();
217+
if (remoteCache != null) {
218+
remoteCache.release();
199219
}
200-
if (executor != null) {
201-
executor.close();
220+
if (remoteExecutor != null) {
221+
remoteExecutor.close();
202222
}
203223
}
204224
}

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import com.google.protobuf.Message;
117117
import io.grpc.Status.Code;
118118
import io.reactivex.rxjava3.annotations.NonNull;
119+
import io.reactivex.rxjava3.core.Scheduler;
119120
import io.reactivex.rxjava3.core.Single;
120121
import io.reactivex.rxjava3.core.SingleObserver;
121122
import io.reactivex.rxjava3.disposables.Disposable;
@@ -132,6 +133,7 @@
132133
import java.util.Objects;
133134
import java.util.SortedMap;
134135
import java.util.TreeSet;
136+
import java.util.concurrent.Executor;
135137
import java.util.concurrent.atomic.AtomicBoolean;
136138
import javax.annotation.Nullable;
137139

@@ -153,10 +155,13 @@ public class RemoteExecutionService {
153155
private final ImmutableSet<PathFragment> filesToDownload;
154156
@Nullable private final Path captureCorruptedOutputsDir;
155157

158+
private final Scheduler scheduler;
159+
156160
private final AtomicBoolean shutdown = new AtomicBoolean(false);
157161
private final AtomicBoolean buildInterrupted = new AtomicBoolean(false);
158162

159163
public RemoteExecutionService(
164+
Executor executor,
160165
Reporter reporter,
161166
boolean verboseFailures,
162167
Path execRoot,
@@ -185,6 +190,8 @@ public RemoteExecutionService(
185190
}
186191
this.filesToDownload = filesToDownloadBuilder.build();
187192
this.captureCorruptedOutputsDir = captureCorruptedOutputsDir;
193+
194+
this.scheduler = Schedulers.from(executor, /*interruptibleWorker=*/ true);
188195
}
189196

190197
static Command buildCommand(
@@ -1065,7 +1072,7 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
10651072
remoteCache ->
10661073
manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
10671074
RemoteCache::release)
1068-
.subscribeOn(Schedulers.io())
1075+
.subscribeOn(scheduler)
10691076
.subscribe(
10701077
new SingleObserver<ActionResult>() {
10711078
@Override

src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.google.devtools.build.lib.remote;
1616

17+
import static java.util.concurrent.TimeUnit.SECONDS;
18+
1719
import build.bazel.remote.execution.v2.DigestFunction;
1820
import build.bazel.remote.execution.v2.ServerCapabilities;
1921
import com.google.auth.Credentials;
@@ -27,6 +29,7 @@
2729
import com.google.common.flogger.GoogleLogger;
2830
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
2931
import com.google.common.util.concurrent.MoreExecutors;
32+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3033
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
3134
import com.google.devtools.build.lib.actions.ActionGraph;
3235
import com.google.devtools.build.lib.actions.ActionInput;
@@ -50,6 +53,7 @@
5053
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
5154
import com.google.devtools.build.lib.buildeventstream.LocalFilesArtifactUploader;
5255
import com.google.devtools.build.lib.buildtool.BuildRequest;
56+
import com.google.devtools.build.lib.buildtool.BuildRequestOptions;
5357
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
5458
import com.google.devtools.build.lib.events.Event;
5559
import com.google.devtools.build.lib.events.Reporter;
@@ -102,7 +106,11 @@
102106
import java.util.Map;
103107
import java.util.Optional;
104108
import java.util.Set;
109+
import java.util.concurrent.ExecutorService;
105110
import java.util.concurrent.Executors;
111+
import java.util.concurrent.LinkedBlockingQueue;
112+
import java.util.concurrent.ThreadFactory;
113+
import java.util.concurrent.ThreadPoolExecutor;
106114

107115
/** RemoteModule provides distributed cache and remote execution for Bazel. */
108116
public final class RemoteModule extends BlazeModule {
@@ -114,6 +122,8 @@ public final class RemoteModule extends BlazeModule {
114122
private final ListeningScheduledExecutorService retryScheduler =
115123
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
116124

125+
private ExecutorService executorService;
126+
117127
private RemoteActionContextProvider actionContextProvider;
118128
private RemoteActionInputFetcher actionInputFetcher;
119129
private RemoteOutputsMode remoteOutputsMode;
@@ -129,7 +139,11 @@ public ManagedChannel newChannel(
129139
List<ClientInterceptor> interceptors)
130140
throws IOException {
131141
return GoogleAuthUtils.newChannel(
132-
target, proxy, options, interceptors.isEmpty() ? null : interceptors);
142+
executorService,
143+
target,
144+
proxy,
145+
options,
146+
interceptors.isEmpty() ? null : interceptors);
133147
}
134148
};
135149

@@ -224,7 +238,7 @@ private void initHttpAndDiskCache(
224238
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
225239
actionContextProvider =
226240
RemoteActionContextProvider.createForRemoteCaching(
227-
env, remoteCache, /* retryScheduler= */ null, digestUtil);
241+
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
228242
}
229243

230244
@Override
@@ -293,6 +307,24 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
293307
env.getOutputBase().getRelative(env.getRuntime().getProductName() + "-remote-logs");
294308
cleanAndCreateRemoteLogsDir(logDir);
295309

310+
BuildRequestOptions buildRequestOptions =
311+
env.getOptions().getOptions(BuildRequestOptions.class);
312+
313+
int jobs = 0;
314+
if (buildRequestOptions != null) {
315+
jobs = buildRequestOptions.jobs;
316+
}
317+
318+
ThreadFactory threadFactory =
319+
new ThreadFactoryBuilder().setNameFormat("remote-executor-%d").build();
320+
if (jobs != 0) {
321+
executorService =
322+
new ThreadPoolExecutor(
323+
/*corePoolSize=*/ 0, jobs, 60L, SECONDS, new LinkedBlockingQueue<>(), threadFactory);
324+
} else {
325+
executorService = Executors.newCachedThreadPool(threadFactory);
326+
}
327+
296328
if ((enableHttpCache || enableDiskCache) && !enableGrpcCache) {
297329
initHttpAndDiskCache(env, authAndTlsOptions, remoteOptions, digestUtil);
298330
return;
@@ -547,7 +579,13 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
547579
new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
548580
actionContextProvider =
549581
RemoteActionContextProvider.createForRemoteExecution(
550-
env, remoteCache, remoteExecutor, retryScheduler, digestUtil, logDir);
582+
executorService,
583+
env,
584+
remoteCache,
585+
remoteExecutor,
586+
retryScheduler,
587+
digestUtil,
588+
logDir);
551589
repositoryRemoteExecutorFactoryDelegate.init(
552590
new RemoteRepositoryRemoteExecutorFactory(
553591
remoteCache,
@@ -578,7 +616,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
578616
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
579617
actionContextProvider =
580618
RemoteActionContextProvider.createForRemoteCaching(
581-
env, remoteCache, retryScheduler, digestUtil);
619+
executorService, env, remoteCache, retryScheduler, digestUtil);
582620
}
583621

584622
if (enableRemoteDownloader) {
@@ -773,6 +811,7 @@ public void afterCommand() throws AbruptExitException {
773811
logger.atWarning().withCause(e).log(failureMessage);
774812
}
775813

814+
executorService = null;
776815
buildEventArtifactUploaderFactoryDelegate.reset();
777816
repositoryRemoteExecutorFactoryDelegate.reset();
778817
remoteDownloaderSupplier.set(null);

0 commit comments

Comments
 (0)