Skip to content

Commit c33722f

Browse files
committed
clean up Publisher and Subscriber surface
- PUBSUB_API_ADDRESS and PUBSUB_API_SCOPE are removed. We now use {Publisher,Subscriber}Settings instead. - Default settings are moved into the Builder class, so that they have package-private visibility. We can make them public later if we want to. - Subscriber doc is fixed so that it is not chopped off mid-sentence. - Subscriber doc mistakenly referred to the Publisher. This is now fixed.
1 parent 0089583 commit c33722f

5 files changed

Lines changed: 76 additions & 60 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,10 @@
7979
* </code></pre>
8080
*/
8181
public interface Publisher {
82-
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
83-
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
84-
8582
// API limits.
8683
int MAX_BUNDLE_MESSAGES = 1000;
8784
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
8885

89-
// Meaningful defaults.
90-
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
91-
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
92-
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
93-
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
94-
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
95-
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
96-
97-
BundlingSettings DEFAULT_BUNDLING_SETTINGS =
98-
BundlingSettings.newBuilder()
99-
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
100-
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
101-
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
102-
.build();
103-
10486
/** Topic to which the publisher publishes to. */
10587
String getTopic();
10688

@@ -163,6 +145,20 @@ public interface Publisher {
163145

164146
/** A builder of {@link Publisher}s. */
165147
public final class Builder {
148+
// Meaningful defaults.
149+
static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
150+
static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
151+
static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
152+
static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
153+
static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
154+
static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
155+
static final BundlingSettings DEFAULT_BUNDLING_SETTINGS =
156+
BundlingSettings.newBuilder()
157+
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
158+
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
159+
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
160+
.build();
161+
166162
String topic;
167163

168164
// Bundling options

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.auth.oauth2.GoogleCredentials;
21+
import com.google.cloud.pubsub.spi.v1.PublisherSettings;
2122
import com.google.common.base.Optional;
2223
import com.google.common.collect.ImmutableList;
2324
import com.google.common.primitives.Ints;
@@ -38,7 +39,6 @@
3839
import io.grpc.netty.NegotiationType;
3940
import io.grpc.netty.NettyChannelBuilder;
4041
import java.io.IOException;
41-
import java.util.Collections;
4242
import java.util.Iterator;
4343
import java.util.LinkedList;
4444
import java.util.List;
@@ -125,7 +125,9 @@ final class PublisherImpl implements Publisher {
125125
channels[i] =
126126
builder.channelBuilder.isPresent()
127127
? builder.channelBuilder.get().build()
128-
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
128+
: NettyChannelBuilder.forAddress(
129+
PublisherSettings.getDefaultServiceAddress(),
130+
PublisherSettings.getDefaultServicePort())
129131
.negotiationType(NegotiationType.TLS)
130132
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
131133
.executor(executor)
@@ -136,7 +138,7 @@ final class PublisherImpl implements Publisher {
136138
builder.userCredentials.isPresent()
137139
? builder.userCredentials.get()
138140
: GoogleCredentials.getApplicationDefault()
139-
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE)));
141+
.createScoped(PublisherSettings.getDefaultServiceScopes()));
140142
shutdown = new AtomicBoolean(false);
141143
messagesWaiter = new MessagesWaiter();
142144
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
* in memory before the receiver either ack or nack them.
5555
* </ul>
5656
*
57-
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
58-
* through {@link GoogleCredentials#getApplicationDefault}.
57+
* <p>If no credentials are provided, the {@link Subscriber} will use application default
58+
* credentials through {@link GoogleCredentials#getApplicationDefault}.
5959
*
6060
* <p>For example, a {@link Subscriber} can be constructed and used to receive messages as follows:
6161
*
@@ -88,9 +88,6 @@
8888
* </code></pre>
8989
*/
9090
public interface Subscriber extends Service {
91-
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
92-
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
93-
9491
/** Retrieves a snapshot of the current subscriber statistics. */
9592
SubscriberStats getStats();
9693

@@ -125,15 +122,15 @@ public static enum AckReply {
125122
Duration getAckExpirationPadding();
126123

127124
/**
128-
* Maximum number of outstanding (i.e. pending to process) messages before limits are enforced.
125+
* Maximum number of outstanding messages before limits are enforced.
129126
*
130127
* <p><b>When limits are enforced, no more messages will be dispatched to the {@link
131128
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
132129
* management, still some extra bytes could be kept at lower layers.</b>
133130
*/
134131
Optional<Integer> getMaxOutstandingElementCount();
135132

136-
/** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */
133+
/** Maximum number of outstanding bytes before limits are enforced. */
137134
Optional<Integer> getMaxOutstandingRequestBytes();
138135

139136
/** Builder of {@link Subscriber Subscribers}. */
@@ -158,7 +155,7 @@ public final class Builder {
158155
* Constructs a new {@link Builder}.
159156
*
160157
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
161-
* Publisher}.
158+
* Subscriber}.
162159
*
163160
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
164161
* @param receiver an implementation of {@link MessageReceiver} used to process the received
@@ -226,8 +223,8 @@ public Builder setExecutor(ScheduledExecutorService executor) {
226223
return this;
227224
}
228225

229-
/** Gives the ability to set a custom executor. */
230-
public Builder setClock(Clock clock) {
226+
/** Gives the ability to set a custom clock. */
227+
Builder setClock(Clock clock) {
231228
this.clock = Optional.of(clock);
232229
return this;
233230
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.auth.Credentials;
2222
import com.google.auth.oauth2.GoogleCredentials;
2323
import com.google.cloud.Clock;
24+
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Optional;
2627
import com.google.common.primitives.Ints;
@@ -35,7 +36,6 @@
3536
import io.grpc.netty.NettyChannelBuilder;
3637
import java.io.IOException;
3738
import java.util.ArrayList;
38-
import java.util.Collections;
3939
import java.util.List;
4040
import java.util.concurrent.CountDownLatch;
4141
import java.util.concurrent.Executors;
@@ -104,7 +104,9 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
104104
channelBuilder =
105105
builder.channelBuilder.isPresent()
106106
? builder.channelBuilder.get()
107-
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
107+
: NettyChannelBuilder.forAddress(
108+
SubscriberSettings.getDefaultServiceAddress(),
109+
SubscriberSettings.getDefaultServicePort())
108110
.maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
109111
.flowControlWindow(5000000) // 2.5 MB
110112
.negotiationType(NegotiationType.TLS)
@@ -115,7 +117,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
115117
builder.credentials.isPresent()
116118
? builder.credentials.get()
117119
: GoogleCredentials.getApplicationDefault()
118-
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE));
120+
.createScoped(SubscriberSettings.getDefaultServiceScopes());
119121

120122
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
121123
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void testPublishByDuration() throws Exception {
103103
getTestPublisherBuilder()
104104
// To demonstrate that reaching duration will trigger publish
105105
.setBundlingSettings(
106-
Publisher.DEFAULT_BUNDLING_SETTINGS
106+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
107107
.toBuilder()
108108
.setDelayThreshold(Duration.standardSeconds(5))
109109
.setElementCountThreshold(10)
@@ -134,7 +134,7 @@ public void testPublishByNumBundledMessages() throws Exception {
134134
Publisher publisher =
135135
getTestPublisherBuilder()
136136
.setBundlingSettings(
137-
Publisher.DEFAULT_BUNDLING_SETTINGS
137+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
138138
.toBuilder()
139139
.setElementCountThreshold(2)
140140
.setDelayThreshold(Duration.standardSeconds(100))
@@ -173,7 +173,7 @@ public void testSinglePublishByNumBytes() throws Exception {
173173
Publisher publisher =
174174
getTestPublisherBuilder()
175175
.setBundlingSettings(
176-
Publisher.DEFAULT_BUNDLING_SETTINGS
176+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
177177
.toBuilder()
178178
.setElementCountThreshold(2)
179179
.setDelayThreshold(Duration.standardSeconds(100))
@@ -208,7 +208,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {
208208
getTestPublisherBuilder()
209209
// To demonstrate that reaching duration will trigger publish
210210
.setBundlingSettings(
211-
Publisher.DEFAULT_BUNDLING_SETTINGS
211+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
212212
.toBuilder()
213213
.setElementCountThreshold(2)
214214
.setDelayThreshold(Duration.standardSeconds(5))
@@ -256,7 +256,7 @@ public void testPublishFailureRetries() throws Exception {
256256
getTestPublisherBuilder()
257257
.setExecutor(Executors.newSingleThreadScheduledExecutor())
258258
.setBundlingSettings(
259-
Publisher.DEFAULT_BUNDLING_SETTINGS
259+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
260260
.toBuilder()
261261
.setElementCountThreshold(1)
262262
.setDelayThreshold(Duration.standardSeconds(5))
@@ -281,7 +281,7 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
281281
.setExecutor(Executors.newSingleThreadScheduledExecutor())
282282
.setSendBundleDeadline(Duration.standardSeconds(10))
283283
.setBundlingSettings(
284-
Publisher.DEFAULT_BUNDLING_SETTINGS
284+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
285285
.toBuilder()
286286
.setElementCountThreshold(1)
287287
.setDelayThreshold(Duration.standardSeconds(5))
@@ -310,7 +310,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
310310
.setExecutor(Executors.newSingleThreadScheduledExecutor())
311311
.setSendBundleDeadline(Duration.standardSeconds(10))
312312
.setBundlingSettings(
313-
Publisher.DEFAULT_BUNDLING_SETTINGS
313+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
314314
.toBuilder()
315315
.setElementCountThreshold(1)
316316
.setDelayThreshold(Duration.standardSeconds(5))
@@ -371,16 +371,17 @@ public void testBuilderParametersAndDefaults() {
371371
assertEquals(Optional.absent(), builder.executor);
372372
assertFalse(builder.failOnFlowControlLimits);
373373
assertEquals(
374-
Publisher.DEFAULT_MAX_BUNDLE_BYTES,
374+
Publisher.Builder.DEFAULT_MAX_BUNDLE_BYTES,
375375
builder.bundlingSettings.getRequestByteThreshold().longValue());
376376
assertEquals(
377-
Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.bundlingSettings.getDelayThreshold());
377+
Publisher.Builder.DEFAULT_MAX_BUNDLE_DURATION,
378+
builder.bundlingSettings.getDelayThreshold());
378379
assertEquals(
379-
Publisher.DEFAULT_MAX_BUNDLE_MESSAGES,
380+
Publisher.Builder.DEFAULT_MAX_BUNDLE_MESSAGES,
380381
builder.bundlingSettings.getElementCountThreshold().longValue());
381382
assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings);
382-
assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout);
383-
assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline);
383+
assertEquals(Publisher.Builder.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout);
384+
assertEquals(Publisher.Builder.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline);
384385
assertEquals(Optional.absent(), builder.userCredentials);
385386
}
386387

@@ -410,7 +411,7 @@ public void testBuilderInvalidArguments() {
410411
}
411412
try {
412413
builder.setBundlingSettings(
413-
Publisher.DEFAULT_BUNDLING_SETTINGS
414+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
414415
.toBuilder()
415416
.setRequestByteThreshold((Long) null)
416417
.build());
@@ -420,31 +421,40 @@ public void testBuilderInvalidArguments() {
420421
}
421422
try {
422423
builder.setBundlingSettings(
423-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(0).build());
424+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
425+
.toBuilder()
426+
.setRequestByteThreshold(0)
427+
.build());
424428
fail("Should have thrown an IllegalArgumentException");
425429
} catch (IllegalArgumentException expected) {
426430
// Expected
427431
}
428432
try {
429433
builder.setBundlingSettings(
430-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(-1).build());
434+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
435+
.toBuilder()
436+
.setRequestByteThreshold(-1)
437+
.build());
431438
fail("Should have thrown an IllegalArgumentException");
432439
} catch (IllegalArgumentException expected) {
433440
// Expected
434441
}
435442

436443
builder.setBundlingSettings(
437-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(new Duration(1)).build());
444+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
445+
.toBuilder()
446+
.setDelayThreshold(new Duration(1))
447+
.build());
438448
try {
439449
builder.setBundlingSettings(
440-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(null).build());
450+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(null).build());
441451
fail("Should have thrown an NullPointerException");
442452
} catch (NullPointerException expected) {
443453
// Expected
444454
}
445455
try {
446456
builder.setBundlingSettings(
447-
Publisher.DEFAULT_BUNDLING_SETTINGS
457+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
448458
.toBuilder()
449459
.setDelayThreshold(new Duration(-1))
450460
.build());
@@ -454,10 +464,13 @@ public void testBuilderInvalidArguments() {
454464
}
455465

456466
builder.setBundlingSettings(
457-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build());
467+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
468+
.toBuilder()
469+
.setElementCountThreshold(1)
470+
.build());
458471
try {
459472
builder.setBundlingSettings(
460-
Publisher.DEFAULT_BUNDLING_SETTINGS
473+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
461474
.toBuilder()
462475
.setElementCountThreshold((Long) null)
463476
.build());
@@ -467,14 +480,20 @@ public void testBuilderInvalidArguments() {
467480
}
468481
try {
469482
builder.setBundlingSettings(
470-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(0).build());
483+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
484+
.toBuilder()
485+
.setElementCountThreshold(0)
486+
.build());
471487
fail("Should have thrown an IllegalArgumentException");
472488
} catch (IllegalArgumentException expected) {
473489
// Expected
474490
}
475491
try {
476492
builder.setBundlingSettings(
477-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(-1).build());
493+
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
494+
.toBuilder()
495+
.setElementCountThreshold(-1)
496+
.build());
478497
fail("Should have thrown an IllegalArgumentException");
479498
} catch (IllegalArgumentException expected) {
480499
// Expected
@@ -532,16 +551,16 @@ public void testBuilderInvalidArguments() {
532551
// Expected
533552
}
534553

535-
builder.setRequestTimeout(Publisher.MIN_REQUEST_TIMEOUT);
554+
builder.setRequestTimeout(Publisher.Builder.MIN_REQUEST_TIMEOUT);
536555
try {
537-
builder.setRequestTimeout(Publisher.MIN_REQUEST_TIMEOUT.minus(1));
556+
builder.setRequestTimeout(Publisher.Builder.MIN_REQUEST_TIMEOUT.minus(1));
538557
fail("Should have thrown an IllegalArgumentException");
539558
} catch (IllegalArgumentException expected) {
540559
// Expected
541560
}
542-
builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION);
561+
builder.setSendBundleDeadline(Publisher.Builder.MIN_SEND_BUNDLE_DURATION);
543562
try {
544-
builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION.minus(1));
563+
builder.setSendBundleDeadline(Publisher.Builder.MIN_SEND_BUNDLE_DURATION.minus(1));
545564
fail("Should have thrown an IllegalArgumentException");
546565
} catch (IllegalArgumentException expected) {
547566
// Expected

0 commit comments

Comments
 (0)