-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Pubsub io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED #2131
Copy link
Copy link
Closed
Labels
api: pubsubIssues related to the Pub/Sub API.Issues related to the Pub/Sub API.type: questionRequest for information or clarification. Not an issue.Request for information or clarification. Not an issue.
Description
We are using Pubsub in a way that we need to wait to nack until close to the end of the ackDeadline. We've introduced a Thread.sleep that is called within the MessageReceiver before nacking the message.
In doing this, we have seen a large number of messages, similar to:
Exception in thread "main" java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED
at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:330)
at com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
at com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)
at App.main(App.java:48)
Caused by: java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED
at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:330)
at com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
at com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)
at com.google.cloud.pubsub.spi.v1.Subscriber.startConnections(Subscriber.java:406)
at com.google.cloud.pubsub.spi.v1.Subscriber.startPollingConnections(Subscriber.java:375)
at com.google.cloud.pubsub.spi.v1.Subscriber.access$200(Subscriber.java:77)
at com.google.cloud.pubsub.spi.v1.Subscriber$4.run(Subscriber.java:256)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED
at io.grpc.Status.asRuntimeException(Status.java:540)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:439)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED
Jun 08, 2017 6:48:36 PM com.google.cloud.pubsub.spi.v1.PollingSubscriberConnection$2 onFailure
WARNING: Failed to pull messages (recoverable):
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED
at io.grpc.Status.asRuntimeException(Status.java:540)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:439)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I scaled back our code to the attached code (a simple, reproducible example). It's actually just the PubSub Snippet code with a Thread.sleep(50000) added (note: for this example I call .ack while we are calling .nack.
Our incoming subscription has about 100 messages on it.
Source:
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
public class App {
public static void main(String[] args) {
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("got message: " + message.getData().toStringUtf8());
try {
Thread.sleep(50000);
}
catch(InterruptedException ie)
{
ie.printStackTrace();
}
consumer.ack();
}
};
Subscriber subscriber = null;
try {
TopicName topic = TopicName.create("my-project", "pubsub-exc");
SubscriptionName subscription = SubscriptionName.create("my-project",
"pubsub.issue.sub");
subscriber = Subscriber.defaultBuilder(subscription, receiver)
.build();
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
System.err.println(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
api: pubsubIssues related to the Pub/Sub API.Issues related to the Pub/Sub API.type: questionRequest for information or clarification. Not an issue.Request for information or clarification. Not an issue.