1919import com .google .api .gax .bundling .FlowController ;
2020import com .google .api .gax .core .RetrySettings ;
2121import com .google .api .gax .grpc .BundlingSettings ;
22+ import com .google .api .gax .grpc .ExecutorProvider ;
23+ import com .google .api .gax .grpc .InstantiatingExecutorProvider ;
2224import com .google .auth .Credentials ;
2325import com .google .auth .oauth2 .GoogleCredentials ;
2426import com .google .common .base .Optional ;
2830import com .google .common .util .concurrent .Futures ;
2931import com .google .common .util .concurrent .ListenableFuture ;
3032import com .google .common .util .concurrent .SettableFuture ;
31- import com .google .common .util .concurrent .ThreadFactoryBuilder ;
3233import com .google .pubsub .v1 .PublishRequest ;
3334import com .google .pubsub .v1 .PublishResponse ;
3435import com .google .pubsub .v1 .PublisherGrpc ;
4344import io .grpc .netty .NegotiationType ;
4445import io .grpc .netty .NettyChannelBuilder ;
4546import java .io .IOException ;
47+ import java .util .ArrayList ;
4648import java .util .Iterator ;
4749import java .util .LinkedList ;
4850import java .util .List ;
49- import java .util .concurrent .Executors ;
5051import java .util .concurrent .ScheduledExecutorService ;
5152import java .util .concurrent .ScheduledFuture ;
5253import java .util .concurrent .ThreadLocalRandom ;
109110 */
110111public class Publisher {
111112 /** The maximum number of messages in one request. Defined by the API. */
112- public static long getApiMaxBundleMessages () {
113+ public static long getApiMaxRequestElementCount () {
113114 return 1000L ;
114115 }
115116
116117 /** The maximum size of one request. Defined by the API. */
117- public static long getApiMaxBundleBytes () {
118+ public static long getApiMaxRequestBytes () {
118119 return 10L * 1000L * 1000L ; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
119120 }
120121
121- private static final int DEFAULT_MIN_THREAD_POOL_SIZE = 5 ;
122-
123122 private static final Logger logger = LoggerFactory .getLogger (Publisher .class );
124123
125124 private final String topic ;
@@ -143,6 +142,7 @@ public static long getApiMaxBundleBytes() {
143142
144143 private final ScheduledExecutorService executor ;
145144 private final AtomicBoolean shutdown ;
145+ private final List <AutoCloseable > closeables = new ArrayList <>();
146146 private final MessagesWaiter messagesWaiter ;
147147 private ScheduledFuture <?> currentAlarmFuture ;
148148
@@ -160,15 +160,16 @@ private Publisher(Builder builder) throws IOException {
160160 messagesBundleLock = new ReentrantLock ();
161161 activeAlarm = new AtomicBoolean (false );
162162 int numCores = Math .max (1 , Runtime .getRuntime ().availableProcessors ());
163- executor =
164- builder .executor .isPresent ()
165- ? builder .executor .get ()
166- : Executors .newScheduledThreadPool (
167- numCores * DEFAULT_MIN_THREAD_POOL_SIZE ,
168- new ThreadFactoryBuilder ()
169- .setDaemon (true )
170- .setNameFormat ("cloud-pubsub-publisher-thread-%d" )
171- .build ());
163+ executor = builder .executorProvider .getExecutor ();
164+ if (builder .executorProvider .shouldAutoClose ()) {
165+ closeables .add (
166+ new AutoCloseable () {
167+ @ Override
168+ public void close () throws IOException {
169+ executor .shutdown ();
170+ }
171+ });
172+ }
172173 channels = new Channel [numCores ];
173174 channelIndex = new AtomicRoundRobin (channels .length );
174175 for (int i = 0 ; i < numCores ; i ++) {
@@ -193,7 +194,7 @@ private Publisher(Builder builder) throws IOException {
193194 messagesWaiter = new MessagesWaiter ();
194195 }
195196
196- /** Topic to which the publisher publishes to. */
197+ /** Topic which the publisher publishes to. */
197198 public String getTopic () {
198199 return topic ;
199200 }
@@ -474,7 +475,7 @@ public boolean failOnFlowControlLimits() {
474475 * should be invoked prior to deleting the {@link Publisher} object in order to ensure that no
475476 * pending messages are lost.
476477 */
477- public void shutdown () {
478+ public void shutdown () throws Exception {
478479 if (shutdown .getAndSet (true )) {
479480 throw new IllegalStateException ("Cannot shut down a publisher already shut-down." );
480481 }
@@ -483,6 +484,9 @@ public void shutdown() {
483484 }
484485 publishAllOutstanding ();
485486 messagesWaiter .waitNoMessages ();
487+ for (AutoCloseable closeable : closeables ) {
488+ closeable .close ();
489+ }
486490 }
487491
488492 private boolean hasBundlingBytes () {
@@ -544,6 +548,12 @@ public static final class Builder {
544548 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
545549 .build ();
546550
551+ private static final int THREADS_PER_CPU = 5 ;
552+ static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
553+ InstantiatingExecutorProvider .newBuilder ()
554+ .setExecutorThreadCount (THREADS_PER_CPU * Runtime .getRuntime ().availableProcessors ())
555+ .build ();
556+
547557 String topic ;
548558
549559 // Bundling options
@@ -560,7 +570,7 @@ public static final class Builder {
560570 Optional <ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>> channelBuilder =
561571 Optional .absent ();
562572
563- Optional < ScheduledExecutorService > executor = Optional . absent () ;
573+ ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER ;
564574
565575 /** Constructs a new {@link Builder} using the given topic. */
566576 public static Builder newBuilder (TopicName topic ) {
@@ -650,8 +660,8 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
650660 }
651661
652662 /** Gives the ability to set a custom executor to be used by the library. */
653- public Builder setExecutor ( ScheduledExecutorService executor ) {
654- this .executor = Optional . of ( Preconditions .checkNotNull (executor ) );
663+ public Builder setExecutorProvider ( ExecutorProvider executorProvider ) {
664+ this .executorProvider = Preconditions .checkNotNull (executorProvider );
655665 return this ;
656666 }
657667
0 commit comments