Skip to content

Commit 8ca5bb2

Browse files
committed
move MessageReceiver to reduce nesting
1 parent 5f1417c commit 8ca5bb2

6 files changed

Lines changed: 54 additions & 31 deletions

File tree

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import com.google.api.gax.bundling.FlowController;
20+
import com.google.auth.Credentials;
21+
import com.google.auth.oauth2.GoogleCredentials;
22+
import com.google.cloud.Clock;
23+
import com.google.common.base.Optional;
24+
import com.google.common.base.Preconditions;
25+
import com.google.common.util.concurrent.ListenableFuture;
26+
import com.google.common.util.concurrent.Service;
27+
import com.google.pubsub.v1.PubsubMessage;
28+
import io.grpc.ManagedChannelBuilder;
29+
import java.io.IOException;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import org.joda.time.Duration;
32+
33+
/** Users of the {@link Subscriber} must implement this interface to receive messages. */
34+
interface MessageReceiver {
35+
enum AckReply {
36+
/** To be used for acking a message. */
37+
ACK,
38+
/** To be used for nacking a message. */
39+
NACK
40+
}
41+
/**
42+
* Called when a message is received by the subscriber.
43+
*
44+
* @return A future that signals when a message has been processed.
45+
*/
46+
ListenableFuture<AckReply> receiveMessage(PubsubMessage message);
47+
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.api.stats.Distribution;
2121
import com.google.cloud.Clock;
22-
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
23-
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
22+
import com.google.cloud.pubsub.MessageReceiver.AckReply;
2423
import com.google.common.annotations.VisibleForTesting;
2524
import com.google.common.collect.Lists;
2625
import com.google.common.primitives.Ints;
@@ -140,7 +139,10 @@ public void addAckId(String ackId) {
140139
}
141140
}
142141

143-
/** Handles callbacks for acking/nacking messages from the {@link MessageReceiver}. */
142+
/**
143+
* Handles callbacks for acking/nacking messages from the {@link
144+
* com.google.cloud.pubsub.MessageReceiver}.
145+
*/
144146
private class AckHandler implements FutureCallback<AckReply> {
145147
private final String ackId;
146148
private final int outstandingBytes;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.cloud.Clock;
2525
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
2626
import com.google.cloud.pubsub.MessagesProcessor.PendingModifyAckDeadline;
27-
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
2827
import com.google.common.collect.Lists;
2928
import com.google.common.util.concurrent.AbstractService;
3029
import com.google.common.util.concurrent.FutureCallback;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.cloud.Clock;
2525
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
2626
import com.google.cloud.pubsub.MessagesProcessor.PendingModifyAckDeadline;
27-
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
2827
import com.google.common.collect.Lists;
2928
import com.google.common.util.concurrent.AbstractService;
3029
import com.google.common.util.concurrent.FutureCallback;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020
import com.google.auth.Credentials;
2121
import com.google.auth.oauth2.GoogleCredentials;
2222
import com.google.cloud.Clock;
23-
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
23+
import com.google.cloud.pubsub.MessageReceiver.AckReply;
2424
import com.google.common.base.Optional;
2525
import com.google.common.base.Preconditions;
26-
import com.google.common.util.concurrent.ListenableFuture;
2726
import com.google.common.util.concurrent.Service;
28-
import com.google.pubsub.v1.PubsubMessage;
2927
import io.grpc.ManagedChannelBuilder;
3028
import java.io.IOException;
3129
import java.util.concurrent.ScheduledExecutorService;
@@ -94,27 +92,6 @@ public interface Subscriber extends Service {
9492
/** Retrieves a snapshot of the current subscriber statistics. */
9593
SubscriberStats getStats();
9694

97-
/** Users of the {@link Subscriber} must implement this interface to receive messages. */
98-
interface MessageReceiver {
99-
public static enum AckReply {
100-
/**
101-
* To be used for acking a message.
102-
*/
103-
ACK,
104-
/**
105-
* To be used for nacking a message.
106-
*/
107-
NACK
108-
}
109-
110-
/**
111-
* Called when a message is received by the subscriber.
112-
*
113-
* @return A future that signals when a message has been processed.
114-
*/
115-
ListenableFuture<AckReply> receiveMessage(PubsubMessage message);
116-
}
117-
11895
/** Subscription for which the subscriber is streaming messages. */
11996
String getSubscription();
12097

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import static org.junit.Assert.assertEquals;
2121

2222
import com.google.cloud.pubsub.FakeSubscriberServiceImpl.ModifyAckDeadline;
23+
import com.google.cloud.pubsub.MessageReceiver.AckReply;
2324
import com.google.cloud.pubsub.Subscriber.Builder;
24-
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
25-
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
2625
import com.google.common.base.Function;
2726
import com.google.common.base.Optional;
2827
import com.google.common.base.Preconditions;

0 commit comments

Comments
 (0)