Skip to content

Commit 4391eb2

Browse files
committed
Merge branch 'pubsub-hp' into hide-stats
2 parents 2499922 + 51a9a80 commit 4391eb2

5 files changed

Lines changed: 106 additions & 54 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,22 @@
1919
import com.google.common.util.concurrent.ListenableFuture;
2020
import com.google.pubsub.v1.PubsubMessage;
2121

22-
/** Users of the {@link Subscriber} must implement this interface to receive messages. */
22+
/** This interface can be implemented by users of {@link Subscriber} to receive messages. */
2323
public interface MessageReceiver {
24-
enum AckReply {
25-
/** To be used for acking a message. */
24+
/** A reply to a message, to be sent back to the service. */
25+
enum AckReply {
26+
/**
27+
* Acknowledges that the message has been successfully processed. The service will not send the
28+
* message again.
29+
*/
2630
ACK,
27-
/** To be used for nacking a message. */
31+
/**
32+
* Signals that the message has not been successfully processed. The service will resend the
33+
* message.
34+
*/
2835
NACK
2936
}
37+
3038
/**
3139
* Called when a message is received by the subscriber.
3240
*

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.api.gax.core.RetrySettings;
2121
import com.google.api.gax.grpc.BundlingSettings;
22+
import com.google.api.gax.grpc.ExecutorProvider;
23+
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2224
import com.google.auth.Credentials;
2325
import com.google.auth.oauth2.GoogleCredentials;
2426
import com.google.common.base.Optional;
@@ -28,7 +30,6 @@
2830
import com.google.common.util.concurrent.Futures;
2931
import com.google.common.util.concurrent.ListenableFuture;
3032
import com.google.common.util.concurrent.SettableFuture;
31-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3233
import com.google.pubsub.v1.PublishRequest;
3334
import com.google.pubsub.v1.PublishResponse;
3435
import com.google.pubsub.v1.PublisherGrpc;
@@ -43,10 +44,10 @@
4344
import io.grpc.netty.NegotiationType;
4445
import io.grpc.netty.NettyChannelBuilder;
4546
import java.io.IOException;
47+
import java.util.ArrayList;
4648
import java.util.Iterator;
4749
import java.util.LinkedList;
4850
import java.util.List;
49-
import java.util.concurrent.Executors;
5051
import java.util.concurrent.ScheduledExecutorService;
5152
import java.util.concurrent.ScheduledFuture;
5253
import java.util.concurrent.ThreadLocalRandom;
@@ -109,17 +110,15 @@
109110
*/
110111
public class Publisher {
111112
/** The maximum number of messages in one request. Defined by the API. */
112-
public static long getApiMaxBundleMessages() {
113+
public static long getApiMaxRequestElementCount() {
113114
return 1000L;
114115
}
115116

116117
/** The maximum size of one request. Defined by the API. */
117-
public static long getApiMaxBundleBytes() {
118+
public static long getApiMaxRequestBytes() {
118119
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
119120
}
120121

121-
private static final int DEFAULT_MIN_THREAD_POOL_SIZE = 5;
122-
123122
private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
124123

125124
private final String topic;
@@ -143,6 +142,7 @@ public static long getApiMaxBundleBytes() {
143142

144143
private final ScheduledExecutorService executor;
145144
private final AtomicBoolean shutdown;
145+
private final List<AutoCloseable> closeables = new ArrayList<>();
146146
private final MessagesWaiter messagesWaiter;
147147
private ScheduledFuture<?> currentAlarmFuture;
148148

@@ -160,15 +160,16 @@ private Publisher(Builder builder) throws IOException {
160160
messagesBundleLock = new ReentrantLock();
161161
activeAlarm = new AtomicBoolean(false);
162162
int numCores = Math.max(1, Runtime.getRuntime().availableProcessors());
163-
executor =
164-
builder.executor.isPresent()
165-
? builder.executor.get()
166-
: Executors.newScheduledThreadPool(
167-
numCores * DEFAULT_MIN_THREAD_POOL_SIZE,
168-
new ThreadFactoryBuilder()
169-
.setDaemon(true)
170-
.setNameFormat("cloud-pubsub-publisher-thread-%d")
171-
.build());
163+
executor = builder.executorProvider.getExecutor();
164+
if (builder.executorProvider.shouldAutoClose()) {
165+
closeables.add(
166+
new AutoCloseable() {
167+
@Override
168+
public void close() throws IOException {
169+
executor.shutdown();
170+
}
171+
});
172+
}
172173
channels = new Channel[numCores];
173174
channelIndex = new AtomicRoundRobin(channels.length);
174175
for (int i = 0; i < numCores; i++) {
@@ -193,7 +194,7 @@ private Publisher(Builder builder) throws IOException {
193194
messagesWaiter = new MessagesWaiter();
194195
}
195196

196-
/** Topic to which the publisher publishes to. */
197+
/** Topic which the publisher publishes to. */
197198
public String getTopic() {
198199
return topic;
199200
}
@@ -474,7 +475,7 @@ public boolean failOnFlowControlLimits() {
474475
* should be invoked prior to deleting the {@link Publisher} object in order to ensure that no
475476
* pending messages are lost.
476477
*/
477-
public void shutdown() {
478+
public void shutdown() throws Exception {
478479
if (shutdown.getAndSet(true)) {
479480
throw new IllegalStateException("Cannot shut down a publisher already shut-down.");
480481
}
@@ -483,6 +484,9 @@ public void shutdown() {
483484
}
484485
publishAllOutstanding();
485486
messagesWaiter.waitNoMessages();
487+
for (AutoCloseable closeable : closeables) {
488+
closeable.close();
489+
}
486490
}
487491

488492
private boolean hasBundlingBytes() {
@@ -544,6 +548,12 @@ public static final class Builder {
544548
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
545549
.build();
546550

551+
private static final int THREADS_PER_CPU = 5;
552+
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
553+
InstantiatingExecutorProvider.newBuilder()
554+
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
555+
.build();
556+
547557
String topic;
548558

549559
// Bundling options
@@ -560,7 +570,7 @@ public static final class Builder {
560570
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
561571
Optional.absent();
562572

563-
Optional<ScheduledExecutorService> executor = Optional.absent();
573+
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
564574

565575
/** Constructs a new {@link Builder} using the given topic. */
566576
public static Builder newBuilder(TopicName topic) {
@@ -650,8 +660,8 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
650660
}
651661

652662
/** Gives the ability to set a custom executor to be used by the library. */
653-
public Builder setExecutor(ScheduledExecutorService executor) {
654-
this.executor = Optional.of(Preconditions.checkNotNull(executor));
663+
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
664+
this.executorProvider = Preconditions.checkNotNull(executorProvider);
655665
return this;
656666
}
657667

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import com.google.api.gax.bundling.FlowController;
20+
import com.google.api.gax.grpc.ExecutorProvider;
21+
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2022
import com.google.api.stats.Distribution;
2123
import com.google.auth.Credentials;
2224
import com.google.auth.oauth2.GoogleCredentials;
@@ -28,7 +30,6 @@
2830
import com.google.common.primitives.Ints;
2931
import com.google.common.util.concurrent.AbstractService;
3032
import com.google.common.util.concurrent.Service;
31-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3233
import com.google.pubsub.v1.SubscriptionName;
3334
import io.grpc.ManagedChannelBuilder;
3435
import io.grpc.Status;
@@ -40,7 +41,6 @@
4041
import java.util.ArrayList;
4142
import java.util.List;
4243
import java.util.concurrent.CountDownLatch;
43-
import java.util.concurrent.Executors;
4444
import java.util.concurrent.ScheduledExecutorService;
4545
import java.util.concurrent.ScheduledFuture;
4646
import java.util.concurrent.TimeUnit;
@@ -131,6 +131,7 @@ public class Subscriber extends AbstractService {
131131
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
132132
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
133133
private final Clock clock;
134+
private final List<AutoCloseable> closeables = new ArrayList<>();
134135
private ScheduledFuture<?> ackDeadlineUpdater;
135136
private int streamAckDeadlineSeconds;
136137

@@ -147,16 +148,16 @@ private Subscriber(Builder builder) throws IOException {
147148

148149
flowController = new FlowController(builder.flowControlSettings, false);
149150

150-
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
151-
executor =
152-
builder.executor.isPresent()
153-
? builder.executor.get()
154-
: Executors.newScheduledThreadPool(
155-
numChannels * THREADS_PER_CHANNEL,
156-
new ThreadFactoryBuilder()
157-
.setDaemon(true)
158-
.setNameFormat("cloud-pubsub-subscriber-thread-%d")
159-
.build());
151+
executor = builder.executorProvider.getExecutor();
152+
if (builder.executorProvider.shouldAutoClose()) {
153+
closeables.add(
154+
new AutoCloseable() {
155+
@Override
156+
public void close() throws IOException {
157+
executor.shutdown();
158+
}
159+
});
160+
}
160161

161162
channelBuilder =
162163
builder.channelBuilder.isPresent()
@@ -176,6 +177,7 @@ private Subscriber(Builder builder) throws IOException {
176177
: GoogleCredentials.getApplicationDefault()
177178
.createScoped(SubscriberSettings.getDefaultServiceScopes());
178179

180+
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
179181
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
180182
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
181183
}
@@ -191,7 +193,14 @@ protected void doStart() {
191193
protected void doStop() {
192194
stopAllStreamingConnections();
193195
stopAllPollingConnections();
194-
notifyStopped();
196+
try {
197+
for (AutoCloseable closeable : closeables) {
198+
closeable.close();
199+
}
200+
notifyStopped();
201+
} catch (Exception e) {
202+
notifyFailed(e);
203+
}
195204
}
196205

197206
private void startStreamingConnections() {
@@ -356,14 +365,17 @@ public void run() {
356365
}
357366
}
358367

368+
/** Subscription which the subscriber is subscribed to. */
359369
public String getSubscription() {
360370
return subscription;
361371
}
362372

373+
/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
363374
public Duration getAckExpirationPadding() {
364375
return ackExpirationPadding;
365376
}
366377

378+
/** The flow control settings the Subscriber is configured with. */
367379
public FlowController.Settings getFlowControlSettings() {
368380
return flowControlSettings;
369381
}
@@ -374,6 +386,14 @@ public static final class Builder {
374386
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
375387
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
376388

389+
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
390+
InstantiatingExecutorProvider.newBuilder()
391+
.setExecutorThreadCount(
392+
THREADS_PER_CHANNEL
393+
* CHANNELS_PER_CORE
394+
* Runtime.getRuntime().availableProcessors())
395+
.build();
396+
377397
String subscription;
378398
Optional<Credentials> credentials = Optional.absent();
379399
MessageReceiver receiver;
@@ -382,7 +402,7 @@ public static final class Builder {
382402

383403
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
384404

385-
Optional<ScheduledExecutorService> executor = Optional.absent();
405+
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
386406
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
387407
Optional.absent();
388408
Optional<Clock> clock = Optional.absent();
@@ -454,8 +474,8 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
454474
}
455475

456476
/** Gives the ability to set a custom executor. */
457-
public Builder setExecutor(ScheduledExecutorService executor) {
458-
this.executor = Optional.of(executor);
477+
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
478+
this.executorProvider = Preconditions.checkNotNull(executorProvider);
459479
return this;
460480
}
461481

0 commit comments

Comments
 (0)