Skip to content

Commit d284c1f

Browse files
committed
Addressing next round of static code analisys comments
1 parent d853865 commit d284c1f

12 files changed

Lines changed: 190 additions & 205 deletions

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void reserve(int messages, int bytes) throws CloudPubsubFlowControlException {
5959
// Will always allow to send a message even if it is larger than the flow control limit,
6060
// if it doesn't then it will deadlock the thread.
6161
if (outstandingByteCount != null) {
62-
int permitsToDraw = Integer.min(maxOutstandingBytes.get(), bytes);
62+
int permitsToDraw = maxOutstandingBytes.get() > bytes ? bytes : maxOutstandingBytes.get();
6363
if (!failOnLimits) {
6464
outstandingByteCount.acquireUninterruptibly(permitsToDraw);
6565
} else if (!outstandingByteCount.tryAcquire(permitsToDraw)) {
@@ -76,7 +76,7 @@ void release(int messages, int bytes) {
7676
}
7777
if (outstandingByteCount != null) {
7878
// Need to return at most as much bytes as it can be drawn.
79-
int permitsToReturn = Integer.min(maxOutstandingBytes.get(), bytes);
79+
int permitsToReturn = maxOutstandingBytes.get() > bytes ? bytes : maxOutstandingBytes.get();
8080
outstandingByteCount.release(permitsToReturn);
8181
}
8282
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@
3535
* messages, controlling memory utilization, and retrying API calls on transient errors.
3636
*
3737
* <p>With customizable options that control:
38+
*
3839
* <ul>
39-
* <li>Message batching: such as number of messages or max batch byte size.
40-
* <li>Flow control: such as max outstanding messages and maximum outstanding bytes.
41-
* <li>Retries: such as the maximum duration of retries for a failing batch of messages.
40+
* <li>Message batching: such as number of messages or max batch byte size.
41+
* <li>Flow control: such as max outstanding messages and maximum outstanding bytes.
42+
* <li>Retries: such as the maximum duration of retries for a failing batch of messages.
4243
* </ul>
4344
*
44-
* <p>If no credentials are provided, the {@link Publisher} will use application default
45-
* credentials through {@link GoogleCredentials#getApplicationDefault}.
45+
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
46+
* through {@link GoogleCredentials#getApplicationDefault}.
4647
*
4748
* <p>For example, a {@link Publisher} can be constructed and used to publish a list of messages as
4849
* follows:
@@ -76,21 +77,20 @@
7677
* </pre>
7778
*/
7879
public interface Publisher {
79-
static final String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
80-
static final String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
80+
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
81+
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
8182

8283
// API limits.
83-
static final int MAX_BATCH_MESSAGES = 1000;
84-
static final int MAX_BATCH_BYTES =
85-
10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
84+
int MAX_BATCH_MESSAGES = 1000;
85+
int MAX_BATCH_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
8686

8787
// Meaningful defaults.
88-
static final int DEFAULT_MAX_BATCH_MESSAGES = 100;
89-
static final int DEFAULT_MAX_BATCH_BYTES = 1000; // 1 kB
90-
static final Duration DEFAULT_MAX_BATCH_DURATION = new Duration(1); // 1ms
91-
static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
92-
static final Duration MIN_SEND_BATCH_DURATION = new Duration(10 * 1000); // 10 seconds
93-
static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
88+
int DEFAULT_MAX_BATCH_MESSAGES = 100;
89+
int DEFAULT_MAX_BATCH_BYTES = 1000; // 1 kB
90+
Duration DEFAULT_MAX_BATCH_DURATION = new Duration(1); // 1ms
91+
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
92+
Duration MIN_SEND_BATCH_DURATION = new Duration(10 * 1000); // 10 seconds
93+
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
9494

9595
/** Topic to which the publisher publishes to. */
9696
String getTopic();
@@ -118,17 +118,17 @@ public interface Publisher {
118118
long getMaxBatchMessages();
119119

120120
/**
121-
* Maximum number of outstanding (i.e. pending to publish) messages before limits are
122-
* enforced. See {@link #failOnFlowControlLimits()}.
121+
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
122+
* See {@link #failOnFlowControlLimits()}.
123123
*/
124124
Optional<Integer> getMaxOutstandingMessages();
125125

126126
/**
127-
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced.
128-
* See {@link #failOnFlowControlLimits()}.
129-
*/
127+
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
128+
* {@link #failOnFlowControlLimits()}.
129+
*/
130130
Optional<Integer> getMaxOutstandingBytes();
131-
131+
132132
/**
133133
* Whether to block publish calls when reaching flow control limits (see {@link
134134
* #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}).
@@ -138,7 +138,6 @@ public interface Publisher {
138138
* appropriate, when flow control limits are reached.
139139
*/
140140
boolean failOnFlowControlLimits();
141-
142141

143142
/** Retrieves a snapshot of the publisher current {@link PublisherStats statistics}. */
144143
PublisherStats getStats();
@@ -178,9 +177,7 @@ final class Builder {
178177

179178
Optional<ScheduledExecutorService> executor;
180179

181-
/**
182-
* Constructs a new {@link Builder} using the given topic.
183-
*/
180+
/** Constructs a new {@link Builder} using the given topic. */
184181
public static Builder newBuilder(String topic) {
185182
return new Builder(topic);
186183
}
@@ -265,18 +262,14 @@ public Builder setMaxBatchDuration(Duration duration) {
265262

266263
// Flow control options
267264

268-
/**
269-
* Maximum number of outstanding messages to keep in memory before enforcing flow control.
270-
*/
265+
/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
271266
public Builder setMaxOutstandingMessages(int messages) {
272267
Preconditions.checkArgument(messages > 0);
273268
maxOutstandingMessages = Optional.of(messages);
274269
return this;
275270
}
276271

277-
/**
278-
* Maximum number of outstanding messages to keep in memory before enforcing flow control.
279-
*/
272+
/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
280273
public Builder setMaxOutstandingBytes(int bytes) {
281274
Preconditions.checkArgument(bytes > 0);
282275
maxOutstandingBytes = Optional.of(bytes);
@@ -296,9 +289,7 @@ public Builder setFailOnFlowControlLimits(boolean fail) {
296289
return this;
297290
}
298291

299-
/**
300-
* Maximum time to attempt sending (and retrying) a batch of messages before giving up.
301-
*/
292+
/** Maximum time to attempt sending (and retrying) a batch of messages before giving up. */
302293
public Builder setSendBatchDeadline(Duration deadline) {
303294
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BATCH_DURATION) >= 0);
304295
sendBatchDeadline = deadline;
@@ -326,13 +317,12 @@ public Publisher build() throws IOException {
326317

327318
/** Base exception that signals a flow control state. */
328319
abstract class CloudPubsubFlowControlException extends Exception {}
329-
320+
330321
/**
331322
* Returned as a future exception when client-side flow control is enforced based on the maximum
332323
* number of outstanding in-memory messages.
333324
*/
334-
final class MaxOutstandingMessagesReachedException
335-
extends CloudPubsubFlowControlException {
325+
final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
336326
private final int currentMaxMessages;
337327

338328
public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
@@ -354,8 +344,7 @@ public String toString() {
354344
* Returned as a future exception when client-side flow control is enforced based on the maximum
355345
* number of unacknowledged in-memory bytes.
356346
*/
357-
final class MaxOutstandingBytesReachedException
358-
extends CloudPubsubFlowControlException {
347+
final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
359348
private final int currentMaxBytes;
360349

361350
public MaxOutstandingBytesReachedException(int currentMaxBytes) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,14 @@ public ListenableFuture<String> publish(PubsubMessage message) {
200200
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
201201
}
202202

203-
SettableFuture<String> publishResult = SettableFuture.create();
204203
final int messageSize = message.getSerializedSize();
205204
try {
206205
flowController.reserve(1, messageSize);
207206
} catch (CloudPubsubFlowControlException e) {
208207
return Futures.immediateFailedFuture(e);
209208
}
210209
OutstandingBatch batchToSend = null;
210+
SettableFuture<String> publishResult = SettableFuture.create();
211211
final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message);
212212
messagesBatchLock.lock();
213213
try {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.util.concurrent.Service;
2626
import com.google.pubsub.v1.PubsubMessage;
2727
import io.grpc.ManagedChannelBuilder;
28+
import java.io.IOException;
2829
import java.util.concurrent.ScheduledExecutorService;
2930
import org.joda.time.Duration;
3031

@@ -85,8 +86,8 @@ public void failed(State from, Throwable failure) {
8586
* </pre>
8687
*/
8788
public interface Subscriber extends Service {
88-
static final String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
89-
static final String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
89+
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
90+
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
9091

9192
/** Retrieves a snapshot of the current subscriber statistics. */
9293
SubscriberStats getStats();
@@ -137,8 +138,8 @@ public static enum AckReply {
137138

138139
/** Builder of {@link Subscriber Subscribers}. */
139140
final class Builder {
140-
static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
141-
static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
141+
Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
142+
Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
142143

143144
String subscription;
144145
Optional<Credentials> credentials;
@@ -258,7 +259,7 @@ public Builder setExecutor(ScheduledExecutorService executor) {
258259
return this;
259260
}
260261

261-
public Subscriber build() {
262+
public Subscriber build() throws IOException {
262263
return new SubscriberImpl(this);
263264
}
264265
}

0 commit comments

Comments
 (0)