Skip to content

Pubsub io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED #2131

@frankwmoyer

Description

@frankwmoyer

We are using Pubsub in a way that we need to wait to nack until close to the end of the ackDeadline. We've introduced a Thread.sleep that is called within the MessageReceiver before nacking the message.

In doing this, we have seen a large number of messages, similar to:

Exception in thread "main" java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED
        at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:330)
        at com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
        at com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)
        at App.main(App.java:48)
Caused by: java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED
        at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:330)
        at com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
        at com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)
        at com.google.cloud.pubsub.spi.v1.Subscriber.startConnections(Subscriber.java:406)
        at com.google.cloud.pubsub.spi.v1.Subscriber.startPollingConnections(Subscriber.java:375)
        at com.google.cloud.pubsub.spi.v1.Subscriber.access$200(Subscriber.java:77)
        at com.google.cloud.pubsub.spi.v1.Subscriber$4.run(Subscriber.java:256)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED
        at io.grpc.Status.asRuntimeException(Status.java:540)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:439)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
        at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
        at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED
Jun 08, 2017 6:48:36 PM com.google.cloud.pubsub.spi.v1.PollingSubscriberConnection$2 onFailure
WARNING: Failed to pull messages (recoverable): 
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED
        at io.grpc.Status.asRuntimeException(Status.java:540)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:439)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
        at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
        at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I scaled back our code to the attached code (a simple, reproducible example). It's actually just the PubSub Snippet code with a Thread.sleep(50000) added (note: for this example I call .ack while we are calling .nack.

Our incoming subscription has about 100 messages on it.

Source:

import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;

public class App {

    public static void main(String[] args) {

        MessageReceiver receiver =
            new MessageReceiver() {
                @Override
                public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                    System.out.println("got message: " + message.getData().toStringUtf8());
                    try {
                        Thread.sleep(50000);
                    }
                    catch(InterruptedException ie)
                        {
                            ie.printStackTrace();
                        }

                    consumer.ack();
                }
            };

        Subscriber subscriber = null;
        try {
            TopicName topic = TopicName.create("my-project", "pubsub-exc");
            SubscriptionName subscription = SubscriptionName.create("my-project",
                                                                    "pubsub.issue.sub");
            subscriber = Subscriber.defaultBuilder(subscription, receiver)
                .build();

            subscriber.addListener(new Subscriber.Listener() {
                    @Override
                    public void failed(Subscriber.State from, Throwable failure) {
                        System.err.println(failure);
                    }
                },
                MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync();
            }
        }
    }
}

Metadata

Metadata

Labels

api: pubsubIssues related to the Pub/Sub API.type: questionRequest for information or clarification. Not an issue.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions