Skip to content

Commit 4b34b46

Browse files
authored
---
yaml --- r: 30677 b: refs/heads/autosynth-bigquerydatatransfer c: 2fc4e55 h: refs/heads/master i: 30675: 5f5875a
1 parent 5664c95 commit 4b34b46

4 files changed

Lines changed: 11 additions & 24 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
122122
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
123123
refs/heads/autosynth-asset: bdb45634a0fe8f7a510692b56b31f5312e25f453
124124
refs/heads/autosynth-automl: 22f9dd5b6f5df8dbfa7da0126864d565229519b2
125-
refs/heads/autosynth-bigquerydatatransfer: 78360de040eb4d86d1b25a7792269d2952174d66
125+
refs/heads/autosynth-bigquerydatatransfer: 2fc4e551b5263fe1e902efdbc1fd01acf8e3b5e9
126126
refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
127127
refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58
128128
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca

branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,6 @@ private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection<String
104104
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
105105
}
106106

107-
public void addAckId(String ackId) {
108-
ackIds.add(ackId);
109-
}
110-
111107
@Override
112108
public String toString() {
113109
return String.format(
@@ -129,7 +125,7 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
129125
private final long receivedTimeMillis;
130126
private final Instant totalExpiration;
131127

132-
AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
128+
private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
133129
this.ackId = ackId;
134130
this.outstandingBytes = outstandingBytes;
135131
this.receivedTimeMillis = clock.millisTime();
@@ -182,7 +178,7 @@ public void onSuccess(AckReply reply) {
182178
}
183179
}
184180

185-
public interface AckProcessor {
181+
interface AckProcessor {
186182
void sendAckOperations(
187183
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions);
188184
}
@@ -211,7 +207,7 @@ void sendAckOperations(
211207
this.clock = clock;
212208
}
213209

214-
public void start() {
210+
void start() {
215211
final Runnable setExtendDeadline =
216212
new Runnable() {
217213
@Override
@@ -264,7 +260,7 @@ public void run() {
264260
}
265261
}
266262

267-
public void stop() {
263+
void stop() {
268264
messagesWaiter.waitNoMessages();
269265
jobLock.lock();
270266
try {
@@ -288,17 +284,17 @@ int getMessageDeadlineSeconds() {
288284
return messageDeadlineSeconds.get();
289285
}
290286

291-
static class OutstandingMessage {
287+
private static class OutstandingMessage {
292288
private final ReceivedMessage receivedMessage;
293289
private final AckHandler ackHandler;
294290

295-
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
291+
private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
296292
this.receivedMessage = receivedMessage;
297293
this.ackHandler = ackHandler;
298294
}
299295
}
300296

301-
public void processReceivedMessages(List<ReceivedMessage> messages) {
297+
void processReceivedMessages(List<ReceivedMessage> messages) {
302298
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
303299
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
304300
for (ReceivedMessage message : messages) {

branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.core.InternalApi;
20-
import java.util.concurrent.atomic.AtomicBoolean;
2120

2221
/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */
2322
class MessageWaiter {
@@ -35,16 +34,10 @@ public synchronized void incrementPendingMessages(int messages) {
3534
}
3635

3736
public synchronized void waitNoMessages() {
38-
waitNoMessages(new AtomicBoolean());
39-
}
40-
41-
@InternalApi
42-
synchronized void waitNoMessages(AtomicBoolean waitReached) {
4337
boolean interrupted = false;
4438
try {
4539
while (pendingMessages > 0) {
4640
try {
47-
waitReached.set(true);
4841
wait();
4942
} catch (InterruptedException e) {
5043
// Ignored, uninterruptibly.

branches/autosynth-bigquerydatatransfer/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020

21-
import java.util.concurrent.atomic.AtomicBoolean;
2221
import org.junit.Test;
2322
import org.junit.runner.RunWith;
2423
import org.junit.runners.JUnit4;
@@ -32,22 +31,21 @@ public void test() throws Exception {
3231
final MessageWaiter waiter = new MessageWaiter();
3332
waiter.incrementPendingMessages(1);
3433

35-
final AtomicBoolean waitReached = new AtomicBoolean();
36-
34+
final Thread mainThread = Thread.currentThread();
3735
Thread t =
3836
new Thread(
3937
new Runnable() {
4038
@Override
4139
public void run() {
42-
while (!waitReached.get()) {
40+
while (mainThread.getState() != Thread.State.WAITING) {
4341
Thread.yield();
4442
}
4543
waiter.incrementPendingMessages(-1);
4644
}
4745
});
4846
t.start();
4947

50-
waiter.waitNoMessages(waitReached);
48+
waiter.waitNoMessages();
5149
t.join();
5250

5351
assertEquals(0, waiter.pendingMessages());

0 commit comments

Comments
 (0)