@@ -6,7 +6,7 @@ use crate::rt::{Read, Write};
66use bytes:: Bytes ;
77use futures_channel:: mpsc:: { Receiver , Sender } ;
88use futures_channel:: { mpsc, oneshot} ;
9- use futures_util:: future:: { self , Either , FutureExt as _, Select } ;
9+ use futures_util:: future:: { Either , FusedFuture , FutureExt as _} ;
1010use futures_util:: stream:: { StreamExt as _, StreamFuture } ;
1111use h2:: client:: { Builder , Connection , SendRequest } ;
1212use h2:: SendStream ;
@@ -143,7 +143,10 @@ where
143143 } else {
144144 ( Either :: Right ( conn) , ping:: disabled ( ) )
145145 } ;
146- let conn: ConnMapErr < T , B > = ConnMapErr { conn } ;
146+ let conn: ConnMapErr < T , B > = ConnMapErr {
147+ conn,
148+ is_terminated : false ,
149+ } ;
147150
148151 exec. execute_h2_future ( H2ClientFuture :: Task {
149152 task : ConnTask :: new ( conn, conn_drop_rx, cancel_tx) ,
@@ -218,6 +221,8 @@ pin_project! {
218221 {
219222 #[ pin]
220223 conn: Either <Conn <T , B >, Connection <Compat <T >, SendBuf <<B as Body >:: Data >>>,
224+ #[ pin]
225+ is_terminated: bool ,
221226 }
222227}
223228
@@ -229,10 +234,26 @@ where
229234 type Output = Result < ( ) , ( ) > ;
230235
231236 fn poll ( self : Pin < & mut Self > , cx : & mut std:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
232- self . project ( )
233- . conn
234- . poll ( cx)
235- . map_err ( |e| debug ! ( "connection error: {}" , e) )
237+ let mut this = self . project ( ) ;
238+
239+ if * this. is_terminated {
240+ return Poll :: Pending ;
241+ }
242+ let polled = this. conn . poll ( cx) ;
243+ if polled. is_ready ( ) {
244+ * this. is_terminated = true ;
245+ }
246+ polled. map_err ( |e| debug ! ( "connection error: {}" , e) )
247+ }
248+ }
249+
250+ impl < T , B > FusedFuture for ConnMapErr < T , B >
251+ where
252+ B : Body ,
253+ T : Read + Write + Unpin ,
254+ {
255+ fn is_terminated ( & self ) -> bool {
256+ self . is_terminated
236257 }
237258}
238259
@@ -245,10 +266,11 @@ pin_project! {
245266 T : Unpin ,
246267 {
247268 #[ pin]
248- select : Select < ConnMapErr < T , B > , StreamFuture <Receiver <Never > >>,
269+ drop_rx : StreamFuture <Receiver <Never >>,
249270 #[ pin]
250271 cancel_tx: Option <oneshot:: Sender <Never >>,
251- conn: Option <ConnMapErr <T , B >>,
272+ #[ pin]
273+ conn: ConnMapErr <T , B >,
252274 }
253275}
254276
@@ -263,9 +285,9 @@ where
263285 cancel_tx : oneshot:: Sender < Never > ,
264286 ) -> Self {
265287 Self {
266- select : future :: select ( conn , drop_rx) ,
288+ drop_rx,
267289 cancel_tx : Some ( cancel_tx) ,
268- conn : None ,
290+ conn,
269291 }
270292 }
271293}
@@ -280,25 +302,24 @@ where
280302 fn poll ( self : Pin < & mut Self > , cx : & mut std:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
281303 let mut this = self . project ( ) ;
282304
283- if let Some ( conn) = this. conn {
284- conn. poll_unpin ( cx) . map ( |_| ( ) )
285- } else {
286- match ready ! ( this. select. poll_unpin( cx) ) {
287- Either :: Left ( ( _, _) ) => {
288- // ok or err, the `conn` has finished
289- return Poll :: Ready ( ( ) ) ;
290- }
291- Either :: Right ( ( _, b) ) => {
292- // mpsc has been dropped, hopefully polling
293- // the connection some more should start shutdown
294- // and then close
295- trace ! ( "send_request dropped, starting conn shutdown" ) ;
296- drop ( this. cancel_tx . take ( ) . expect ( "Future polled twice" ) ) ;
297- this. conn = & mut Some ( b) ;
298- return Poll :: Pending ;
299- }
300- }
305+ if !this. conn . is_terminated ( ) {
306+ if let Poll :: Ready ( _) = this. conn . poll_unpin ( cx) {
307+ // ok or err, the `conn` has finished.
308+ return Poll :: Ready ( ( ) ) ;
309+ } ;
301310 }
311+
312+ if !this. drop_rx . is_terminated ( ) {
313+ if let Poll :: Ready ( _) = this. drop_rx . poll_unpin ( cx) {
314+ // mpsc has been dropped, hopefully polling
315+ // the connection some more should start shutdown
316+ // and then close.
317+ trace ! ( "send_request dropped, starting conn shutdown" ) ;
318+ drop ( this. cancel_tx . take ( ) . expect ( "ConnTask Future polled twice" ) ) ;
319+ }
320+ } ;
321+
322+ Poll :: Pending
302323 }
303324}
304325
0 commit comments