Skip to content

Commit cb1013d

Browse files
authored
---
yaml --- r: 32703 b: refs/heads/autosynth-iot c: 2fc4e55 h: refs/heads/master i: 32701: 63fdd7b 32699: 34b7560 32695: 5f5112e 32687: c981b4a 32671: 7b025c6 32639: d9d66f6
1 parent 3260a7d commit cb1013d

4 files changed

Lines changed: 11 additions & 24 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ refs/heads/autosynth-datastore: f1efc3dc465174f41041acd56cf29badcec3e5bd
131131
refs/heads/autosynth-dialogflow: 73974cc32e5212aec0126472e0bc1442886fedaf
132132
refs/heads/autosynth-errorreporting: effe8001d110ad584187b30aafc473db0dd4a15f
133133
refs/heads/autosynth-firestore: e79eeb26930dfae4439424ad2fda5874eeca54c8
134-
refs/heads/autosynth-iot: 78360de040eb4d86d1b25a7792269d2952174d66
134+
refs/heads/autosynth-iot: 2fc4e551b5263fe1e902efdbc1fd01acf8e3b5e9
135135
refs/heads/autosynth-kms: 6b65b0f34c12d141031c7288cdc01e550212d0f6
136136
refs/heads/autosynth-language: e73905aa7672afa47240e65b25c087207f4594f9
137137
refs/heads/autosynth-os-login: 123ba209c5769d0ee067e0ce5848bec13b42a4f4

branches/autosynth-iot/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,6 @@ private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection<String
104104
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
105105
}
106106

107-
public void addAckId(String ackId) {
108-
ackIds.add(ackId);
109-
}
110-
111107
@Override
112108
public String toString() {
113109
return String.format(
@@ -129,7 +125,7 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
129125
private final long receivedTimeMillis;
130126
private final Instant totalExpiration;
131127

132-
AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
128+
private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
133129
this.ackId = ackId;
134130
this.outstandingBytes = outstandingBytes;
135131
this.receivedTimeMillis = clock.millisTime();
@@ -182,7 +178,7 @@ public void onSuccess(AckReply reply) {
182178
}
183179
}
184180

185-
public interface AckProcessor {
181+
interface AckProcessor {
186182
void sendAckOperations(
187183
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions);
188184
}
@@ -211,7 +207,7 @@ void sendAckOperations(
211207
this.clock = clock;
212208
}
213209

214-
public void start() {
210+
void start() {
215211
final Runnable setExtendDeadline =
216212
new Runnable() {
217213
@Override
@@ -264,7 +260,7 @@ public void run() {
264260
}
265261
}
266262

267-
public void stop() {
263+
void stop() {
268264
messagesWaiter.waitNoMessages();
269265
jobLock.lock();
270266
try {
@@ -288,17 +284,17 @@ int getMessageDeadlineSeconds() {
288284
return messageDeadlineSeconds.get();
289285
}
290286

291-
static class OutstandingMessage {
287+
private static class OutstandingMessage {
292288
private final ReceivedMessage receivedMessage;
293289
private final AckHandler ackHandler;
294290

295-
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
291+
private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
296292
this.receivedMessage = receivedMessage;
297293
this.ackHandler = ackHandler;
298294
}
299295
}
300296

301-
public void processReceivedMessages(List<ReceivedMessage> messages) {
297+
void processReceivedMessages(List<ReceivedMessage> messages) {
302298
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
303299
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
304300
for (ReceivedMessage message : messages) {

branches/autosynth-iot/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.core.InternalApi;
20-
import java.util.concurrent.atomic.AtomicBoolean;
2120

2221
/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */
2322
class MessageWaiter {
@@ -35,16 +34,10 @@ public synchronized void incrementPendingMessages(int messages) {
3534
}
3635

3736
public synchronized void waitNoMessages() {
38-
waitNoMessages(new AtomicBoolean());
39-
}
40-
41-
@InternalApi
42-
synchronized void waitNoMessages(AtomicBoolean waitReached) {
4337
boolean interrupted = false;
4438
try {
4539
while (pendingMessages > 0) {
4640
try {
47-
waitReached.set(true);
4841
wait();
4942
} catch (InterruptedException e) {
5043
// Ignored, uninterruptibly.

branches/autosynth-iot/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020

21-
import java.util.concurrent.atomic.AtomicBoolean;
2221
import org.junit.Test;
2322
import org.junit.runner.RunWith;
2423
import org.junit.runners.JUnit4;
@@ -32,22 +31,21 @@ public void test() throws Exception {
3231
final MessageWaiter waiter = new MessageWaiter();
3332
waiter.incrementPendingMessages(1);
3433

35-
final AtomicBoolean waitReached = new AtomicBoolean();
36-
34+
final Thread mainThread = Thread.currentThread();
3735
Thread t =
3836
new Thread(
3937
new Runnable() {
4038
@Override
4139
public void run() {
42-
while (!waitReached.get()) {
40+
while (mainThread.getState() != Thread.State.WAITING) {
4341
Thread.yield();
4442
}
4543
waiter.incrementPendingMessages(-1);
4644
}
4745
});
4846
t.start();
4947

50-
waiter.waitNoMessages(waitReached);
48+
waiter.waitNoMessages();
5149
t.join();
5250

5351
assertEquals(0, waiter.pendingMessages());

0 commit comments

Comments
 (0)