1919import com .google .api .gax .core .RetrySettings ;
2020import com .google .api .gax .grpc .ApiCallSettings ;
2121import com .google .api .gax .grpc .ApiException ;
22+ import com .google .auth .oauth2 .GoogleCredentials ;
23+ import com .google .cloud .AuthCredentials ;
2224import com .google .cloud .RetryParams ;
2325import com .google .cloud .pubsub .PubSubException ;
2426import com .google .cloud .pubsub .PubSubOptions ;
2729import com .google .cloud .pubsub .spi .v1 .SubscriberApi ;
2830import com .google .cloud .pubsub .spi .v1 .SubscriberSettings ;
2931import com .google .common .base .Function ;
32+ import com .google .common .collect .Sets ;
3033import com .google .common .util .concurrent .Futures ;
3134import com .google .common .util .concurrent .ListenableFuture ;
35+ import com .google .common .util .concurrent .MoreExecutors ;
3236import com .google .protobuf .Empty ;
3337import com .google .pubsub .v1 .AcknowledgeRequest ;
3438import com .google .pubsub .v1 .DeleteSubscriptionRequest ;
5054import com .google .pubsub .v1 .Subscription ;
5155import com .google .pubsub .v1 .Topic ;
5256
57+ import io .grpc .ManagedChannel ;
58+ import io .grpc .Status .Code ;
59+ import io .grpc .netty .NegotiationType ;
60+ import io .grpc .netty .NettyChannelBuilder ;
61+
5362import org .joda .time .Duration ;
5463
5564import java .io .IOException ;
5665import java .util .Set ;
5766import java .util .concurrent .Future ;
58-
59- import autovalue .shaded .com .google .common .common .collect .Sets ;
60- import io .grpc .Status .Code ;
67+ import java .util .concurrent .ScheduledExecutorService ;
68+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
6169
6270public class DefaultPubSubRpc implements PubSubRpc {
6371
6472 private final PublisherApi publisherApi ;
6573 private final SubscriberApi subscriberApi ;
74+ private final ScheduledExecutorService executor =
75+ MoreExecutors .getExitingScheduledExecutorService (new ScheduledThreadPoolExecutor (8 ));
6676
6777 public DefaultPubSubRpc (PubSubOptions options ) throws IOException {
6878 try {
69- // Provide (and use a common thread-pool).
70- // This depends on https://github.com/googleapis/gax-java/issues/73
71- PublisherSettings .Builder pbuilder =
72- PublisherSettings .defaultBuilder ()
73- .provideChannelWith (options .authCredentials ().credentials ())
74- .applyToAllApiMethods (apiCallSettings (options ));
75- publisherApi = PublisherApi .create (pbuilder .build ());
76- SubscriberSettings .Builder sBuilder =
77- SubscriberSettings .defaultBuilder ()
78- .provideChannelWith (options .authCredentials ().credentials ())
79- .applyToAllApiMethods (apiCallSettings (options ));
80- subscriberApi = SubscriberApi .create (sBuilder .build ());
79+ PublisherSettings .Builder pubBuilder =
80+ PublisherSettings .defaultBuilder ().provideExecutorWith (executor , false );
81+ SubscriberSettings .Builder subBuilder =
82+ SubscriberSettings .defaultBuilder ().provideExecutorWith (executor , false );
83+ // todo(mziccard): PublisherSettings should support null/absent credentials for testing
84+ if (options .host ().contains ("localhost" )
85+ || options .authCredentials ().equals (AuthCredentials .noAuth ())) {
86+ ManagedChannel channel = NettyChannelBuilder .forTarget (options .host ())
87+ .negotiationType (NegotiationType .PLAINTEXT )
88+ .build ();
89+ pubBuilder .provideChannelWith (channel , true );
90+ subBuilder .provideChannelWith (channel , true );
91+ } else {
92+ GoogleCredentials credentials = options .authCredentials ().credentials ();
93+ pubBuilder .provideChannelWith (
94+ credentials .createScoped (PublisherSettings .DEFAULT_SERVICE_SCOPES ));
95+ subBuilder .provideChannelWith (
96+ credentials .createScoped (SubscriberSettings .DEFAULT_SERVICE_SCOPES ));
97+ }
98+ pubBuilder .applyToAllApiMethods (apiCallSettings (options ));
99+ subBuilder .applyToAllApiMethods (apiCallSettings (options ));
100+ publisherApi = PublisherApi .create (pubBuilder .build ());
101+ subscriberApi = SubscriberApi .create (subBuilder .build ());
81102 } catch (Exception ex ) {
82103 throw new IOException (ex );
83104 }
84105 }
85106
107+ private static long translateTimeout (long timeout ) {
108+ if (timeout < 0 ) {
109+ return 20000 ;
110+ } else if (timeout == 0 ) {
111+ return Long .MAX_VALUE ;
112+ }
113+ return timeout ;
114+ }
115+
86116 private static ApiCallSettings .Builder apiCallSettings (PubSubOptions options ) {
87117 // TODO: specify timeout these settings:
88118 // retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
89119 RetryParams retryParams = options .retryParams ();
120+ long connectTimeout = translateTimeout (options .connectTimeout ());
121+ long readTimeout = translateTimeout (options .readTimeout ());
122+ long maxTimeout = connectTimeout == Long .MAX_VALUE || readTimeout == Long .MAX_VALUE
123+ ? Long .MAX_VALUE : connectTimeout + readTimeout ;
90124 final RetrySettings .Builder builder = RetrySettings .newBuilder ()
91125 .setTotalTimeout (Duration .millis (retryParams .totalRetryPeriodMillis ()))
92- .setInitialRpcTimeout (Duration .millis (options . connectTimeout () ))
126+ .setInitialRpcTimeout (Duration .millis (connectTimeout ))
93127 .setRpcTimeoutMultiplier (1.5 )
94- .setMaxRpcTimeout (Duration .millis (options . connectTimeout () + options . readTimeout () ))
128+ .setMaxRpcTimeout (Duration .millis (maxTimeout ))
95129 .setInitialRetryDelay (Duration .millis (retryParams .initialRetryDelayMillis ()))
96130 .setRetryDelayMultiplier (retryParams .retryDelayBackoffFactor ())
97131 .setMaxRetryDelay (Duration .millis (retryParams .maxRetryDelayMillis ()));
@@ -117,7 +151,7 @@ public V apply(ApiException exception) {
117151
118152 @ Override
119153 public Future <Topic > create (Topic topic ) {
120- // TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings
154+ // TODO: it would be nice if we can get the idempotent information from the ApiCallSettings
121155 // or from the exception
122156 return translate (publisherApi .createTopicCallable ().futureCall (topic ), true );
123157 }
@@ -149,7 +183,6 @@ public Future<ListTopicSubscriptionsResponse> list(ListTopicSubscriptionsRequest
149183
150184 @ Override
151185 public Future <Empty > delete (DeleteTopicRequest request ) {
152- // TODO: check if null is not going to work for Empty
153186 return translate (publisherApi .deleteTopicCallable ().futureCall (request ), true ,
154187 Code .NOT_FOUND .value ());
155188 }
@@ -195,4 +228,16 @@ public Future<PullResponse> pull(PullRequest request) {
195228 public Future <Empty > modify (ModifyPushConfigRequest request ) {
196229 return translate (subscriberApi .modifyPushConfigCallable ().futureCall (request ), false );
197230 }
231+
232+ @ Override
233+ public ScheduledExecutorService executor () {
234+ return executor ;
235+ }
236+
237+ @ Override
238+ public void close () throws Exception {
239+ subscriberApi .close ();
240+ publisherApi .close ();
241+ executor .shutdown ();
242+ }
198243}
0 commit comments