Skip to content

Commit d07d322

Browse files
Pub/Sub: Converting Future -> ApiFuture
Also, deleting PullFuture which is subsumed by ApiFuture.
1 parent be2c36f commit d07d322

20 files changed

Lines changed: 545 additions & 703 deletions

File tree

google-cloud-core/src/main/java/com/google/cloud/AsyncPage.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud;
1818

19-
import java.util.concurrent.Future;
19+
import com.google.api.gax.core.ApiFuture;
2020

2121
/**
2222
* Interface for asynchronously consuming Google Cloud paginated results.
@@ -46,8 +46,8 @@ public interface AsyncPage<T> extends Page<T> {
4646

4747

4848
/**
49-
* Returns a {@link Future} object for the next page. {@link Future#get()} returns {@code null} if
50-
* the last page has been reached.
49+
* Returns a {@link ApiFuture} object for the next page. {@link ApiFuture#get()} returns
50+
* {@code null} if the last page has been reached.
5151
*/
52-
Future<AsyncPage<T>> getNextPageAsync();
52+
ApiFuture<AsyncPage<T>> getNextPageAsync();
5353
}

google-cloud-core/src/main/java/com/google/cloud/AsyncPageImpl.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
package com.google.cloud;
1818

19+
import com.google.api.gax.core.ApiFuture;
20+
import com.google.api.gax.core.ApiFutures;
1921
import com.google.common.base.Throwables;
20-
import com.google.common.util.concurrent.Futures;
2122
import com.google.common.util.concurrent.Uninterruptibles;
22-
2323
import java.io.Serializable;
2424
import java.util.concurrent.ExecutionException;
25-
import java.util.concurrent.Future;
2625

2726
/**
2827
* Base implementation for asynchronously consuming Google Cloud paginated results.
@@ -42,7 +41,7 @@ public class AsyncPageImpl<T> extends PageImpl<T> implements AsyncPage<T> {
4241
*/
4342
public interface NextPageFetcher<T> extends Serializable {
4443

45-
Future<AsyncPage<T>> getNextPage();
44+
ApiFuture<AsyncPage<T>> getNextPage();
4645
}
4746

4847
private static class SyncNextPageFetcher<T> implements PageImpl.NextPageFetcher<T> {
@@ -76,9 +75,9 @@ public AsyncPageImpl(NextPageFetcher<T> asyncPageFetcher, String cursor, Iterabl
7675

7776

7877
@Override
79-
public Future<AsyncPage<T>> getNextPageAsync() {
78+
public ApiFuture<AsyncPage<T>> getNextPageAsync() {
8079
if (getNextPageCursor() == null || asyncPageFetcher == null) {
81-
return Futures.immediateCheckedFuture(null);
80+
return ApiFutures.immediateFuture(null);
8281
}
8382
return asyncPageFetcher.getNextPage();
8483
}

google-cloud-core/src/test/java/com/google/cloud/AsyncPageImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020

21+
import com.google.api.gax.core.ApiFuture;
22+
import com.google.api.gax.core.ApiFutures;
2123
import com.google.common.collect.ImmutableList;
22-
import com.google.common.util.concurrent.Futures;
2324

2425
import org.junit.Test;
2526

2627
import java.util.concurrent.ExecutionException;
27-
import java.util.concurrent.Future;
2828

2929
public class AsyncPageImplTest {
3030

@@ -51,8 +51,8 @@ private static class TestPageFetcher implements AsyncPageImpl.NextPageFetcher<St
5151
}
5252

5353
@Override
54-
public Future<AsyncPage<String>> getNextPage() {
55-
return Futures.<AsyncPage<String>>immediateFuture(nextResult);
54+
public ApiFuture<AsyncPage<String>> getNextPage() {
55+
return ApiFutures.<AsyncPage<String>>immediateFuture(nextResult);
5656
}
5757
}
5858

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/deprecated/MessageConsumerImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818

1919
import static com.google.common.base.MoreObjects.firstNonNull;
2020

21+
import com.google.api.gax.core.ApiFuture;
22+
import com.google.api.gax.core.ApiFutureCallback;
23+
import com.google.api.gax.core.ApiFutures;
2124
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2225
import com.google.cloud.pubsub.deprecated.PubSub.MessageConsumer;
2326
import com.google.cloud.pubsub.deprecated.PubSub.MessageProcessor;
2427
import com.google.cloud.pubsub.deprecated.spi.PubSubRpc;
25-
import com.google.cloud.pubsub.deprecated.spi.PubSubRpc.PullCallback;
26-
import com.google.cloud.pubsub.deprecated.spi.PubSubRpc.PullFuture;
2728
import com.google.pubsub.v1.PullRequest;
2829
import com.google.pubsub.v1.PullResponse;
2930
import com.google.pubsub.v1.SubscriptionName;
@@ -71,7 +72,7 @@ public void close(ExecutorService instance) {
7172
private final NextPullPolicy pullPolicy;
7273
private boolean closed;
7374
private Future<?> scheduledFuture;
74-
private PullFuture pullerFuture;
75+
private ApiFuture<PullResponse> pullerFuture;
7576

7677
/**
7778
* Interface for policies according to which the consumer should pull messages.
@@ -128,9 +129,9 @@ public void run() {
128129
return;
129130
}
130131
pullerFuture = pubsubRpc.pull(createPullRequest());
131-
pullerFuture.addCallback(new PullCallback() {
132+
ApiFutures.addCallback(pullerFuture, new ApiFutureCallback<PullResponse>() {
132133
@Override
133-
public void success(PullResponse response) {
134+
public void onSuccess(PullResponse response) {
134135
List<com.google.pubsub.v1.ReceivedMessage> messages = response.getReceivedMessagesList();
135136
queuedCallbacks.addAndGet(messages.size());
136137
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
@@ -142,7 +143,7 @@ public void success(PullResponse response) {
142143
}
143144

144145
@Override
145-
public void failure(Throwable error) {
146+
public void onFailure(Throwable error) {
146147
if (!(error instanceof CancellationException)) {
147148
nextPull();
148149
}

0 commit comments

Comments
 (0)