1616
1717package com .google .cloud .pubsub ;
1818
19+ import com .google .api .gax .bundling .FlowController ;
1920import com .google .api .gax .grpc .BundlingSettings ;
2021import com .google .auth .Credentials ;
2122import com .google .auth .oauth2 .GoogleCredentials ;
@@ -86,8 +87,8 @@ public interface Publisher {
8687 int MAX_BUNDLE_BYTES = 10 * 1000 * 1000 ; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
8788
8889 // Meaningful defaults.
89- int DEFAULT_MAX_BUNDLE_MESSAGES = 100 ;
90- int DEFAULT_MAX_BUNDLE_BYTES = 1000 ; // 1 kB
90+ long DEFAULT_MAX_BUNDLE_MESSAGES = 100L ;
91+ long DEFAULT_MAX_BUNDLE_BYTES = 1000L ; // 1 kB
9192 Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration (1 ); // 1ms
9293 Duration DEFAULT_REQUEST_TIMEOUT = new Duration (10 * 1000 ); // 10 seconds
9394 Duration MIN_SEND_BUNDLE_DURATION = new Duration (10 * 1000 ); // 10 seconds
@@ -129,21 +130,22 @@ public interface Publisher {
129130 * Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
130131 * See {@link #failOnFlowControlLimits()}.
131132 */
132- Optional <Integer > getMaxOutstandingMessages ();
133+ Optional <Integer > getMaxOutstandingElementCount ();
133134
134135 /**
135136 * Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
136137 * {@link #failOnFlowControlLimits()}.
137138 */
138- Optional <Integer > getMaxOutstandingBytes ();
139+ Optional <Integer > getMaxOutstandingRequestBytes ();
139140
140141 /**
141142 * Whether to block publish calls when reaching flow control limits (see {@link
142- * #getMaxOutstandingBytes ()} & {@link #getMaxOutstandingMessages ()}).
143+ * #getMaxOutstandingRequestBytes ()} & {@link #getMaxOutstandingElementCount ()}).
143144 *
144145 * <p>If set to false, a publish call will fail with either {@link
145- * MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as
146- * appropriate, when flow control limits are reached.
146+ * RequestByteMaxOutstandingReachedException} or {@link
147+ * ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
148+ * reached.
147149 */
148150 boolean failOnFlowControlLimits ();
149151
@@ -164,24 +166,24 @@ final class Builder {
164166 String topic ;
165167
166168 // Bundling options
167- BundlingSettings bundlingSettings ;
169+ BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS ;
168170
169171 // Client-side flow control options
170- Optional <Integer > maxOutstandingMessages ;
171- Optional <Integer > maxOutstandingBytes ;
172- boolean failOnFlowControlLimits ;
172+ FlowController .Settings flowControlSettings = FlowController .Settings .DEFAULT ;
173+ boolean failOnFlowControlLimits = false ;
173174
174175 // Send bundle deadline
175- Duration sendBundleDeadline ;
176+ Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION ;
176177
177178 // RPC options
178- Duration requestTimeout ;
179+ Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT ;
179180
180181 // Channels and credentials
181- Optional <Credentials > userCredentials ;
182- Optional <ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>> channelBuilder ;
182+ Optional <Credentials > userCredentials = Optional .absent ();
183+ Optional <ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>> channelBuilder =
184+ Optional .absent ();
183185
184- Optional <ScheduledExecutorService > executor ;
186+ Optional <ScheduledExecutorService > executor = Optional . absent () ;
185187
186188 /** Constructs a new {@link Builder} using the given topic. */
187189 public static Builder newBuilder (String topic ) {
@@ -190,19 +192,6 @@ public static Builder newBuilder(String topic) {
190192
191193 Builder (String topic ) {
192194 this .topic = Preconditions .checkNotNull (topic );
193- setDefaults ();
194- }
195-
196- private void setDefaults () {
197- userCredentials = Optional .absent ();
198- channelBuilder = Optional .absent ();
199- maxOutstandingMessages = Optional .absent ();
200- maxOutstandingBytes = Optional .absent ();
201- bundlingSettings = DEFAULT_BUNDLING_SETTINGS ;
202- requestTimeout = DEFAULT_REQUEST_TIMEOUT ;
203- sendBundleDeadline = MIN_SEND_BUNDLE_DURATION ;
204- failOnFlowControlLimits = false ;
205- executor = Optional .absent ();
206195 }
207196
208197 /**
@@ -254,24 +243,16 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
254243
255244 // Flow control options
256245
257- /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
258- public Builder setMaxOutstandingMessages (int messages ) {
259- Preconditions .checkArgument (messages > 0 );
260- maxOutstandingMessages = Optional .of (messages );
261- return this ;
262- }
263-
264- /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
265- public Builder setMaxOutstandingBytes (int bytes ) {
266- Preconditions .checkArgument (bytes > 0 );
267- maxOutstandingBytes = Optional .of (bytes );
246+ /** Sets the flow control settings. */
247+ public Builder setFlowControlSettings (FlowController .Settings flowControlSettings ) {
248+ this .flowControlSettings = Preconditions .checkNotNull (flowControlSettings );
268249 return this ;
269250 }
270251
271252 /**
272253 * Whether to fail publish when reaching any of the flow control limits, with either a {@link
273- * MaxOutstandingBytesReachedException } or {@link MaxOutstandingMessagesReachedException} as
274- * appropriate.
254+ * RequestByteMaxOutstandingReachedException } or {@link
255+ * ElementCountMaxOutstandingReachedException} as appropriate.
275256 *
276257 * <p>If set to false, then publish operations will block the current thread until the
277258 * outstanding requests go under the limits.
@@ -306,51 +287,4 @@ public Publisher build() throws IOException {
306287 return new PublisherImpl (this );
307288 }
308289 }
309-
310- /** Base exception that signals a flow control state. */
311- abstract class CloudPubsubFlowControlException extends Exception {}
312-
313- /**
314- * Returned as a future exception when client-side flow control is enforced based on the maximum
315- * number of outstanding in-memory messages.
316- */
317- final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
318- private final int currentMaxMessages ;
319-
320- public MaxOutstandingMessagesReachedException (int currentMaxMessages ) {
321- this .currentMaxMessages = currentMaxMessages ;
322- }
323-
324- public int getCurrentMaxBundleMessages () {
325- return currentMaxMessages ;
326- }
327-
328- @ Override
329- public String toString () {
330- return String .format (
331- "The maximum number of bundle messages: %d have been reached." , currentMaxMessages );
332- }
333- }
334-
335- /**
336- * Returned as a future exception when client-side flow control is enforced based on the maximum
337- * number of unacknowledged in-memory bytes.
338- */
339- final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
340- private final int currentMaxBytes ;
341-
342- public MaxOutstandingBytesReachedException (int currentMaxBytes ) {
343- this .currentMaxBytes = currentMaxBytes ;
344- }
345-
346- public int getCurrentMaxBundleBytes () {
347- return currentMaxBytes ;
348- }
349-
350- @ Override
351- public String toString () {
352- return String .format (
353- "The maximum number of bundle bytes: %d have been reached." , currentMaxBytes );
354- }
355- }
356290}
0 commit comments