Skip to content

Commit db4908e

Browse files
authored
make pubsub work with new gax (#1505)
* make pubsub work with new gax update documentation links too
1 parent 6a4bc9d commit db4908e

13 files changed

Lines changed: 166 additions & 531 deletions

File tree

google-cloud-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
<dependency>
112112
<groupId>com.google.api</groupId>
113113
<artifactId>gax</artifactId>
114-
<version>0.0.25</version>
114+
<version>0.0.27</version>
115115
<exclusions>
116116
<exclusion>
117117
<groupId>io.grpc</groupId>

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java

Lines changed: 0 additions & 83 deletions
This file was deleted.

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,17 @@
1616

1717
package com.google.cloud.pubsub;
1818

19-
import com.google.auth.Credentials;
19+
import com.google.api.gax.bundling.FlowController;
2020
import com.google.cloud.Clock;
21-
import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException;
2221
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
2322
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
2423
import com.google.common.annotations.VisibleForTesting;
2524
import com.google.common.collect.Lists;
2625
import com.google.common.primitives.Ints;
27-
import com.google.common.util.concurrent.AbstractService;
2826
import com.google.common.util.concurrent.FutureCallback;
2927
import com.google.common.util.concurrent.Futures;
3028
import com.google.pubsub.v1.PubsubMessage;
3129
import com.google.pubsub.v1.ReceivedMessage;
32-
import io.grpc.Status;
3330
import java.util.ArrayList;
3431
import java.util.Collection;
3532
import java.util.HashMap;
@@ -287,7 +284,7 @@ public void run() {
287284
}
288285
try {
289286
flowController.reserve(receivedMessagesCount, totalByteCount);
290-
} catch (CloudPubsubFlowControlException unexpectedException) {
287+
} catch (FlowController.FlowControlException unexpectedException) {
291288
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
292289
}
293290
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.pubsub.StatusUtil.isRetryable;
2020

21+
import com.google.api.gax.bundling.FlowController;
2122
import com.google.auth.Credentials;
2223
import com.google.cloud.Clock;
2324
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
@@ -39,7 +40,6 @@
3940
import io.grpc.Channel;
4041
import io.grpc.StatusRuntimeException;
4142
import io.grpc.auth.MoreCallCredentials;
42-
import java.util.Iterator;
4343
import java.util.List;
4444
import java.util.concurrent.ScheduledExecutorService;
4545
import java.util.concurrent.TimeUnit;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

Lines changed: 23 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsub;
1818

19+
import com.google.api.gax.bundling.FlowController;
1920
import com.google.api.gax.grpc.BundlingSettings;
2021
import com.google.auth.Credentials;
2122
import com.google.auth.oauth2.GoogleCredentials;
@@ -86,8 +87,8 @@ public interface Publisher {
8687
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
8788

8889
// Meaningful defaults.
89-
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
90-
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
90+
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
91+
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
9192
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
9293
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
9394
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
@@ -129,21 +130,22 @@ public interface Publisher {
129130
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
130131
* See {@link #failOnFlowControlLimits()}.
131132
*/
132-
Optional<Integer> getMaxOutstandingMessages();
133+
Optional<Integer> getMaxOutstandingElementCount();
133134

134135
/**
135136
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
136137
* {@link #failOnFlowControlLimits()}.
137138
*/
138-
Optional<Integer> getMaxOutstandingBytes();
139+
Optional<Integer> getMaxOutstandingRequestBytes();
139140

140141
/**
141142
* Whether to block publish calls when reaching flow control limits (see {@link
142-
* #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}).
143+
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
143144
*
144145
* <p>If set to false, a publish call will fail with either {@link
145-
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as
146-
* appropriate, when flow control limits are reached.
146+
* RequestByteMaxOutstandingReachedException} or {@link
147+
* ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
148+
* reached.
147149
*/
148150
boolean failOnFlowControlLimits();
149151

@@ -164,24 +166,24 @@ final class Builder {
164166
String topic;
165167

166168
// Bundling options
167-
BundlingSettings bundlingSettings;
169+
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
168170

169171
// Client-side flow control options
170-
Optional<Integer> maxOutstandingMessages;
171-
Optional<Integer> maxOutstandingBytes;
172-
boolean failOnFlowControlLimits;
172+
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
173+
boolean failOnFlowControlLimits = false;
173174

174175
// Send bundle deadline
175-
Duration sendBundleDeadline;
176+
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
176177

177178
// RPC options
178-
Duration requestTimeout;
179+
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
179180

180181
// Channels and credentials
181-
Optional<Credentials> userCredentials;
182-
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
182+
Optional<Credentials> userCredentials = Optional.absent();
183+
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
184+
Optional.absent();
183185

184-
Optional<ScheduledExecutorService> executor;
186+
Optional<ScheduledExecutorService> executor = Optional.absent();
185187

186188
/** Constructs a new {@link Builder} using the given topic. */
187189
public static Builder newBuilder(String topic) {
@@ -190,19 +192,6 @@ public static Builder newBuilder(String topic) {
190192

191193
Builder(String topic) {
192194
this.topic = Preconditions.checkNotNull(topic);
193-
setDefaults();
194-
}
195-
196-
private void setDefaults() {
197-
userCredentials = Optional.absent();
198-
channelBuilder = Optional.absent();
199-
maxOutstandingMessages = Optional.absent();
200-
maxOutstandingBytes = Optional.absent();
201-
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
202-
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
203-
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
204-
failOnFlowControlLimits = false;
205-
executor = Optional.absent();
206195
}
207196

208197
/**
@@ -254,24 +243,16 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
254243

255244
// Flow control options
256245

257-
/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
258-
public Builder setMaxOutstandingMessages(int messages) {
259-
Preconditions.checkArgument(messages > 0);
260-
maxOutstandingMessages = Optional.of(messages);
261-
return this;
262-
}
263-
264-
/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
265-
public Builder setMaxOutstandingBytes(int bytes) {
266-
Preconditions.checkArgument(bytes > 0);
267-
maxOutstandingBytes = Optional.of(bytes);
246+
/** Sets the flow control settings. */
247+
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
248+
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
268249
return this;
269250
}
270251

271252
/**
272253
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
273-
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as
274-
* appropriate.
254+
* RequestByteMaxOutstandingReachedException} or {@link
255+
* ElementCountMaxOutstandingReachedException} as appropriate.
275256
*
276257
* <p>If set to false, then publish operations will block the current thread until the
277258
* outstanding requests go under the limits.
@@ -306,51 +287,4 @@ public Publisher build() throws IOException {
306287
return new PublisherImpl(this);
307288
}
308289
}
309-
310-
/** Base exception that signals a flow control state. */
311-
abstract class CloudPubsubFlowControlException extends Exception {}
312-
313-
/**
314-
* Returned as a future exception when client-side flow control is enforced based on the maximum
315-
* number of outstanding in-memory messages.
316-
*/
317-
final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
318-
private final int currentMaxMessages;
319-
320-
public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
321-
this.currentMaxMessages = currentMaxMessages;
322-
}
323-
324-
public int getCurrentMaxBundleMessages() {
325-
return currentMaxMessages;
326-
}
327-
328-
@Override
329-
public String toString() {
330-
return String.format(
331-
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
332-
}
333-
}
334-
335-
/**
336-
* Returned as a future exception when client-side flow control is enforced based on the maximum
337-
* number of unacknowledged in-memory bytes.
338-
*/
339-
final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
340-
private final int currentMaxBytes;
341-
342-
public MaxOutstandingBytesReachedException(int currentMaxBytes) {
343-
this.currentMaxBytes = currentMaxBytes;
344-
}
345-
346-
public int getCurrentMaxBundleBytes() {
347-
return currentMaxBytes;
348-
}
349-
350-
@Override
351-
public String toString() {
352-
return String.format(
353-
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
354-
}
355-
}
356290
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsub;
1818

19+
import com.google.api.gax.bundling.FlowController;
1920
import com.google.auth.oauth2.GoogleCredentials;
2021
import com.google.common.base.Optional;
2122
import com.google.common.collect.ImmutableList;
@@ -68,8 +69,7 @@ final class PublisherImpl implements Publisher {
6869
private final Duration maxBundleDuration;
6970
private final boolean hasBundlingBytes;
7071

71-
private final Optional<Integer> maxOutstandingMessages;
72-
private final Optional<Integer> maxOutstandingBytes;
72+
private final FlowController.Settings flowControlSettings;
7373
private final boolean failOnFlowControlLimits;
7474

7575
private final Lock messagesBundleLock;
@@ -98,11 +98,9 @@ final class PublisherImpl implements Publisher {
9898
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
9999
hasBundlingBytes = maxBundleBytes > 0;
100100

101-
maxOutstandingMessages = builder.maxOutstandingMessages;
102-
maxOutstandingBytes = builder.maxOutstandingBytes;
101+
flowControlSettings = builder.flowControlSettings;
103102
failOnFlowControlLimits = builder.failOnFlowControlLimits;
104-
this.flowController =
105-
new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits);
103+
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);
106104

107105
sendBundleDeadline = builder.sendBundleDeadline;
108106

@@ -165,13 +163,13 @@ public long getMaxBundleMessages() {
165163
}
166164

167165
@Override
168-
public Optional<Integer> getMaxOutstandingMessages() {
169-
return maxOutstandingMessages;
166+
public Optional<Integer> getMaxOutstandingElementCount() {
167+
return flowControlSettings.getMaxOutstandingElementCount();
170168
}
171169

172170
@Override
173-
public Optional<Integer> getMaxOutstandingBytes() {
174-
return maxOutstandingBytes;
171+
public Optional<Integer> getMaxOutstandingRequestBytes() {
172+
return flowControlSettings.getMaxOutstandingRequestBytes();
175173
}
176174

177175
@Override
@@ -181,12 +179,12 @@ public boolean failOnFlowControlLimits() {
181179

182180
/** Whether flow control kicks in on a per outstanding messages basis. */
183181
boolean isPerMessageEnforced() {
184-
return maxOutstandingMessages.isPresent();
182+
return getMaxOutstandingElementCount().isPresent();
185183
}
186184

187185
/** Whether flow control kicks in on a per outstanding bytes basis. */
188186
boolean isPerBytesEnforced() {
189-
return maxOutstandingBytes.isPresent();
187+
return getMaxOutstandingRequestBytes().isPresent();
190188
}
191189

192190
@Override
@@ -203,7 +201,7 @@ public ListenableFuture<String> publish(PubsubMessage message) {
203201
final int messageSize = message.getSerializedSize();
204202
try {
205203
flowController.reserve(1, messageSize);
206-
} catch (CloudPubsubFlowControlException e) {
204+
} catch (FlowController.FlowControlException e) {
207205
return Futures.immediateFailedFuture(e);
208206
}
209207
OutstandingBundle bundleToSend = null;

0 commit comments

Comments
 (0)