Skip to content

Commit e026c5c

Browse files
committed
Merge branch 'master' into pubsub-ordering-keys
2 parents ad54d69 + 2fc4e55 commit e026c5c

14 files changed

Lines changed: 424 additions & 286 deletions

File tree

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection<String
105105
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
106106
}
107107

108-
public void addAckId(String ackId) {
109-
ackIds.add(ackId);
110-
}
111-
112108
@Override
113109
public String toString() {
114110
return String.format(
@@ -130,7 +126,7 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
130126
private final long receivedTimeMillis;
131127
private final Instant totalExpiration;
132128

133-
AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
129+
private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
134130
this.ackId = ackId;
135131
this.outstandingBytes = outstandingBytes;
136132
this.receivedTimeMillis = clock.millisTime();
@@ -183,7 +179,7 @@ public void onSuccess(AckReply reply) {
183179
}
184180
}
185181

186-
public interface AckProcessor {
182+
interface AckProcessor {
187183
void sendAckOperations(
188184
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions);
189185
}
@@ -213,7 +209,7 @@ void sendAckOperations(
213209
this.sequentialExecutor = new SequentialExecutorService(executor);
214210
}
215211

216-
public void start() {
212+
void start() {
217213
final Runnable setExtendDeadline =
218214
new Runnable() {
219215
@Override
@@ -266,7 +262,7 @@ public void run() {
266262
}
267263
}
268264

269-
public void stop() {
265+
void stop() {
270266
messagesWaiter.waitNoMessages();
271267
jobLock.lock();
272268
try {
@@ -290,17 +286,17 @@ int getMessageDeadlineSeconds() {
290286
return messageDeadlineSeconds.get();
291287
}
292288

293-
static class OutstandingMessage {
289+
private static class OutstandingMessage {
294290
private final ReceivedMessage receivedMessage;
295291
private final AckHandler ackHandler;
296292

297-
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
293+
private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
298294
this.receivedMessage = receivedMessage;
299295
this.ackHandler = ackHandler;
300296
}
301297
}
302298

303-
public void processReceivedMessages(List<ReceivedMessage> messages) {
299+
void processReceivedMessages(List<ReceivedMessage> messages) {
304300
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
305301
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
306302
for (ReceivedMessage message : messages) {

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.core.InternalApi;
20-
import java.util.concurrent.atomic.AtomicBoolean;
2120

2221
/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */
2322
class MessageWaiter {
@@ -35,16 +34,10 @@ public synchronized void incrementPendingMessages(int messages) {
3534
}
3635

3736
public synchronized void waitNoMessages() {
38-
waitNoMessages(new AtomicBoolean());
39-
}
40-
41-
@InternalApi
42-
synchronized void waitNoMessages(AtomicBoolean waitReached) {
4337
boolean interrupted = false;
4438
try {
4539
while (pendingMessages > 0) {
4640
try {
47-
waitReached.set(true);
4841
wait();
4942
} catch (InterruptedException e) {
5043
// Ignored, uninterruptibly.

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -401,31 +401,31 @@ private void stopConnections(List<? extends ApiService> connections) {
401401
public static final class Builder {
402402
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
403403

404-
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
404+
private static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
405405
InstantiatingExecutorProvider.newBuilder()
406406
.setExecutorThreadCount(THREADS_PER_CHANNEL)
407407
.build();
408408

409-
String subscriptionName;
410-
MessageReceiver receiver;
409+
private String subscriptionName;
410+
private MessageReceiver receiver;
411411

412-
Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
412+
private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
413413

414-
FlowControlSettings flowControlSettings =
414+
private FlowControlSettings flowControlSettings =
415415
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build();
416416

417-
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
418-
ExecutorProvider systemExecutorProvider = null;
419-
TransportChannelProvider channelProvider =
417+
private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
418+
private ExecutorProvider systemExecutorProvider = null;
419+
private TransportChannelProvider channelProvider =
420420
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
421421
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
422422
.setKeepAliveTime(Duration.ofMinutes(5))
423423
.build();
424-
HeaderProvider headerProvider = new NoHeaderProvider();
425-
CredentialsProvider credentialsProvider =
424+
private HeaderProvider headerProvider = new NoHeaderProvider();
425+
private CredentialsProvider credentialsProvider =
426426
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
427-
Optional<ApiClock> clock = Optional.absent();
428-
int parallelPullCount = 1;
427+
private Optional<ApiClock> clock = Optional.absent();
428+
private int parallelPullCount = 1;
429429

430430
Builder(String subscriptionName, MessageReceiver receiver) {
431431
this.subscriptionName = subscriptionName;

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020

21-
import java.util.concurrent.atomic.AtomicBoolean;
2221
import org.junit.Test;
2322
import org.junit.runner.RunWith;
2423
import org.junit.runners.JUnit4;
@@ -32,22 +31,21 @@ public void test() throws Exception {
3231
final MessageWaiter waiter = new MessageWaiter();
3332
waiter.incrementPendingMessages(1);
3433

35-
final AtomicBoolean waitReached = new AtomicBoolean();
36-
34+
final Thread mainThread = Thread.currentThread();
3735
Thread t =
3836
new Thread(
3937
new Runnable() {
4038
@Override
4139
public void run() {
42-
while (!waitReached.get()) {
40+
while (mainThread.getState() != Thread.State.WAITING) {
4341
Thread.yield();
4442
}
4543
waiter.incrementPendingMessages(-1);
4644
}
4745
});
4846
t.start();
4947

50-
waiter.waitNoMessages(waitReached);
48+
waiter.waitNoMessages();
5149
t.join();
5250

5351
assertEquals(0, waiter.pendingMessages());

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
2929
import com.google.cloud.spanner.Options.QueryOption;
3030
import com.google.cloud.spanner.Options.ReadOption;
31-
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
32-
import com.google.cloud.spanner.SpannerImpl.SessionTransaction;
31+
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
3332
import com.google.cloud.spanner.spi.v1.SpannerRpc;
3433
import com.google.protobuf.ByteString;
3534
import com.google.spanner.v1.BeginTransactionRequest;
@@ -205,7 +204,7 @@ ByteString getTransactionId() {
205204
}
206205

207206
void initTransaction() {
208-
SpannerImpl.throwIfTransactionsPending();
207+
SessionImpl.throwIfTransactionsPending();
209208

210209
// Since we only support synchronous calls, just block on "txnLock" while the RPC is in
211210
// flight. Note that we use the strategy of sending an explicit BeginTransaction() RPC,

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
2222
import com.google.cloud.spanner.Options.QueryOption;
2323
import com.google.cloud.spanner.Options.ReadOption;
24-
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
2524
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2625
import com.google.common.base.Preconditions;
2726
import com.google.common.collect.ImmutableList;

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkState;
2020

21-
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
22-
import com.google.cloud.spanner.SpannerImpl.SessionTransaction;
21+
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2322
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2423
import com.google.protobuf.ByteString;
2524
import com.google.spanner.v1.BeginTransactionRequest;

0 commit comments

Comments
 (0)