Skip to content

Commit 090326c

Browse files
committed
return Name objects instead of plain String
1 parent f91c809 commit 090326c

6 files changed

Lines changed: 50 additions & 63 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.util.concurrent.Futures;
2323
import com.google.common.util.concurrent.ListenableFuture;
2424
import com.google.common.util.concurrent.MoreExecutors;
25-
import com.google.common.util.concurrent.Service;
2625
import com.google.pubsub.v1.PubsubMessage;
2726
import com.google.pubsub.v1.PushConfig;
2827
import com.google.pubsub.v1.SubscriptionName;
@@ -52,7 +51,7 @@ public ListenableFuture<MessageReceiver.AckReply> receiveMessage(PubsubMessage m
5251
};
5352
Subscriber subscriber = null;
5453
try {
55-
subscriber = Subscriber.Builder.newBuilder(subscription, receiver).build();
54+
subscriber = Subscriber.newBuilder(subscription, receiver).build();
5655
subscriber.addListener(
5756
new Subscriber.SubscriberListener() {
5857
@Override

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static void main(String... args) throws Exception {
4040

4141
Publisher publisher = null;
4242
try {
43-
publisher = Publisher.Builder.newBuilder(topic).build();
43+
publisher = Publisher.newBuilder(topic).build();
4444
List<String> messages = Arrays.asList("first message", "second message");
4545
List<ListenableFuture<String>> messageIds = new ArrayList<>();
4646
for (String message : messages) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
*
8383
* <pre><code>
8484
* Publisher publisher =
85-
* Publisher.Builder.newBuilder(MY_TOPIC)
85+
* Publisher.newBuilder(MY_TOPIC)
8686
* .setMaxBundleDuration(new Duration(10 * 1000))
8787
* .build();
8888
* List&lt;ListenableFuture&lt;String&gt;&gt; results = new ArrayList&lt;&gt;();
@@ -121,6 +121,7 @@ public static long getApiMaxRequestBytes() {
121121

122122
private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
123123

124+
private final TopicName topicName;
124125
private final String topic;
125126

126127
private final BundlingSettings bundlingSettings;
@@ -147,7 +148,8 @@ public static long getApiMaxRequestBytes() {
147148
private ScheduledFuture<?> currentAlarmFuture;
148149

149150
private Publisher(Builder builder) throws IOException {
150-
topic = builder.topic;
151+
topicName = builder.topicName;
152+
topic = topicName.toString();
151153

152154
this.bundlingSettings = builder.bundlingSettings;
153155
this.retrySettings = builder.retrySettings;
@@ -195,8 +197,8 @@ public void close() throws IOException {
195197
}
196198

197199
/** Topic which the publisher publishes to. */
198-
public String getTopic() {
199-
return topic;
200+
public TopicName getTopicName() {
201+
return topicName;
200202
}
201203

202204
/**
@@ -520,6 +522,11 @@ private boolean isRetryable(Throwable t) {
520522
}
521523
}
522524

525+
/** Constructs a new {@link Builder} using the given topic. */
526+
public static Builder newBuilder(TopicName topicName) {
527+
return new Builder(topicName);
528+
}
529+
523530
/** A builder of {@link Publisher}s. */
524531
public static final class Builder {
525532
static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
@@ -554,7 +561,7 @@ public static final class Builder {
554561
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
555562
.build();
556563

557-
String topic;
564+
TopicName topicName;
558565

559566
// Bundling options
560567
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
@@ -572,13 +579,8 @@ public static final class Builder {
572579

573580
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
574581

575-
/** Constructs a new {@link Builder} using the given topic. */
576-
public static Builder newBuilder(TopicName topic) {
577-
return new Builder(topic.toString());
578-
}
579-
580-
Builder(String topic) {
581-
this.topic = Preconditions.checkNotNull(topic);
582+
private Builder(TopicName topic) {
583+
this.topicName = Preconditions.checkNotNull(topic);
582584
}
583585

584586
/**

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import com.google.common.util.concurrent.Service;
3333
import com.google.pubsub.v1.SubscriptionName;
3434
import io.grpc.ManagedChannelBuilder;
35-
import java.util.concurrent.Executor;
36-
import java.util.concurrent.TimeoutException;
3735
import io.grpc.Status;
3836
import io.grpc.StatusRuntimeException;
3937
import io.grpc.netty.GrpcSslContexts;
@@ -43,9 +41,11 @@
4341
import java.util.ArrayList;
4442
import java.util.List;
4543
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.Executor;
4645
import java.util.concurrent.ScheduledExecutorService;
4746
import java.util.concurrent.ScheduledFuture;
4847
import java.util.concurrent.TimeUnit;
48+
import java.util.concurrent.TimeoutException;
4949
import org.joda.time.Duration;
5050
import org.slf4j.Logger;
5151
import org.slf4j.LoggerFactory;
@@ -88,7 +88,7 @@
8888
* }
8989
*
9090
* Subscriber subscriber =
91-
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
91+
* Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
9292
* .setMaxBundleAcks(100)
9393
* .build();
9494
*
@@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
123123
impl = new SubscriberImpl(builder);
124124
}
125125

126+
/**
127+
* Constructs a new {@link Builder}.
128+
*
129+
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
130+
* Subscriber}.
131+
*
132+
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
133+
* @param receiver an implementation of {@link MessageReceiver} used to process the received
134+
* messages
135+
*/
136+
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
137+
return new Builder(subscription, receiver);
138+
}
139+
126140
/** Subscription which the subscriber is subscribed to. */
127-
public String getSubscription() {
128-
return impl.getSubscription();
141+
public SubscriptionName getSubscriptionName() {
142+
return impl.subscriptionName;
129143
}
130144

131145
/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
132146
public Duration getAckExpirationPadding() {
133-
return impl.getAckExpirationPadding();
147+
return impl.ackExpirationPadding;
134148
}
135149

136150
/** The flow control settings the Subscriber is configured with. */
137151
public FlowController.Settings getFlowControlSettings() {
138-
return impl.getFlowControlSettings();
152+
return impl.flowControlSettings;
139153
}
140154

141155
public void addListener(final SubscriberListener listener, Executor executor) {
@@ -249,6 +263,7 @@ public void terminated(State from) {}
249263
private static class SubscriberImpl extends AbstractService {
250264
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
251265

266+
private final SubscriptionName subscriptionName;
252267
private final String subscription;
253268
private final FlowController.Settings flowControlSettings;
254269
private final Duration ackExpirationPadding;
@@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService {
270285
private SubscriberImpl(Builder builder) throws IOException {
271286
receiver = builder.receiver;
272287
flowControlSettings = builder.flowControlSettings;
273-
subscription = builder.subscription;
288+
subscriptionName = builder.subscriptionName;
289+
subscription = subscriptionName.toString();
274290
ackExpirationPadding = builder.ackExpirationPadding;
275291
streamAckDeadlineSeconds =
276292
Math.max(
@@ -496,21 +512,6 @@ public void run() {
496512
throw new IllegalStateException(e);
497513
}
498514
}
499-
500-
/** Subscription which the subscriber is subscribed to. */
501-
public String getSubscription() {
502-
return subscription;
503-
}
504-
505-
/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
506-
public Duration getAckExpirationPadding() {
507-
return ackExpirationPadding;
508-
}
509-
510-
/** The flow control settings the Subscriber is configured with. */
511-
public FlowController.Settings getFlowControlSettings() {
512-
return flowControlSettings;
513-
}
514515
}
515516

516517
/** Builder of {@link Subscriber Subscribers}. */
@@ -526,7 +527,7 @@ public static final class Builder {
526527
* Runtime.getRuntime().availableProcessors())
527528
.build();
528529

529-
String subscription;
530+
SubscriptionName subscriptionName;
530531
Optional<Credentials> credentials = Optional.absent();
531532
MessageReceiver receiver;
532533

@@ -539,22 +540,8 @@ public static final class Builder {
539540
Optional.absent();
540541
Optional<Clock> clock = Optional.absent();
541542

542-
/**
543-
* Constructs a new {@link Builder}.
544-
*
545-
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
546-
* Subscriber}.
547-
*
548-
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
549-
* @param receiver an implementation of {@link MessageReceiver} used to process the received
550-
* messages
551-
*/
552-
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
553-
return new Builder(subscription.toString(), receiver);
554-
}
555-
556-
Builder(String subscription, MessageReceiver receiver) {
557-
this.subscription = subscription;
543+
Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
544+
this.subscriptionName = subscriptionName;
558545
this.receiver = receiver;
559546
}
560547

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
353353
public void testPublisherGetters() throws Exception {
354354
FakeCredentials credentials = new FakeCredentials();
355355

356-
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
356+
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
357357
builder.setChannelBuilder(testChannelBuilder);
358358
builder.setCredentials(credentials);
359359
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
@@ -371,7 +371,7 @@ public void testPublisherGetters() throws Exception {
371371
.build());
372372
Publisher publisher = builder.build();
373373

374-
assertEquals(TEST_TOPIC.toString(), publisher.getTopic());
374+
assertEquals(TEST_TOPIC, publisher.getTopicName());
375375
assertEquals(10, (long) publisher.getBundlingSettings().getRequestByteThreshold());
376376
assertEquals(new Duration(11), publisher.getBundlingSettings().getDelayThreshold());
377377
assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold());
@@ -385,8 +385,8 @@ public void testPublisherGetters() throws Exception {
385385

386386
@Test
387387
public void testBuilderParametersAndDefaults() {
388-
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
389-
assertEquals(TEST_TOPIC.toString(), builder.topic);
388+
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
389+
assertEquals(TEST_TOPIC, builder.topicName);
390390
assertEquals(Optional.absent(), builder.channelBuilder);
391391
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
392392
assertFalse(builder.failOnFlowControlLimits);
@@ -405,7 +405,7 @@ public void testBuilderParametersAndDefaults() {
405405

406406
@Test
407407
public void testBuilderInvalidArguments() {
408-
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
408+
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
409409

410410
try {
411411
builder.setChannelBuilder(null);
@@ -602,7 +602,7 @@ public void testBuilderInvalidArguments() {
602602
}
603603

604604
private Builder getTestPublisherBuilder() {
605-
return Publisher.Builder.newBuilder(TEST_TOPIC)
605+
return Publisher.newBuilder(TEST_TOPIC)
606606
.setCredentials(testCredentials)
607607
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
608608
.setChannelBuilder(testChannelBuilder);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.common.base.Preconditions;
3030
import com.google.common.collect.ImmutableList;
3131
import com.google.common.util.concurrent.ListenableFuture;
32-
import com.google.common.util.concurrent.Service.State;
3332
import com.google.common.util.concurrent.SettableFuture;
3433
import com.google.pubsub.v1.PubsubMessage;
3534
import com.google.pubsub.v1.PullResponse;
@@ -467,7 +466,7 @@ private void sendMessages(Iterable<String> ackIds) throws InterruptedException {
467466
}
468467

469468
private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
470-
return Subscriber.Builder.newBuilder(TEST_SUBSCRIPTION, receiver)
469+
return Subscriber.newBuilder(TEST_SUBSCRIPTION, receiver)
471470
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
472471
.setCredentials(testCredentials)
473472
.setChannelBuilder(testChannelBuilder)

0 commit comments

Comments
 (0)