Skip to content

Commit 3bdbf77

Browse files
committed
Merge branch 'master' into pubsub-ordering-keys
2 parents d98f981 + abf92a3 commit 3bdbf77

13 files changed

Lines changed: 47 additions & 171 deletions

File tree

google-cloud-clients/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,9 +655,9 @@ private static Tuple<? extends Page<FieldValueList>, Long> listTableData(
655655
try {
656656
final TableId completeTableId =
657657
tableId.setProjectId(
658-
Strings.isNullOrEmpty(serviceOptions.getProjectId())
659-
? tableId.getProject()
660-
: serviceOptions.getProjectId());
658+
Strings.isNullOrEmpty(tableId.getProject())
659+
? serviceOptions.getProjectId()
660+
: tableId.getProject());
661661
TableDataList result =
662662
runWithRetries(
663663
new Callable<TableDataList>() {

google-cloud-clients/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,8 +1246,7 @@ public void testListTableDataFromTableIdWithProject() {
12461246
EasyMock.expect(bigqueryRpcMock.listTableData(OTHER_PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
12471247
.andReturn(TABLE_DATA_PB);
12481248
EasyMock.replay(bigqueryRpcMock);
1249-
BigQueryOptions bigQueryOptions =
1250-
createBigQueryOptionsForProject(OTHER_PROJECT, rpcFactoryMock);
1249+
BigQueryOptions bigQueryOptions = createBigQueryOptionsForProject(PROJECT, rpcFactoryMock);
12511250
bigquery = bigQueryOptions.getService();
12521251
Page<FieldValueList> page = bigquery.listTableData(tableId);
12531252
assertEquals(CURSOR, page.getNextPageToken());

google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStoragePath.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ boolean seemsLikeADirectoryAndUsePseudoDirectories(Storage storage) {
108108
this.bucket(),
109109
Storage.BlobListOption.prefix(prefix),
110110
// we only look at the first result, so no need for a bigger page.
111-
Storage.BlobListOption.pageSize(1));
111+
Storage.BlobListOption.pageSize(1),
112+
fileSystem.provider().getProject() == null
113+
? null
114+
: Storage.BlobListOption.userProject(fileSystem.provider().getProject()));
112115
for (Blob b : list.getValues()) {
113116
// if this blob starts with our prefix and then a slash, then prefix is indeed a folder!
114117
if (b.getBlobId() == null) {

google-cloud-clients/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/testing/FakeStorageRpc.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public Tuple<String, Iterable<StorageObject>> list(String bucket, Map<Option, ?>
145145
case MAX_RESULTS:
146146
maxResults = (Long) e.getValue();
147147
break;
148+
case USER_PROJECT:
149+
// prevent unsupported operation
150+
break;
148151
default:
149152
throw new UnsupportedOperationException("Unknown option: " + e.getKey());
150153
}

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@
4040
import com.google.cloud.pubsub.v1.stub.PublisherStub;
4141
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
4242
import com.google.common.base.Preconditions;
43-
import com.google.common.collect.ImmutableList;
4443
import com.google.pubsub.v1.PublishRequest;
4544
import com.google.pubsub.v1.PublishResponse;
4645
import com.google.pubsub.v1.PubsubMessage;
4746
import com.google.pubsub.v1.TopicName;
4847
import com.google.pubsub.v1.TopicNames;
4948
import java.io.IOException;
49+
import java.util.ArrayList;
5050
import java.util.Collections;
5151
import java.util.EnumSet;
5252
import java.util.HashMap;
@@ -227,7 +227,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
227227
}
228228

229229
message = messageTransform.apply(message);
230-
OutstandingBatch batchToSend = null;
230+
List<OutstandingBatch> batchesToSend = new ArrayList<>();
231231
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
232232
messagesBatchLock.lock();
233233
try {
@@ -241,19 +241,18 @@ public ApiFuture<String> publish(PubsubMessage message) {
241241
&& hasBatchingBytes()
242242
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
243243
>= getMaxBatchBytes()) {
244-
batchToSend = messagesBatch.popOutstandingBatch();
244+
batchesToSend.add(messagesBatch.popOutstandingBatch());
245245
}
246246

247-
// Border case if the message to send is greater or equals to the max batch size then can't
248-
// be included in the current batch and instead sent immediately.
249-
if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) {
250-
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
247+
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
251248

252-
// If after adding the message we have reached the batch max messages then we have a batch
253-
// to send.
254-
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
255-
batchToSend = messagesBatch.popOutstandingBatch();
256-
}
249+
// Border case: If the message to send is greater or equals to the max batch size then send it
250+
// immediately.
251+
// Alternatively if after adding the message we have reached the batch max messages then we
252+
// have a batch to send.
253+
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
254+
|| messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
255+
batchesToSend.add(messagesBatch.popOutstandingBatch());
257256
}
258257

259258
// Setup the next duration based delivery alarm if there are messages batched.
@@ -264,21 +263,19 @@ && hasBatchingBytes()
264263

265264
messagesWaiter.incrementPendingMessages(1);
266265

267-
if (batchToSend != null) {
268-
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
269-
publishAllOutstanding();
270-
publishOutstandingBatch(batchToSend);
271-
}
272266

273-
// If the message is over the size limit, it was not added to the pending messages and it will
274-
// be sent in its own batch immediately.
275-
if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) {
276-
logger.log(
277-
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
267+
if (!batchesToSend.isEmpty()) {
278268
publishAllOutstanding();
279-
publishOutstandingBatch(
280-
new OutstandingBatch(
281-
ImmutableList.of(outstandingPublish), outstandingPublish.messageSize, orderingKey));
269+
for (final OutstandingBatch batch : batchesToSend) {
270+
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
271+
executor.execute(
272+
new Runnable() {
273+
@Override
274+
public void run() {
275+
publishOutstandingBatch(batch);
276+
}
277+
});
278+
}
282279
}
283280

284281
return outstandingPublish.publishResult;
@@ -416,11 +413,7 @@ private static final class OutstandingBatch {
416413
this.orderingKey = orderingKey;
417414
}
418415

419-
public int getAttempt() {
420-
return attempt;
421-
}
422-
423-
public int size() {
416+
int size() {
424417
return outstandingPublishes.size();
425418
}
426419
}

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private Subscriber(Builder builder) {
149149
closeables.add(
150150
new AutoCloseable() {
151151
@Override
152-
public void close() throws IOException {
152+
public void close() {
153153
alarmsExecutor.shutdown();
154154
}
155155
});

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public static void setupClass() throws Exception {
8080
}
8181

8282
@AfterClass
83-
public static void tearDownClass() throws Exception {
83+
public static void tearDownClass() {
8484
topicAdminClient.close();
8585
subscriptionAdminClient.close();
8686
}
@@ -114,7 +114,7 @@ public void testTopicPolicy() {
114114
}
115115

116116
@Test
117-
public void testVPCPushSubscriber() throws Exception {
117+
public void testVPCPushSubscriber() {
118118
assumeTrue(IS_VPC_TEST);
119119
ProjectTopicName topicName =
120120
ProjectTopicName.of(projectId, formatForTest("testing-vpc-push-subscriber-topic"));

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -86,35 +86,6 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
8686
PendingCallableType.FIXED_DELAY));
8787
}
8888

89-
/**
90-
* This allows for adding expectations on future work to be scheduled ( {@link
91-
* FakeScheduledExecutorService#schedule} or {@link
92-
* FakeScheduledExecutorService#scheduleAtFixedRate} or {@link
93-
* FakeScheduledExecutorService#scheduleWithFixedDelay}) based on its delay.
94-
*/
95-
public void setupScheduleExpectation(Duration delay) {
96-
synchronized (expectedWorkQueue) {
97-
expectedWorkQueue.add(delay);
98-
}
99-
}
100-
101-
/**
102-
* Blocks the current thread until all the work {@link
103-
* FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been scheduled in
104-
* the executor.
105-
*/
106-
public void waitForExpectedWork() {
107-
synchronized (expectedWorkQueue) {
108-
while (!expectedWorkQueue.isEmpty()) {
109-
try {
110-
expectedWorkQueue.wait();
111-
} catch (InterruptedException e) {
112-
// Wait uninterruptibly
113-
}
114-
}
115-
}
116-
}
117-
11889
/**
11990
* This will advance the reference time of the executor and execute (in the same thread) any
12091
* outstanding callable which execution time has passed.
@@ -232,7 +203,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
232203
return callable.getScheduledFuture();
233204
}
234205

235-
static enum PendingCallableType {
206+
enum PendingCallableType {
236207
NORMAL,
237208
FIXED_RATE,
238209
FIXED_DELAY
@@ -252,7 +223,7 @@ class PendingCallable<T> implements Comparable<PendingCallable<T>> {
252223
pendingCallable =
253224
new Callable<T>() {
254225
@Override
255-
public T call() throws Exception {
226+
public T call() {
256227
runnable.run();
257228
return null;
258229
}

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java

Lines changed: 1 addition & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.client.util.Preconditions;
20-
import com.google.api.core.InternalApi;
21-
import com.google.common.collect.ImmutableList;
2220
import com.google.protobuf.Empty;
2321
import com.google.pubsub.v1.AcknowledgeRequest;
2422
import com.google.pubsub.v1.GetSubscriptionRequest;
@@ -61,7 +59,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
6159
private final BlockingQueue<PullResponse> pullResponses = new LinkedBlockingDeque<>();
6260
private int currentStream;
6361

64-
public static enum CloseSide {
62+
public enum CloseSide {
6563
SERVER,
6664
CLIENT
6765
}
@@ -76,10 +74,6 @@ public ModifyAckDeadline(String ackId, long seconds) {
7674
this.seconds = seconds;
7775
}
7876

79-
public String getAckId() {
80-
return ackId;
81-
}
82-
8377
public long getSeconds() {
8478
return seconds;
8579
}
@@ -207,23 +201,6 @@ public StreamObserver<StreamingPullRequest> streamingPull(
207201
return stream.requestObserver;
208202
}
209203

210-
public void sendStreamingResponse(StreamingPullResponse pullResponse)
211-
throws InterruptedException {
212-
waitForRegistedSubscription();
213-
synchronized (openedStreams) {
214-
waitForOpenedStreams(1);
215-
openedStreams.get(getAndAdvanceCurrentStream()).responseObserver.onNext(pullResponse);
216-
}
217-
}
218-
219-
public void setMessageAckDeadlineSeconds(int ackDeadline) {
220-
messageAckDeadline.set(ackDeadline);
221-
}
222-
223-
public void enqueuePullResponse(PullResponse response) {
224-
pullResponses.add(response);
225-
}
226-
227204
@Override
228205
public void getSubscription(
229206
GetSubscriptionRequest request, StreamObserver<Subscription> responseObserver) {
@@ -237,12 +214,6 @@ public void getSubscription(
237214
responseObserver.onCompleted();
238215
}
239216

240-
/** Returns the number of times getSubscription is called. */
241-
@InternalApi
242-
int getSubscriptionCalledCount() {
243-
return getSubscriptionCalled.get();
244-
}
245-
246217
@Override
247218
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
248219
synchronized (receivedPullRequest) {
@@ -293,26 +264,6 @@ public String waitForRegistedSubscription() throws InterruptedException {
293264
return subscription;
294265
}
295266

296-
public List<String> waitAndConsumeReceivedAcks(int expectedCount) throws InterruptedException {
297-
synchronized (acks) {
298-
waitAtLeast(acks, expectedCount);
299-
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount));
300-
acks.subList(0, expectedCount).clear();
301-
return receivedAcksCopy;
302-
}
303-
}
304-
305-
public List<ModifyAckDeadline> waitAndConsumeModifyAckDeadlines(int expectedCount)
306-
throws InterruptedException {
307-
synchronized (modAckDeadlines) {
308-
waitAtLeast(modAckDeadlines, expectedCount);
309-
List<ModifyAckDeadline> modAckDeadlinesCopy =
310-
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount));
311-
modAckDeadlines.subList(0, expectedCount).clear();
312-
return modAckDeadlinesCopy;
313-
}
314-
}
315-
316267
public int waitForClosedStreams(int expectedCount) throws InterruptedException {
317268
synchronized (closedStreams) {
318269
waitAtLeast(closedStreams, expectedCount);
@@ -341,50 +292,6 @@ private static void waitAtLeast(Collection<?> collection, int target)
341292
}
342293
}
343294

344-
public void waitForStreamAckDeadline(int expectedValue) throws InterruptedException {
345-
synchronized (messageAckDeadline) {
346-
while (messageAckDeadline.get() != expectedValue) {
347-
messageAckDeadline.wait();
348-
}
349-
}
350-
}
351-
352-
public int getOpenedStreamsCount() {
353-
return openedStreams.size();
354-
}
355-
356-
public int getClosedStreamsCount() {
357-
return closedStreams.size();
358-
}
359-
360-
public List<String> getAcks() {
361-
return acks;
362-
}
363-
364-
public List<ModifyAckDeadline> getModifyAckDeadlines() {
365-
return modAckDeadlines;
366-
}
367-
368-
public void reset() {
369-
synchronized (subscriptionInitialized) {
370-
synchronized (openedStreams) {
371-
synchronized (acks) {
372-
synchronized (modAckDeadlines) {
373-
openedStreams.clear();
374-
closedStreams.clear();
375-
acks.clear();
376-
modAckDeadlines.clear();
377-
subscriptionInitialized.set(false);
378-
subscription = "";
379-
pullResponses.clear();
380-
receivedPullRequest.clear();
381-
currentStream = 0;
382-
}
383-
}
384-
}
385-
}
386-
}
387-
388295
private void addOpenedStream(Stream stream) {
389296
synchronized (openedStreams) {
390297
openedStreams.add(stream);

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void sendAckOperations(
123123
}
124124

125125
@Test
126-
public void testReceipt() throws Exception {
126+
public void testReceipt() {
127127
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
128128
dispatcher.processOutstandingAckOperations();
129129
assertThat(sentModAcks)
@@ -160,7 +160,7 @@ public void testExtension() throws Exception {
160160
}
161161

162162
@Test
163-
public void testExtension_Close() throws Exception {
163+
public void testExtension_Close() {
164164
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
165165
dispatcher.extendDeadlines();
166166
assertThat(sentModAcks)

0 commit comments

Comments
 (0)