Skip to content

QueryReactiveApi The exception could not be delivered to the consumer because it has already canceled/disposed #311

@jsimomaa

Description

@jsimomaa

Steps to reproduce:
List the minimal actions needed to reproduce the behavior.

  1. Create a long running request with QueryReactiveApi and schedule it as a Future
  2. Cancel the future forcibly (future.cancel(true);)
QueryReactiveApi queryApi = client.getQueryReactiveApi();

Disposable disposable;
Future<?> future = executor.scheduleWithFixedDelay(() -> {
    disposable = queryApi.queryRaw(query)
        .doOnNext(onNext -> {
            System.out.println("onNext");
        }).subscribe();
}, 0, 60, TimeUnit.SECONDS);

future.cancel(true);

Expected behavior:
The request would be canceled silently

Actual behavior:
Huge pile of stack traces:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.io.InterruptedIOException: interrupted
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
        at com.influxdb.internal.AbstractRestClient.catchOrPropagateException(AbstractRestClient.java:158)
        at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:140)
        at com.influxdb.internal.AbstractQueryApi.lambda$query$5(AbstractQueryApi.java:174)
        at com.influxdb.internal.AbstractQueryApi$1.onResponse(AbstractQueryApi.java:220)
        at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:238)
        at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:190)
        at com.influxdb.internal.AbstractQueryApi.queryRaw(AbstractQueryApi.java:144)
        at com.influxdb.client.reactive.internal.QueryReactiveApiImpl.lambda$null$8(QueryReactiveApiImpl.java:334)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer.subscribeActual(FlowableOnBackpressureBuffer.java:46)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.Flowable.subscribe(Flowable.java:14882)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
        at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.Flowable.subscribe(Flowable.java:14872)
        at io.reactivex.Flowable.subscribe(Flowable.java:14730)
        at main.Main.main(Main.java:33)
Caused by: java.io.InterruptedIOException: interrupted
        at okio.Timeout.throwIfReached(Timeout.kt:98)
        at okio.InputStreamSource.read(JvmOkio.kt:87)
        at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:129)
        at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
        at okhttp3.internal.http1.Http1ExchangeCodec$AbstractSource.read(Http1ExchangeCodec.java:389)
        at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.read(Http1ExchangeCodec.java:475)
        at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.java:286)
        at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
        at okio.RealBufferedSource.exhausted(RealBufferedSource.kt:198)
        at okio.InflaterSource.refill(InflaterSource.kt:112)
        at okio.InflaterSource.readOrInflate(InflaterSource.kt:76)
        at okio.InflaterSource.read(InflaterSource.kt:49)
        at okio.GzipSource.read(GzipSource.kt:69)
        at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
        at okio.ForwardingSource.read(ForwardingSource.kt:29)
        at retrofit2.OkHttpCall$ExceptionCatchingResponseBody$1.read(OkHttpCall.java:314)
        at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:449)
        at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:117)
        at okio.RealBufferedSource.readUtf8Line(RealBufferedSource.kt:319)
        at com.influxdb.internal.AbstractQueryApi.parseFluxResponseToLines(AbstractQueryApi.java:249)
        at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:138)
        ... 47 more

Shouldn't AbstractRestClient check if the onError-handler is disposed before calling accept(exception):

    void catchOrPropagateException(@Nonnull final Exception exception,
                                   @Nonnull final Consumer<? super Throwable> onError) {

        Arguments.checkNotNull(exception, "exception");
        Arguments.checkNotNull(onError, "onError");

        //
        // Socket closed by remote server or end of data
        //
        if (isCloseException(exception)) {
            LOG.log(Level.FINEST, "Socket closed by remote server or end of data", exception);
        } else {
            // here check if onError is disposed already ?
            onError.accept(exception);
        }
    }

Or alternatively add a case to the if-clause and check for interruptions?

Specifications:

  • Client Version: 4.3.0
  • InfluxDB Version: 2.1.1
  • JDK Version: 11
  • Platform: Linux

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions