|
22 | 22 | import io.r2dbc.mssql.message.token.SqlBatch; |
23 | 23 | import io.r2dbc.mssql.util.IntegrationTestSupport; |
24 | 24 | import io.r2dbc.spi.R2dbcNonTransientResourceException; |
| 25 | +import org.junit.jupiter.api.AfterEach; |
25 | 26 | import org.junit.jupiter.api.BeforeEach; |
26 | 27 | import org.junit.jupiter.api.Test; |
27 | 28 | import org.springframework.util.ReflectionUtils; |
@@ -54,18 +55,25 @@ class ReactorNettyClientIntegrationTests extends IntegrationTestSupport { |
54 | 55 | ReflectionUtils.makeAccessible(CLIENT); |
55 | 56 | } |
56 | 57 |
|
| 58 | + private io.r2dbc.spi.Connection r2dbcConnection; |
57 | 59 | private ReactorNettyClient client; |
58 | 60 |
|
59 | 61 | private Connection connection; |
60 | 62 |
|
61 | 63 | @BeforeEach |
62 | 64 | void setUp() { |
63 | | - this.client = (ReactorNettyClient) ReflectionUtils.getField(CLIENT, IntegrationTestSupport.connection); |
| 65 | + this.r2dbcConnection = connectionFactory.create().block(); |
| 66 | + this.client = (ReactorNettyClient) ReflectionUtils.getField(CLIENT, this.r2dbcConnection); |
64 | 67 | this.connection = (Connection) ReflectionUtils.getField(CONNECTION, this.client); |
65 | 68 | } |
66 | 69 |
|
| 70 | + @AfterEach |
| 71 | + void tearDown() { |
| 72 | + Mono.from(r2dbcConnection.close()).subscribe(); |
| 73 | + } |
| 74 | + |
67 | 75 | @Test |
68 | | - void disconnectedShouldRejectExchange() throws InterruptedException { |
| 76 | + void disconnectedShouldRejectExchange() { |
69 | 77 |
|
70 | 78 | Connection connection = (Connection) ReflectionUtils.getField(CONNECTION, this.client); |
71 | 79 | connection.channel().close().awaitUninterruptibly(); |
@@ -99,29 +107,4 @@ void shouldCancelExchangeOnCloseFirstMessage() throws Exception { |
99 | 107 | } |
100 | 108 | } |
101 | 109 |
|
102 | | - @Test |
103 | | - void shouldCancelExchangeOnCloseInFlight() throws Exception { |
104 | | - |
105 | | - Connection connection = (Connection) ReflectionUtils.getField(CONNECTION, this.client); |
106 | | - |
107 | | - SqlBatch batch = SqlBatch.create(0, this.client.getTransactionDescriptor(), "SELECT value FROM test"); |
108 | | - |
109 | | - Sinks.Many<ClientMessage> messages = Sinks.many().unicast().onBackpressureBuffer(); |
110 | | - Flux<Message> query = this.client.exchange(messages.asFlux(), message -> true); |
111 | | - CompletableFuture<List<Message>> future = query.doOnNext(ignore -> { |
112 | | - connection.channel().close(); |
113 | | - messages.tryEmitNext(batch); |
114 | | - |
115 | | - }).collectList().toFuture(); |
116 | | - |
117 | | - messages.tryEmitNext(batch); |
118 | | - |
119 | | - try { |
120 | | - future.get(5, TimeUnit.SECONDS); |
121 | | - fail("Expected MssqlConnectionClosedException"); |
122 | | - } catch (ExecutionException e) { |
123 | | - assertThat(e).hasCauseInstanceOf(ReactorNettyClient.MssqlConnectionClosedException.class).hasMessageContaining("closed"); |
124 | | - } |
125 | | - } |
126 | | - |
127 | 110 | } |
0 commit comments