Skip to content

Commit 8ca6cb3

Browse files
authored
---
yaml --- r: 7775 b: refs/heads/tswast-patch-1 c: db4908e h: refs/heads/master i: 7773: 49b1f1e 7771: abdfb95 7767: 88790a8 7759: 696c33f 7743: 36436aa
1 parent 3f3212b commit 8ca6cb3

14 files changed

Lines changed: 167 additions & 532 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5757
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5858
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5959
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
60-
refs/heads/tswast-patch-1: 6a4bc9d496d0cbd26f7b9a7d259bb0990c7a3e83
60+
refs/heads/tswast-patch-1: db4908e82af4561127cc651f76978e8703d9ea64
6161
refs/heads/pubsub-streaming-pull: 19262b752ee874eb2ca3b950eb2aef44d5a5267b

branches/tswast-patch-1/google-cloud-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
<dependency>
112112
<groupId>com.google.api</groupId>
113113
<artifactId>gax</artifactId>
114-
<version>0.0.25</version>
114+
<version>0.0.27</version>
115115
<exclusions>
116116
<exclusion>
117117
<groupId>io.grpc</groupId>

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java

Lines changed: 0 additions & 83 deletions
This file was deleted.

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,17 @@
1616

1717
package com.google.cloud.pubsub;
1818

19-
import com.google.auth.Credentials;
19+
import com.google.api.gax.bundling.FlowController;
2020
import com.google.cloud.Clock;
21-
import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException;
2221
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
2322
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
2423
import com.google.common.annotations.VisibleForTesting;
2524
import com.google.common.collect.Lists;
2625
import com.google.common.primitives.Ints;
27-
import com.google.common.util.concurrent.AbstractService;
2826
import com.google.common.util.concurrent.FutureCallback;
2927
import com.google.common.util.concurrent.Futures;
3028
import com.google.pubsub.v1.PubsubMessage;
3129
import com.google.pubsub.v1.ReceivedMessage;
32-
import io.grpc.Status;
3330
import java.util.ArrayList;
3431
import java.util.Collection;
3532
import java.util.HashMap;
@@ -287,7 +284,7 @@ public void run() {
287284
}
288285
try {
289286
flowController.reserve(receivedMessagesCount, totalByteCount);
290-
} catch (CloudPubsubFlowControlException unexpectedException) {
287+
} catch (FlowController.FlowControlException unexpectedException) {
291288
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
292289
}
293290
}

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.pubsub.StatusUtil.isRetryable;
2020

21+
import com.google.api.gax.bundling.FlowController;
2122
import com.google.auth.Credentials;
2223
import com.google.cloud.Clock;
2324
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
@@ -39,7 +40,6 @@
3940
import io.grpc.Channel;
4041
import io.grpc.StatusRuntimeException;
4142
import io.grpc.auth.MoreCallCredentials;
42-
import java.util.Iterator;
4343
import java.util.List;
4444
import java.util.concurrent.ScheduledExecutorService;
4545
import java.util.concurrent.TimeUnit;

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

Lines changed: 23 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsub;
1818

19+
import com.google.api.gax.bundling.FlowController;
1920
import com.google.api.gax.grpc.BundlingSettings;
2021
import com.google.auth.Credentials;
2122
import com.google.auth.oauth2.GoogleCredentials;
@@ -86,8 +87,8 @@ public interface Publisher {
8687
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
8788

8889
// Meaningful defaults.
89-
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
90-
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
90+
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
91+
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
9192
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
9293
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
9394
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
@@ -129,21 +130,22 @@ public interface Publisher {
129130
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
130131
* See {@link #failOnFlowControlLimits()}.
131132
*/
132-
Optional<Integer> getMaxOutstandingMessages();
133+
Optional<Integer> getMaxOutstandingElementCount();
133134

134135
/**
135136
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
136137
* {@link #failOnFlowControlLimits()}.
137138
*/
138-
Optional<Integer> getMaxOutstandingBytes();
139+
Optional<Integer> getMaxOutstandingRequestBytes();
139140

140141
/**
141142
* Whether to block publish calls when reaching flow control limits (see {@link
142-
* #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}).
143+
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
143144
*
144145
* <p>If set to false, a publish call will fail with either {@link
145-
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as
146-
* appropriate, when flow control limits are reached.
146+
* RequestByteMaxOutstandingReachedException} or {@link
147+
* ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
148+
* reached.
147149
*/
148150
boolean failOnFlowControlLimits();
149151

@@ -164,24 +166,24 @@ final class Builder {
164166
String topic;
165167

166168
// Bundling options
167-
BundlingSettings bundlingSettings;
169+
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
168170

169171
// Client-side flow control options
170-
Optional<Integer> maxOutstandingMessages;
171-
Optional<Integer> maxOutstandingBytes;
172-
boolean failOnFlowControlLimits;
172+
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
173+
boolean failOnFlowControlLimits = false;
173174

174175
// Send bundle deadline
175-
Duration sendBundleDeadline;
176+
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
176177

177178
// RPC options
178-
Duration requestTimeout;
179+
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
179180

180181
// Channels and credentials
181-
Optional<Credentials> userCredentials;
182-
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
182+
Optional<Credentials> userCredentials = Optional.absent();
183+
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
184+
Optional.absent();
183185

184-
Optional<ScheduledExecutorService> executor;
186+
Optional<ScheduledExecutorService> executor = Optional.absent();
185187

186188
/** Constructs a new {@link Builder} using the given topic. */
187189
public static Builder newBuilder(String topic) {
@@ -190,19 +192,6 @@ public static Builder newBuilder(String topic) {
190192

191193
Builder(String topic) {
192194
this.topic = Preconditions.checkNotNull(topic);
193-
setDefaults();
194-
}
195-
196-
private void setDefaults() {
197-
userCredentials = Optional.absent();
198-
channelBuilder = Optional.absent();
199-
maxOutstandingMessages = Optional.absent();
200-
maxOutstandingBytes = Optional.absent();
201-
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
202-
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
203-
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
204-
failOnFlowControlLimits = false;
205-
executor = Optional.absent();
206195
}
207196

208197
/**
@@ -254,24 +243,16 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
254243

255244
// Flow control options
256245

257-
/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
258-
public Builder setMaxOutstandingMessages(int messages) {
259-
Preconditions.checkArgument(messages > 0);
260-
maxOutstandingMessages = Optional.of(messages);
261-
return this;
262-
}
263-
264-
/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
265-
public Builder setMaxOutstandingBytes(int bytes) {
266-
Preconditions.checkArgument(bytes > 0);
267-
maxOutstandingBytes = Optional.of(bytes);
246+
/** Sets the flow control settings. */
247+
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
248+
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
268249
return this;
269250
}
270251

271252
/**
272253
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
273-
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as
274-
* appropriate.
254+
* RequestByteMaxOutstandingReachedException} or {@link
255+
* ElementCountMaxOutstandingReachedException} as appropriate.
275256
*
276257
* <p>If set to false, then publish operations will block the current thread until the
277258
* outstanding requests go under the limits.
@@ -306,51 +287,4 @@ public Publisher build() throws IOException {
306287
return new PublisherImpl(this);
307288
}
308289
}
309-
310-
/** Base exception that signals a flow control state. */
311-
abstract class CloudPubsubFlowControlException extends Exception {}
312-
313-
/**
314-
* Returned as a future exception when client-side flow control is enforced based on the maximum
315-
* number of outstanding in-memory messages.
316-
*/
317-
final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
318-
private final int currentMaxMessages;
319-
320-
public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
321-
this.currentMaxMessages = currentMaxMessages;
322-
}
323-
324-
public int getCurrentMaxBundleMessages() {
325-
return currentMaxMessages;
326-
}
327-
328-
@Override
329-
public String toString() {
330-
return String.format(
331-
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
332-
}
333-
}
334-
335-
/**
336-
* Returned as a future exception when client-side flow control is enforced based on the maximum
337-
* number of unacknowledged in-memory bytes.
338-
*/
339-
final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
340-
private final int currentMaxBytes;
341-
342-
public MaxOutstandingBytesReachedException(int currentMaxBytes) {
343-
this.currentMaxBytes = currentMaxBytes;
344-
}
345-
346-
public int getCurrentMaxBundleBytes() {
347-
return currentMaxBytes;
348-
}
349-
350-
@Override
351-
public String toString() {
352-
return String.format(
353-
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
354-
}
355-
}
356290
}

0 commit comments

Comments
 (0)