@@ -14,7 +14,6 @@ use httparse::ParserConfig;
1414use super :: super :: dispatch;
1515use crate :: body:: { Body , Incoming as IncomingBody } ;
1616use crate :: proto;
17- use crate :: upgrade:: Upgraded ;
1817
1918type Dispatcher < T , B > =
2019 proto:: dispatch:: Dispatcher < proto:: dispatch:: Client < B > , B , T , proto:: h1:: ClientTransaction > ;
@@ -51,23 +50,23 @@ pub struct Parts<T> {
5150#[ must_use = "futures do nothing unless polled" ]
5251pub struct Connection < T , B >
5352where
54- T : Read + Write + Send + ' static ,
53+ T : Read + Write + ' static ,
5554 B : Body + ' static ,
5655{
57- inner : Option < Dispatcher < T , B > > ,
56+ inner : Dispatcher < T , B > ,
5857}
5958
6059impl < T , B > Connection < T , B >
6160where
62- T : Read + Write + Send + Unpin + ' static ,
61+ T : Read + Write + Unpin + ' static ,
6362 B : Body + ' static ,
6463 B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
6564{
6665 /// Return the inner IO object, and additional information.
6766 ///
6867 /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
6968 pub fn into_parts ( self ) -> Parts < T > {
70- let ( io, read_buf, _) = self . inner . expect ( "already upgraded" ) . into_inner ( ) ;
69+ let ( io, read_buf, _) = self . inner . into_inner ( ) ;
7170 Parts {
7271 io,
7372 read_buf,
8786 /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
8887 /// to work with this function; or use the `without_shutdown` wrapper.
8988 pub fn poll_without_shutdown ( & mut self , cx : & mut Context < ' _ > ) -> Poll < crate :: Result < ( ) > > {
90- self . inner
91- . as_mut ( )
92- . expect ( "already upgraded" )
93- . poll_without_shutdown ( cx)
89+ self . inner . poll_without_shutdown ( cx)
9490 }
9591}
9692
@@ -119,7 +115,7 @@ pub struct Builder {
119115/// See [`client::conn`](crate::client::conn) for more.
120116pub async fn handshake < T , B > ( io : T ) -> crate :: Result < ( SendRequest < B > , Connection < T , B > ) >
121117where
122- T : Read + Write + Unpin + Send + ' static ,
118+ T : Read + Write + Unpin + ' static ,
123119 B : Body + ' static ,
124120 B :: Data : Send ,
125121 B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
@@ -240,9 +236,23 @@ impl<B> fmt::Debug for SendRequest<B> {
240236
241237// ===== impl Connection
242238
239+ impl < T , B > Connection < T , B >
240+ where
241+ T : Read + Write + Unpin + Send + ' static ,
242+ B : Body + ' static ,
243+ B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
244+ {
245+ /// Enable this connection to support higher-level HTTP upgrades.
246+ ///
247+ /// See [the `upgrade` module](crate::upgrade) for more.
248+ pub fn with_upgrades ( self ) -> upgrades:: UpgradeableConnection < T , B > {
249+ upgrades:: UpgradeableConnection { inner : Some ( self ) }
250+ }
251+ }
252+
243253impl < T , B > fmt:: Debug for Connection < T , B >
244254where
245- T : Read + Write + fmt:: Debug + Send + ' static ,
255+ T : Read + Write + fmt:: Debug + ' static ,
246256 B : Body + ' static ,
247257{
248258 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
@@ -252,27 +262,24 @@ where
252262
253263impl < T , B > Future for Connection < T , B >
254264where
255- T : Read + Write + Unpin + Send + ' static ,
265+ T : Read + Write + Unpin + ' static ,
256266 B : Body + ' static ,
257267 B :: Data : Send ,
258268 B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
259269{
260270 type Output = crate :: Result < ( ) > ;
261271
262272 fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
263- match ready ! ( Pin :: new( self . inner. as_mut ( ) . unwrap ( ) ) . poll( cx) ) ? {
273+ match ready ! ( Pin :: new( & mut self . inner) . poll( cx) ) ? {
264274 proto:: Dispatched :: Shutdown => Poll :: Ready ( Ok ( ( ) ) ) ,
265- proto:: Dispatched :: Upgrade ( pending) => match self . inner . take ( ) {
266- Some ( h1) => {
267- let ( io, buf, _) = h1. into_inner ( ) ;
268- pending. fulfill ( Upgraded :: new ( io, buf) ) ;
269- Poll :: Ready ( Ok ( ( ) ) )
270- }
271- _ => {
272- drop ( pending) ;
273- unreachable ! ( "Upgraded twice" ) ;
274- }
275- } ,
275+ proto:: Dispatched :: Upgrade ( pending) => {
276+ // With no `Send` bound on `I`, we can't try to do
277+ // upgrades here. In case a user was trying to use
278+ // `upgrade` with this API, send a special
279+ // error letting them know about that.
280+ pending. manual ( ) ;
281+ Poll :: Ready ( Ok ( ( ) ) )
282+ }
276283 }
277284 }
278285}
@@ -474,7 +481,7 @@ impl Builder {
474481 io : T ,
475482 ) -> impl Future < Output = crate :: Result < ( SendRequest < B > , Connection < T , B > ) > >
476483 where
477- T : Read + Write + Unpin + Send + ' static ,
484+ T : Read + Write + Unpin + ' static ,
478485 B : Body + ' static ,
479486 B :: Data : Send ,
480487 B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
@@ -518,10 +525,53 @@ impl Builder {
518525 let cd = proto:: h1:: dispatch:: Client :: new ( rx) ;
519526 let proto = proto:: h1:: Dispatcher :: new ( cd, conn) ;
520527
521- Ok ( (
522- SendRequest { dispatch : tx } ,
523- Connection { inner : Some ( proto) } ,
524- ) )
528+ Ok ( ( SendRequest { dispatch : tx } , Connection { inner : proto } ) )
529+ }
530+ }
531+ }
532+
533+ mod upgrades {
534+ use crate :: upgrade:: Upgraded ;
535+
536+ use super :: * ;
537+
538+ // A future binding a connection with a Service with Upgrade support.
539+ //
540+ // This type is unnameable outside the crate.
541+ #[ must_use = "futures do nothing unless polled" ]
542+ #[ allow( missing_debug_implementations) ]
543+ pub struct UpgradeableConnection < T , B >
544+ where
545+ T : Read + Write + Unpin + Send + ' static ,
546+ B : Body + ' static ,
547+ B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
548+ {
549+ pub ( super ) inner : Option < Connection < T , B > > ,
550+ }
551+
552+ impl < I , B > Future for UpgradeableConnection < I , B >
553+ where
554+ I : Read + Write + Unpin + Send + ' static ,
555+ B : Body + ' static ,
556+ B :: Data : Send ,
557+ B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
558+ {
559+ type Output = crate :: Result < ( ) > ;
560+
561+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
562+ match ready ! ( Pin :: new( & mut self . inner. as_mut( ) . unwrap( ) . inner) . poll( cx) ) {
563+ Ok ( proto:: Dispatched :: Shutdown ) => Poll :: Ready ( Ok ( ( ) ) ) ,
564+ Ok ( proto:: Dispatched :: Upgrade ( pending) ) => {
565+ let Parts {
566+ io,
567+ read_buf,
568+ _inner,
569+ } = self . inner . take ( ) . unwrap ( ) . into_parts ( ) ;
570+ pending. fulfill ( Upgraded :: new ( io, read_buf) ) ;
571+ Poll :: Ready ( Ok ( ( ) ) )
572+ }
573+ Err ( e) => Poll :: Ready ( Err ( e) ) ,
574+ }
525575 }
526576 }
527577}
0 commit comments