Skip to content

Commit 7693456

Browse files
committed
Add MessageConsumerImpl class, implement pullAsync, add tests
1 parent d487307 commit 7693456

12 files changed

Lines changed: 1029 additions & 29 deletions

File tree

gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.io.ObjectInputStream;
3030
import java.util.Objects;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.ScheduledThreadPoolExecutor;
3334
import java.util.concurrent.TimeUnit;
@@ -50,7 +51,7 @@ public abstract class GrpcServiceOptions<ServiceT extends Service<OptionsT>, Ser
5051
private final double timeoutMultiplier;
5152
private final int maxTimeout;
5253

53-
private transient ExecutorFactory executorFactory;
54+
private transient ExecutorFactory<ScheduledExecutorService> executorFactory;
5455

5556
/**
5657
* Shared thread pool executor.
@@ -73,30 +74,32 @@ public void close(ScheduledExecutorService instance) {
7374
};
7475

7576
/**
76-
* An interface for {@link ScheduledExecutorService} factories. Implementations of this interface
77-
* can be used to provide an user-defined scheduled executor to execute requests. Any
78-
* implementation of this interface must override the {@code get()} method to return the desired
79-
* executor. The {@code release(executor)} method should be overriden to free resources used by
80-
* the executor (if needed) according to application's logic.
77+
* An interface for {@link ExecutorService} factories. Implementations of this interface can be
78+
* used to provide an user-defined executor to execute requests. Any implementation of this
79+
* interface must override the {@code get()} method to return the desired executor. The
80+
* {@code release(executor)} method should be overriden to free resources used by the executor (if
81+
* needed) according to application's logic.
8182
*
8283
* <p>Implementation must provide a public no-arg constructor. Loading of a factory implementation
8384
* is done via {@link java.util.ServiceLoader}.
85+
*
86+
* @param <T> the {@link ExecutorService} subclass created by this factory
8487
*/
85-
public interface ExecutorFactory {
88+
public interface ExecutorFactory<T extends ExecutorService> {
8689

8790
/**
88-
* Gets a scheduled executor service instance.
91+
* Gets an executor service instance.
8992
*/
90-
ScheduledExecutorService get();
93+
T get();
9194

9295
/**
9396
* Releases resources used by the executor and possibly shuts it down.
9497
*/
95-
void release(ScheduledExecutorService executor);
98+
void release(T executor);
9699
}
97100

98101
@VisibleForTesting
99-
static class DefaultExecutorFactory implements ExecutorFactory {
102+
static class DefaultExecutorFactory implements ExecutorFactory<ScheduledExecutorService> {
100103

101104
private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory();
102105

@@ -148,7 +151,7 @@ protected Builder(GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> options) {
148151
*
149152
* @return the builder
150153
*/
151-
public B executorFactory(ExecutorFactory executorFactory) {
154+
public B executorFactory(ExecutorFactory<ScheduledExecutorService> executorFactory) {
152155
this.executorFactory = executorFactory;
153156
return self();
154157
}
@@ -192,6 +195,7 @@ public B maxTimeout(int maxTimeout) {
192195
}
193196
}
194197

198+
@SuppressWarnings("unchecked")
195199
protected GrpcServiceOptions(
196200
Class<? extends ServiceFactory<ServiceT, OptionsT>> serviceFactoryClass,
197201
Class<? extends ServiceRpcFactory<ServiceRpcT, OptionsT>> rpcFactoryClass, Builder<ServiceT,
@@ -208,7 +212,7 @@ protected GrpcServiceOptions(
208212
/**
209213
* Returns a scheduled executor service provider.
210214
*/
211-
protected ExecutorFactory executorFactory() {
215+
protected ExecutorFactory<ScheduledExecutorService> executorFactory() {
212216
return executorFactory;
213217
}
214218

gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void testBaseHashCode() {
211211

212212
@Test
213213
public void testDefaultExecutorFactory() {
214-
ExecutorFactory executorFactory = new DefaultExecutorFactory();
214+
ExecutorFactory<ScheduledExecutorService> executorFactory = new DefaultExecutorFactory();
215215
ScheduledExecutorService executorService = executorFactory.get();
216216
assertSame(executorService, executorFactory.get());
217217
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class AckDeadlineRenewer implements AutoCloseable {
4848

4949
private final PubSub pubsub;
5050
private final ScheduledExecutorService executor;
51-
private final ExecutorFactory executorFactory;
51+
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
5252
private final Clock clock;
5353
private final Queue<Message> messageQueue;
5454
private final Map<MessageId, Long> messageDeadlines;
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import static com.google.cloud.pubsub.spi.v1.SubscriberApi.formatSubscriptionName;
20+
import static com.google.common.base.MoreObjects.firstNonNull;
21+
22+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
23+
import com.google.cloud.pubsub.PubSub.MessageConsumer;
24+
import com.google.cloud.pubsub.PubSub.MessageProcessor;
25+
import com.google.cloud.pubsub.spi.PubSubRpc;
26+
import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback;
27+
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
28+
import com.google.pubsub.v1.PullRequest;
29+
import com.google.pubsub.v1.PullResponse;
30+
31+
import io.grpc.internal.SharedResourceHolder;
32+
33+
import java.util.List;
34+
import java.util.concurrent.CancellationException;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.Future;
38+
import java.util.concurrent.ScheduledExecutorService;
39+
import java.util.concurrent.ScheduledThreadPoolExecutor;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
43+
/**
44+
* Default implementation for a message consumer.
45+
*/
46+
final class MessageConsumerImpl implements MessageConsumer {
47+
48+
private static final int MAX_QUEUED_CALLBACKS = 100;
49+
// shared scheduled executor, used to schedule pulls
50+
private static final SharedResourceHolder.Resource<ScheduledExecutorService> TIMER =
51+
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
52+
@Override
53+
public ScheduledExecutorService create() {
54+
ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
55+
timer.setRemoveOnCancelPolicy(true);
56+
return timer;
57+
}
58+
59+
@Override
60+
public void close(ScheduledExecutorService instance) {
61+
instance.shutdown();
62+
}
63+
};
64+
65+
private final PubSubOptions pubsubOptions;
66+
private final PubSubRpc pubsubRpc;
67+
private final PubSub pubsub;
68+
private final AckDeadlineRenewer deadlineRenewer;
69+
private final String subscription;
70+
private final MessageProcessor messageProcessor;
71+
private final ScheduledExecutorService timer;
72+
private final ExecutorFactory<ExecutorService> executorFactory;
73+
private final ExecutorService executor;
74+
private final AtomicInteger queuedCallbacks;
75+
private final int maxQueuedCallbacks;
76+
private final Object futureLock = new Object();
77+
private final Runnable scheduleRunnable;
78+
private boolean closed;
79+
private Future<?> scheduledFuture;
80+
private PullFuture pullerFuture;
81+
private boolean stopped = true;
82+
83+
/**
84+
* Default executor factory for the message processor executor. By default a single-threaded
85+
* executor is used.
86+
*/
87+
static class DefaultExecutorFactory implements ExecutorFactory<ExecutorService> {
88+
89+
private final ExecutorService executor = Executors.newSingleThreadExecutor();
90+
91+
@Override
92+
public ExecutorService get() {
93+
return executor;
94+
}
95+
96+
@Override
97+
public void release(ExecutorService executor) {
98+
executor.shutdownNow();
99+
}
100+
}
101+
102+
private MessageConsumerImpl(Builder builder) {
103+
this.pubsubOptions = builder.pubsubOptions;
104+
this.subscription = builder.subscription;
105+
this.messageProcessor = builder.messageProcessor;
106+
this.pubsubRpc = pubsubOptions.rpc();
107+
this.pubsub = pubsubOptions.service();
108+
this.deadlineRenewer = builder.deadlineRenewer;
109+
this.queuedCallbacks = new AtomicInteger();
110+
this.timer = SharedResourceHolder.get(TIMER);
111+
this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory());
112+
this.executor = executorFactory.get();
113+
this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS);
114+
this.scheduleRunnable = new Runnable() {
115+
@Override
116+
public void run() {
117+
synchronized (futureLock) {
118+
if (closed) {
119+
return;
120+
}
121+
pull();
122+
}
123+
}
124+
};
125+
nextPull();
126+
}
127+
128+
private Runnable ackingRunnable(final ReceivedMessage receivedMessage) {
129+
return new Runnable() {
130+
@Override
131+
public void run() {
132+
try {
133+
messageProcessor.process(receivedMessage);
134+
pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId());
135+
} catch (Exception ex) {
136+
pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId());
137+
} finally {
138+
deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId());
139+
queuedCallbacks.decrementAndGet();
140+
// We can now pull more messages. We do not pull immediately to possibly wait for other
141+
// callbacks to end
142+
scheduleNextPull(500, TimeUnit.MILLISECONDS);
143+
}
144+
}
145+
};
146+
}
147+
148+
private PullRequest createPullRequest() {
149+
return PullRequest.newBuilder()
150+
.setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription))
151+
.setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get())
152+
.setReturnImmediately(false)
153+
.build();
154+
}
155+
156+
private void scheduleNextPull(long delay, TimeUnit timeUnit) {
157+
synchronized (futureLock) {
158+
if (!closed && stopped) {
159+
scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit);
160+
}
161+
}
162+
}
163+
164+
private void nextPull() {
165+
synchronized (futureLock) {
166+
if (closed) {
167+
return;
168+
}
169+
if (queuedCallbacks.get() == maxQueuedCallbacks) {
170+
stopped = true;
171+
} else {
172+
stopped = false;
173+
scheduledFuture = timer.submit(scheduleRunnable);
174+
}
175+
}
176+
}
177+
178+
private void pull() {
179+
pullerFuture = pubsubRpc.pull(createPullRequest());
180+
pullerFuture.addCallback(new PullCallback() {
181+
@Override
182+
public void success(PullResponse response) {
183+
List<com.google.pubsub.v1.ReceivedMessage> messages = response.getReceivedMessagesList();
184+
queuedCallbacks.addAndGet(messages.size());
185+
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
186+
deadlineRenewer.add(subscription, message.getAckId());
187+
final ReceivedMessage receivedMessage =
188+
ReceivedMessage.fromPb(pubsub, subscription, message);
189+
executor.execute(ackingRunnable(receivedMessage));
190+
}
191+
nextPull();
192+
}
193+
194+
@Override
195+
public void failure(Throwable error) {
196+
if (!(error instanceof CancellationException)) {
197+
nextPull();
198+
}
199+
}
200+
});
201+
}
202+
203+
@Override
204+
public void close() {
205+
synchronized (futureLock) {
206+
if (closed) {
207+
return;
208+
}
209+
closed = true;
210+
if (scheduledFuture != null) {
211+
scheduledFuture.cancel(true);
212+
}
213+
if (pullerFuture != null) {
214+
pullerFuture.cancel(true);
215+
}
216+
}
217+
SharedResourceHolder.release(TIMER, timer);
218+
executorFactory.release(executor);
219+
}
220+
221+
static final class Builder {
222+
private final PubSubOptions pubsubOptions;
223+
private final String subscription;
224+
private final AckDeadlineRenewer deadlineRenewer;
225+
private final MessageProcessor messageProcessor;
226+
private Integer maxQueuedCallbacks;
227+
private ExecutorFactory<ExecutorService> executorFactory;
228+
229+
Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer,
230+
MessageProcessor messageProcessor) {
231+
this.pubsubOptions = pubsubOptions;
232+
this.subscription = subscription;
233+
this.deadlineRenewer = deadlineRenewer;
234+
this.messageProcessor = messageProcessor;
235+
}
236+
237+
/**
238+
* Sets the maximum number of callbacks either being executed or waiting for execution.
239+
*/
240+
Builder maxQueuedCallbacks(Integer maxQueuedCallbacks) {
241+
this.maxQueuedCallbacks = maxQueuedCallbacks;
242+
return this;
243+
}
244+
245+
/**
246+
* Sets the executor factory, used to manage the executor that will run message processor
247+
* callbacks message consumer.
248+
*/
249+
Builder executorFactory(ExecutorFactory<ExecutorService> executorFactory) {
250+
this.executorFactory = executorFactory;
251+
return this;
252+
}
253+
254+
/**
255+
* Creates a {@code MessageConsumerImpl} object.
256+
*/
257+
MessageConsumerImpl build() {
258+
return new MessageConsumerImpl(this);
259+
}
260+
}
261+
262+
/**
263+
* Returns a builder for {@code MessageConsumerImpl} objects given the service options, the
264+
* subscription from which messages must be pulled, the acknowledge deadline renewer and a message
265+
* processor used to process messages.
266+
*/
267+
static Builder builder(PubSubOptions pubsubOptions, String subscription,
268+
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
269+
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
270+
}
271+
272+
/**
273+
* Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from
274+
* which messages must be pulled, the acknowledge deadline renewer and a message processor used to
275+
* process messages.
276+
*/
277+
static Builder of(PubSubOptions pubsubOptions, String subscription,
278+
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
279+
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
280+
}
281+
}

0 commit comments

Comments
 (0)