1919import static com .google .common .base .MoreObjects .firstNonNull ;
2020
2121import com .google .api .gax .core .ConnectionSettings ;
22- import com .google .api .gax .core .RetrySettings ;
2322import com .google .api .gax .grpc .ApiCallSettings ;
2423import com .google .api .gax .grpc .ApiException ;
25- import com .google .auth .oauth2 .GoogleCredentials ;
2624import com .google .cloud .AuthCredentials ;
2725import com .google .cloud .GrpcServiceOptions .ExecutorFactory ;
28- import com .google .cloud .RetryParams ;
2926import com .google .cloud .pubsub .PubSubException ;
3027import com .google .cloud .pubsub .PubSubOptions ;
3128import com .google .cloud .pubsub .spi .v1 .PublisherApi ;
6461import io .grpc .netty .NegotiationType ;
6562import io .grpc .netty .NettyChannelBuilder ;
6663
67- import org .joda .time .Duration ;
68-
6964import java .io .IOException ;
7065import java .util .Set ;
7166import java .util .concurrent .Future ;
@@ -92,6 +87,16 @@ private InternalPubSubOptions(PubSubOptions options) {
9287 protected ExecutorFactory <ScheduledExecutorService > executorFactory () {
9388 return super .executorFactory ();
9489 }
90+
91+ @ Override
92+ protected ApiCallSettings .Builder apiCallSettings () {
93+ return super .apiCallSettings ();
94+ }
95+
96+ @ Override
97+ protected ConnectionSettings .Builder connectionSettings () {
98+ return super .connectionSettings ();
99+ }
95100 }
96101
97102 private static final class PullFutureImpl
@@ -119,7 +124,8 @@ public void onFailure(Throwable error) {
119124 }
120125
121126 public DefaultPubSubRpc (PubSubOptions options ) throws IOException {
122- executorFactory = new InternalPubSubOptions (options ).executorFactory ();
127+ InternalPubSubOptions internalOptions = new InternalPubSubOptions (options );
128+ executorFactory = internalOptions .executorFactory ();
123129 executor = executorFactory .get ();
124130 String libraryName = options .libraryName ();
125131 String libraryVersion = firstNonNull (options .libraryVersion (), "" );
@@ -139,46 +145,20 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
139145 pubBuilder .provideChannelWith (channel , true );
140146 subBuilder .provideChannelWith (channel , true );
141147 } else {
142- GoogleCredentials credentials = options .authCredentials ().credentials ();
143- ConnectionSettings pubConnectionSettings = ConnectionSettings .newBuilder ()
144- .setServiceAddress (options .host ())
145- .setPort (PublisherSettings .getDefaultServicePort ())
146- .provideCredentialsWith (
147- credentials .createScoped (PublisherSettings .getDefaultServiceScopes ()))
148- .build ();
149- ConnectionSettings subConnectionSettings = ConnectionSettings .newBuilder ()
150- .setServiceAddress (options .host ())
151- .setPort (SubscriberSettings .getDefaultServicePort ())
152- .provideCredentialsWith (
153- credentials .createScoped (SubscriberSettings .getDefaultServiceScopes ()))
154- .build ();
155- pubBuilder .provideChannelWith (pubConnectionSettings );
156- subBuilder .provideChannelWith (subConnectionSettings );
148+ ConnectionSettings connectionSettings = internalOptions .connectionSettings ().build ();
149+ pubBuilder .provideChannelWith (connectionSettings );
150+ subBuilder .provideChannelWith (connectionSettings );
157151 }
158- pubBuilder .applyToAllApiMethods (apiCallSettings (options ));
159- subBuilder .applyToAllApiMethods (apiCallSettings (options ));
152+ ApiCallSettings .Builder callSettingsBuilder = internalOptions .apiCallSettings ();
153+ pubBuilder .applyToAllApiMethods (callSettingsBuilder );
154+ subBuilder .applyToAllApiMethods (callSettingsBuilder );
160155 publisherApi = PublisherApi .create (pubBuilder .build ());
161156 subscriberApi = SubscriberApi .create (subBuilder .build ());
162157 } catch (Exception ex ) {
163158 throw new IOException (ex );
164159 }
165160 }
166161
167- private static ApiCallSettings .Builder apiCallSettings (PubSubOptions options ) {
168- // TODO: specify timeout these settings:
169- // retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
170- RetryParams retryParams = options .retryParams ();
171- final RetrySettings .Builder builder = RetrySettings .newBuilder ()
172- .setTotalTimeout (Duration .millis (retryParams .totalRetryPeriodMillis ()))
173- .setInitialRpcTimeout (Duration .millis (options .initialTimeout ()))
174- .setRpcTimeoutMultiplier (options .timeoutMultiplier ())
175- .setMaxRpcTimeout (Duration .millis (options .maxTimeout ()))
176- .setInitialRetryDelay (Duration .millis (retryParams .initialRetryDelayMillis ()))
177- .setRetryDelayMultiplier (retryParams .retryDelayBackoffFactor ())
178- .setMaxRetryDelay (Duration .millis (retryParams .maxRetryDelayMillis ()));
179- return ApiCallSettings .newBuilder ().setRetrySettingsBuilder (builder );
180- }
181-
182162 private static <V > ListenableFuture <V > translate (ListenableFuture <V > from ,
183163 final boolean idempotent , int ... returnNullOn ) {
184164 final Set <Integer > returnNullOnSet = Sets .newHashSetWithExpectedSize (returnNullOn .length );
0 commit comments