Skip to content

Commit 27899aa

Browse files
authored
---
yaml --- r: 9109 b: refs/heads/master c: bc07a35 h: refs/heads/master i: 9107: fa59981
1 parent 8760a39 commit 27899aa

4 files changed

Lines changed: 68 additions & 37 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 848245e10efcb6a39b0de435c621e2ccb4e8a83d
2+
refs/heads/master: bc07a352e1976d29ebe083c15cce6f227b4e0e50
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 8e9b065ba06cd7a4af306aaea1010aade81670e0
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,17 @@
4040
import com.google.common.collect.ImmutableMap;
4141
import com.google.common.util.concurrent.FutureCallback;
4242
import com.google.common.util.concurrent.Futures;
43-
import com.google.pubsub.v1.ProjectTopicName;
4443
import com.google.pubsub.v1.PublishRequest;
4544
import com.google.pubsub.v1.PublishResponse;
4645
import com.google.pubsub.v1.PublisherGrpc;
4746
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
4847
import com.google.pubsub.v1.PubsubMessage;
48+
import com.google.pubsub.v1.TopicName;
49+
import com.google.pubsub.v1.TopicNames;
4950
import io.grpc.CallCredentials;
5051
import io.grpc.Channel;
5152
import io.grpc.Status;
5253
import io.grpc.auth.MoreCallCredentials;
53-
import org.threeten.bp.Duration;
54-
55-
import javax.annotation.Nullable;
5654
import java.io.IOException;
5755
import java.util.ArrayList;
5856
import java.util.Iterator;
@@ -68,6 +66,8 @@
6866
import java.util.concurrent.locks.ReentrantLock;
6967
import java.util.logging.Level;
7068
import java.util.logging.Logger;
69+
import javax.annotation.Nullable;
70+
import org.threeten.bp.Duration;
7171

7272
/**
7373
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
@@ -90,8 +90,7 @@
9090
public class Publisher {
9191
private static final Logger logger = Logger.getLogger(Publisher.class.getName());
9292

93-
private final ProjectTopicName topicName;
94-
private final String cachedTopicNameString;
93+
private final String topicName;
9594

9695
private final BatchingSettings batchingSettings;
9796
private final RetrySettings retrySettings;
@@ -124,7 +123,6 @@ public static long getApiMaxRequestBytes() {
124123

125124
private Publisher(Builder builder) throws IOException {
126125
topicName = builder.topicName;
127-
cachedTopicNameString = topicName.toString();
128126

129127
this.batchingSettings = builder.batchingSettings;
130128
this.retrySettings = builder.retrySettings;
@@ -167,7 +165,12 @@ private Publisher(Builder builder) throws IOException {
167165
}
168166

169167
/** Topic which the publisher publishes to. */
170-
public ProjectTopicName getTopicName() {
168+
public TopicName getTopicName() {
169+
return TopicNames.parse(topicName);
170+
}
171+
172+
/** Topic which the publisher publishes to. */
173+
public String getTopicNameString() {
171174
return topicName;
172175
}
173176

@@ -312,7 +315,7 @@ private void publishAllOutstanding() {
312315

313316
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
314317
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
315-
publishRequest.setTopic(cachedTopicNameString);
318+
publishRequest.setTopic(topicName);
316319
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
317320
publishRequest.addMessages(outstandingPublish.message);
318321
}
@@ -497,6 +500,7 @@ interface LongRandom {
497500
* Constructs a new {@link Builder} using the given topic.
498501
*
499502
* <p>Example of creating a {@code Publisher}.
503+
*
500504
* <pre>{@code
501505
* String projectName = "my_project";
502506
* String topicName = "my_topic";
@@ -509,9 +513,28 @@ interface LongRandom {
509513
* publisher.shutdown();
510514
* }
511515
* }</pre>
516+
*/
517+
public static Builder newBuilder(TopicName topicName) {
518+
return newBuilder(topicName.toString());
519+
}
520+
521+
/**
522+
* Constructs a new {@link Builder} using the given topic.
523+
*
524+
* <p>Example of creating a {@code Publisher}.
512525
*
526+
* <pre>{@code
527+
* String topic = "projects/my_project/topics/my_topic";
528+
* Publisher publisher = Publisher.newBuilder(topic).build();
529+
* try {
530+
* // ...
531+
* } finally {
532+
* // When finished with the publisher, make sure to shutdown to free up resources.
533+
* publisher.shutdown();
534+
* }
535+
* }</pre>
513536
*/
514-
public static Builder newBuilder(ProjectTopicName topicName) {
537+
public static Builder newBuilder(String topicName) {
515538
return new Builder(topicName);
516539
}
517540

@@ -556,7 +579,7 @@ public long nextLong(long least, long bound) {
556579
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
557580
.build();
558581

559-
ProjectTopicName topicName;
582+
String topicName;
560583

561584
// Batching options
562585
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
@@ -574,7 +597,7 @@ public long nextLong(long least, long bound) {
574597
CredentialsProvider credentialsProvider =
575598
TopicAdminSettings.defaultCredentialsProviderBuilder().build();
576599

577-
private Builder(ProjectTopicName topic) {
600+
private Builder(String topic) {
578601
this.topicName = Preconditions.checkNotNull(topic);
579602
}
580603

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ public class Subscriber extends AbstractApiService {
107107

108108
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
109109

110-
private final ProjectSubscriptionName subscriptionName;
111-
private final String cachedSubscriptionNameString;
110+
private final String subscriptionName;
112111
private final FlowControlSettings flowControlSettings;
113112
private final Duration ackExpirationPadding;
114113
private final Duration maxAckExtensionPeriod;
@@ -135,7 +134,6 @@ private Subscriber(Builder builder) {
135134
receiver = builder.receiver;
136135
flowControlSettings = builder.flowControlSettings;
137136
subscriptionName = builder.subscriptionName;
138-
cachedSubscriptionNameString = subscriptionName.toString();
139137

140138
Preconditions.checkArgument(
141139
builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive");
@@ -204,19 +202,32 @@ public void close() throws IOException {
204202
/**
205203
* Constructs a new {@link Builder}.
206204
*
207-
* <p>Once {@link Builder#build} is called a gRPC stub will be created for use of the {@link
208-
* Subscriber}.
209-
*
210205
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
211206
* @param receiver an implementation of {@link MessageReceiver} used to process the received
212207
* messages
213208
*/
214209
public static Builder newBuilder(ProjectSubscriptionName subscription, MessageReceiver receiver) {
210+
return newBuilder(subscription.toString(), receiver);
211+
}
212+
213+
/**
214+
* Constructs a new {@link Builder}.
215+
*
216+
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
217+
* @param receiver an implementation of {@link MessageReceiver} used to process the received
218+
* messages
219+
*/
220+
public static Builder newBuilder(String subscription, MessageReceiver receiver) {
215221
return new Builder(subscription, receiver);
216222
}
217223

218224
/** Subscription which the subscriber is subscribed to. */
219225
public ProjectSubscriptionName getSubscriptionName() {
226+
return ProjectSubscriptionName.parse(subscriptionName);
227+
}
228+
229+
/** Subscription which the subscriber is subscribed to. */
230+
public String getSubscriptionNameString() {
220231
return subscriptionName;
221232
}
222233

@@ -343,9 +354,7 @@ private void startPollingConnections() throws IOException {
343354
}
344355
Subscription subscriptionInfo =
345356
getSubStub.getSubscription(
346-
GetSubscriptionRequest.newBuilder()
347-
.setSubscription(cachedSubscriptionNameString)
348-
.build());
357+
GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build());
349358

350359
for (Channel channel : channels) {
351360
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel);
@@ -401,7 +410,7 @@ private void startStreamingConnections() throws IOException {
401410
}
402411
streamingSubscriberConnections.add(
403412
new StreamingSubscriberConnection(
404-
cachedSubscriptionNameString,
413+
subscriptionName,
405414
receiver,
406415
ackExpirationPadding,
407416
maxAckExtensionPeriod,
@@ -491,7 +500,7 @@ public static final class Builder {
491500
* Runtime.getRuntime().availableProcessors())
492501
.build();
493502

494-
ProjectSubscriptionName subscriptionName;
503+
String subscriptionName;
495504
MessageReceiver receiver;
496505

497506
Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
@@ -519,7 +528,7 @@ public static final class Builder {
519528
boolean useStreaming = true;
520529
int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE;
521530

522-
Builder(ProjectSubscriptionName subscriptionName, MessageReceiver receiver) {
531+
Builder(String subscriptionName, MessageReceiver receiver) {
523532
this.subscriptionName = subscriptionName;
524533
this.receiver = receiver;
525534
}

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
package com.google.cloud.pubsub.v1;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertSame;
22+
import static org.junit.Assert.assertTrue;
23+
import static org.junit.Assert.fail;
24+
1925
import com.google.api.core.ApiFuture;
2026
import com.google.api.gax.batching.BatchingSettings;
2127
import com.google.api.gax.core.ExecutorProvider;
@@ -35,24 +41,17 @@
3541
import io.grpc.StatusException;
3642
import io.grpc.inprocess.InProcessChannelBuilder;
3743
import io.grpc.inprocess.InProcessServerBuilder;
44+
import java.io.IOException;
45+
import java.util.Map;
46+
import java.util.concurrent.ExecutionException;
47+
import java.util.concurrent.ScheduledExecutorService;
3848
import org.junit.After;
3949
import org.junit.Before;
4050
import org.junit.Test;
4151
import org.junit.runner.RunWith;
4252
import org.junit.runners.JUnit4;
4353
import org.threeten.bp.Duration;
4454

45-
import java.io.IOException;
46-
import java.util.Map;
47-
import java.util.concurrent.ExecutionException;
48-
import java.util.concurrent.ScheduledExecutorService;
49-
50-
import static org.junit.Assert.assertEquals;
51-
import static org.junit.Assert.assertFalse;
52-
import static org.junit.Assert.assertSame;
53-
import static org.junit.Assert.assertTrue;
54-
import static org.junit.Assert.fail;
55-
5655
@RunWith(JUnit4.class)
5756
public class PublisherImplTest {
5857

@@ -472,7 +471,7 @@ public void testPublisherGetters() throws Exception {
472471
@Test
473472
public void testBuilderParametersAndDefaults() {
474473
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
475-
assertEquals(TEST_TOPIC, builder.topicName);
474+
assertEquals(TEST_TOPIC.toString(), builder.topicName);
476475
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
477476
assertEquals(
478477
Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,

0 commit comments

Comments
 (0)