Skip to content

Commit 28e24aa

Browse files
committed
Terminate request queue on connection error first before terminating response handlers.
Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated. [#245] Signed-off-by: Mark Paluch <[email protected]>
1 parent dae72ea commit 28e24aa

1 file changed

Lines changed: 5 additions & 14 deletions

File tree

src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,7 @@
3636
import io.r2dbc.mssql.message.header.PacketIdProvider;
3737
import io.r2dbc.mssql.message.tds.ProtocolException;
3838
import io.r2dbc.mssql.message.tds.Redirect;
39-
import io.r2dbc.mssql.message.token.AbstractDoneToken;
40-
import io.r2dbc.mssql.message.token.AbstractInfoToken;
41-
import io.r2dbc.mssql.message.token.Attention;
42-
import io.r2dbc.mssql.message.token.EnvChangeToken;
43-
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
44-
import io.r2dbc.mssql.message.token.LoginAckToken;
39+
import io.r2dbc.mssql.message.token.*;
4540
import io.r2dbc.mssql.message.type.Collation;
4641
import io.r2dbc.mssql.util.Assert;
4742
import io.r2dbc.spi.R2dbcException;
@@ -50,11 +45,7 @@
5045
import org.reactivestreams.Subscriber;
5146
import org.reactivestreams.Subscription;
5247
import reactor.core.CoreSubscriber;
53-
import reactor.core.publisher.Flux;
54-
import reactor.core.publisher.Mono;
55-
import reactor.core.publisher.MonoSink;
56-
import reactor.core.publisher.Sinks;
57-
import reactor.core.publisher.SynchronousSink;
48+
import reactor.core.publisher.*;
5849
import reactor.netty.Connection;
5950
import reactor.netty.NettyOutbound;
6051
import reactor.netty.resources.ConnectionProvider;
@@ -367,7 +358,8 @@ private Object encodeForSend(ClientMessage message) {
367358
@SuppressWarnings("unchecked")
368359
private <T> Mono<T> resumeError(Throwable throwable) {
369360

370-
handleConnectionError(throwable);
361+
logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable);
362+
371363
this.requestSink.emitComplete((signalType, emitResult) -> {
372364

373365
if (emitResult.isFailure()) {
@@ -377,8 +369,7 @@ private <T> Mono<T> resumeError(Throwable throwable) {
377369
return false;
378370
});
379371

380-
logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable);
381-
372+
handleConnectionError(throwable);
382373
return (Mono<T>) close();
383374
}
384375

0 commit comments

Comments
 (0)