3232import com .google .common .util .concurrent .Service ;
3333import com .google .pubsub .v1 .SubscriptionName ;
3434import io .grpc .ManagedChannelBuilder ;
35- import java .util .concurrent .Executor ;
36- import java .util .concurrent .TimeoutException ;
3735import io .grpc .Status ;
3836import io .grpc .StatusRuntimeException ;
3937import io .grpc .netty .GrpcSslContexts ;
4341import java .util .ArrayList ;
4442import java .util .List ;
4543import java .util .concurrent .CountDownLatch ;
44+ import java .util .concurrent .Executor ;
4645import java .util .concurrent .ScheduledExecutorService ;
4746import java .util .concurrent .ScheduledFuture ;
4847import java .util .concurrent .TimeUnit ;
48+ import java .util .concurrent .TimeoutException ;
4949import org .joda .time .Duration ;
5050import org .slf4j .Logger ;
5151import org .slf4j .LoggerFactory ;
8888 * }
8989 *
9090 * Subscriber subscriber =
91- * Subscriber.Builder. newBuilder(MY_SUBSCRIPTION, receiver)
91+ * Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
9292 * .setMaxBundleAcks(100)
9393 * .build();
9494 *
@@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
123123 impl = new SubscriberImpl (builder );
124124 }
125125
126+ /**
127+ * Constructs a new {@link Builder}.
128+ *
129+ * <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
130+ * Subscriber}.
131+ *
132+ * @param subscription Cloud Pub/Sub subscription to bind the subscriber to
133+ * @param receiver an implementation of {@link MessageReceiver} used to process the received
134+ * messages
135+ */
136+ public static Builder newBuilder (SubscriptionName subscription , MessageReceiver receiver ) {
137+ return new Builder (subscription , receiver );
138+ }
139+
126140 /** Subscription which the subscriber is subscribed to. */
127- public String getSubscription () {
128- return impl .getSubscription () ;
141+ public SubscriptionName getSubscriptionName () {
142+ return impl .subscriptionName ;
129143 }
130144
131145 /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
132146 public Duration getAckExpirationPadding () {
133- return impl .getAckExpirationPadding () ;
147+ return impl .ackExpirationPadding ;
134148 }
135149
136150 /** The flow control settings the Subscriber is configured with. */
137151 public FlowController .Settings getFlowControlSettings () {
138- return impl .getFlowControlSettings () ;
152+ return impl .flowControlSettings ;
139153 }
140154
141155 public void addListener (final SubscriberListener listener , Executor executor ) {
@@ -249,7 +263,8 @@ public void terminated(State from) {}
249263 private static class SubscriberImpl extends AbstractService {
250264 private static final Logger logger = LoggerFactory .getLogger (Subscriber .class );
251265
252- private final String subscription ;
266+ private final SubscriptionName subscriptionName ;
267+ private final String cachedSubscriptionNameString ;
253268 private final FlowController .Settings flowControlSettings ;
254269 private final Duration ackExpirationPadding ;
255270 private final ScheduledExecutorService executor ;
@@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService {
270285 private SubscriberImpl (Builder builder ) throws IOException {
271286 receiver = builder .receiver ;
272287 flowControlSettings = builder .flowControlSettings ;
273- subscription = builder .subscription ;
288+ subscriptionName = builder .subscriptionName ;
289+ cachedSubscriptionNameString = subscriptionName .toString ();
274290 ackExpirationPadding = builder .ackExpirationPadding ;
275291 streamAckDeadlineSeconds =
276292 Math .max (
@@ -340,7 +356,7 @@ private void startStreamingConnections() {
340356 for (int i = 0 ; i < numChannels ; i ++) {
341357 streamingSubscriberConnections .add (
342358 new StreamingSubscriberConnection (
343- subscription ,
359+ cachedSubscriptionNameString ,
344360 credentials ,
345361 receiver ,
346362 ackExpirationPadding ,
@@ -412,7 +428,7 @@ private void startPollingConnections() {
412428 for (int i = 0 ; i < numChannels ; i ++) {
413429 pollingSubscriberConnections .add (
414430 new PollingSubscriberConnection (
415- subscription ,
431+ cachedSubscriptionNameString ,
416432 credentials ,
417433 receiver ,
418434 ackExpirationPadding ,
@@ -496,21 +512,6 @@ public void run() {
496512 throw new IllegalStateException (e );
497513 }
498514 }
499-
500- /** Subscription which the subscriber is subscribed to. */
501- public String getSubscription () {
502- return subscription ;
503- }
504-
505- /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
506- public Duration getAckExpirationPadding () {
507- return ackExpirationPadding ;
508- }
509-
510- /** The flow control settings the Subscriber is configured with. */
511- public FlowController .Settings getFlowControlSettings () {
512- return flowControlSettings ;
513- }
514515 }
515516
516517 /** Builder of {@link Subscriber Subscribers}. */
@@ -526,7 +527,7 @@ public static final class Builder {
526527 * Runtime .getRuntime ().availableProcessors ())
527528 .build ();
528529
529- String subscription ;
530+ SubscriptionName subscriptionName ;
530531 Optional <Credentials > credentials = Optional .absent ();
531532 MessageReceiver receiver ;
532533
@@ -539,22 +540,8 @@ public static final class Builder {
539540 Optional .absent ();
540541 Optional <Clock > clock = Optional .absent ();
541542
542- /**
543- * Constructs a new {@link Builder}.
544- *
545- * <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
546- * Subscriber}.
547- *
548- * @param subscription Cloud Pub/Sub subscription to bind the subscriber to
549- * @param receiver an implementation of {@link MessageReceiver} used to process the received
550- * messages
551- */
552- public static Builder newBuilder (SubscriptionName subscription , MessageReceiver receiver ) {
553- return new Builder (subscription .toString (), receiver );
554- }
555-
556- Builder (String subscription , MessageReceiver receiver ) {
557- this .subscription = subscription ;
543+ Builder (SubscriptionName subscriptionName , MessageReceiver receiver ) {
544+ this .subscriptionName = subscriptionName ;
558545 this .receiver = receiver ;
559546 }
560547
0 commit comments