Steps to reproduce:
List the minimal actions needed to reproduce the behavior.
- Create a long-running request with QueryReactiveApi and schedule it as a Future
- Cancel the future forcibly (future.cancel(true);)
QueryReactiveApi queryApi = client.getQueryReactiveApi();
// Holder for the subscription: a plain local variable cannot be reassigned from inside a lambda
AtomicReference<Disposable> disposable = new AtomicReference<>();
Future<?> future = executor.scheduleWithFixedDelay(() -> {
    disposable.set(queryApi.queryRaw(query)
            .doOnNext(onNext -> System.out.println("onNext"))
            .subscribe());
}, 0, 60, TimeUnit.SECONDS);
future.cancel(true);
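For context, future.cancel(true) interrupts the worker thread when the task is currently running, and that interrupt is what surfaces as InterruptedIOException in the blocking read shown in the stack trace below. The following is a minimal, stdlib-only sketch of that mechanism; Thread.sleep stands in for the blocking HTTP read, and the class and method names are hypothetical, not part of the client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelInterruptDemo {

    /** Returns true if the running task observed an interrupt after cancel(true). */
    static boolean cancelInterruptsRunningTask() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.scheduleWithFixedDelay(() -> {
                started.countDown();
                try {
                    // Stand-in (assumption) for the blocking read of a long-running query.
                    Thread.sleep(TimeUnit.SECONDS.toMillis(30));
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            }, 0, 60, TimeUnit.SECONDS);

            started.await();                      // wait until the task is actually running
            future.cancel(true);                  // interrupts the worker thread
            finished.await(5, TimeUnit.SECONDS);  // wait for the task to react
            return interrupted.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted: " + cancelInterruptsRunningTask());
    }
}
```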
Expected behavior:
The request is canceled silently, without logging an exception.
Actual behavior:
Every cancellation produces a long stack trace:
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.io.InterruptedIOException: interrupted
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
at com.influxdb.internal.AbstractRestClient.catchOrPropagateException(AbstractRestClient.java:158)
at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:140)
at com.influxdb.internal.AbstractQueryApi.lambda$query$5(AbstractQueryApi.java:174)
at com.influxdb.internal.AbstractQueryApi$1.onResponse(AbstractQueryApi.java:220)
at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:238)
at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:190)
at com.influxdb.internal.AbstractQueryApi.queryRaw(AbstractQueryApi.java:144)
at com.influxdb.client.reactive.internal.QueryReactiveApiImpl.lambda$null$8(QueryReactiveApiImpl.java:334)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer.subscribeActual(FlowableOnBackpressureBuffer.java:46)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.Flowable.subscribe(Flowable.java:14882)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.Flowable.subscribe(Flowable.java:14872)
at io.reactivex.Flowable.subscribe(Flowable.java:14730)
at main.Main.main(Main.java:33)
Caused by: java.io.InterruptedIOException: interrupted
at okio.Timeout.throwIfReached(Timeout.kt:98)
at okio.InputStreamSource.read(JvmOkio.kt:87)
at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:129)
at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
at okhttp3.internal.http1.Http1ExchangeCodec$AbstractSource.read(Http1ExchangeCodec.java:389)
at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.read(Http1ExchangeCodec.java:475)
at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.java:286)
at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
at okio.RealBufferedSource.exhausted(RealBufferedSource.kt:198)
at okio.InflaterSource.refill(InflaterSource.kt:112)
at okio.InflaterSource.readOrInflate(InflaterSource.kt:76)
at okio.InflaterSource.read(InflaterSource.kt:49)
at okio.GzipSource.read(GzipSource.kt:69)
at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
at okio.ForwardingSource.read(ForwardingSource.kt:29)
at retrofit2.OkHttpCall$ExceptionCatchingResponseBody$1.read(OkHttpCall.java:314)
at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:449)
at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:117)
at okio.RealBufferedSource.readUtf8Line(RealBufferedSource.kt:319)
at com.influxdb.internal.AbstractQueryApi.parseFluxResponseToLines(AbstractQueryApi.java:249)
at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:138)
... 47 more
Shouldn't AbstractRestClient check whether the onError handler is already disposed before calling accept(exception)? The method in question:
void catchOrPropagateException(@Nonnull final Exception exception,
                               @Nonnull final Consumer<? super Throwable> onError) {

    Arguments.checkNotNull(exception, "exception");
    Arguments.checkNotNull(onError, "onError");

    //
    // Socket closed by remote server or end of data
    //
    if (isCloseException(exception)) {
        LOG.log(Level.FINEST, "Socket closed by remote server or end of data", exception);
    } else {
        // here check if onError is disposed already ?
        onError.accept(exception);
    }
}
Or alternatively add a case to the if-clause and check for interruptions?
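Both suggestions can be sketched roughly as follows. This is a stdlib-only illustration, not the client's code: the class InterruptAwarePropagation, the helper isInterruption, and the isDisposed supplier are hypothetical names. In the real client the disposed state would presumably have to be passed in from the RxJava emitter (for example via its isDisposed() method), which is an assumption about how it could be wired.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class InterruptAwarePropagation {

    /** True if the exception, or one of its causes, signals a thread interruption. */
    static boolean isInterruption(Throwable error) {
        int depth = 0;
        // Bounded walk over the cause chain to guard against cyclic causes.
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof InterruptedException) {
                return true;
            }
            // SocketTimeoutException extends InterruptedIOException but is a
            // genuine timeout, not a cancellation, so it must still be reported.
            if (t instanceof InterruptedIOException && !(t instanceof SocketTimeoutException)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Forwards the exception to onError unless it stems from an interruption
     * or the downstream is already disposed. Returns true if it was forwarded.
     */
    static boolean propagate(Exception exception,
                             Consumer<? super Throwable> onError,
                             BooleanSupplier isDisposed) {
        if (isInterruption(exception) || isDisposed.getAsBoolean()) {
            // Swallowed here; a real implementation might log this at FINEST.
            return false;
        }
        onError.accept(exception);
        return true;
    }
}
```

One caveat with the interruption check: java.net.SocketTimeoutException is a subclass of InterruptedIOException, so a plain instanceof test would also silence real read timeouts. The sketch excludes it for that reason.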
Specifications:
- Client Version: 4.3.0
- InfluxDB Version: 2.1.1
- JDK Version: 11
- Platform: Linux