@@ -52,7 +52,7 @@ impl HttpBody for Body {
5252fn main ( ) {
5353 pretty_env_logger:: init ( ) ;
5454
55- let server = thread:: spawn ( move || {
55+ let server_http2 = thread:: spawn ( move || {
5656 // Configure a runtime for the server that runs everything on the current thread
5757 let rt = tokio:: runtime:: Builder :: new_current_thread ( )
5858 . enable_all ( )
@@ -61,10 +61,10 @@ fn main() {
6161
6262 // Combine it with a `LocalSet, which means it can spawn !Send futures...
6363 let local = tokio:: task:: LocalSet :: new ( ) ;
64- local. block_on ( & rt, server ( ) ) . unwrap ( ) ;
64+ local. block_on ( & rt, http2_server ( ) ) . unwrap ( ) ;
6565 } ) ;
6666
67- let client = thread:: spawn ( move || {
67+ let client_http2 = thread:: spawn ( move || {
6868 // Configure a runtime for the client that runs everything on the current thread
6969 let rt = tokio:: runtime:: Builder :: new_current_thread ( )
7070 . enable_all ( )
@@ -76,16 +76,137 @@ fn main() {
7676 local
7777 . block_on (
7878 & rt,
79- client ( "http://localhost:3000" . parse :: < hyper:: Uri > ( ) . unwrap ( ) ) ,
79+ http2_client ( "http://localhost:3000" . parse :: < hyper:: Uri > ( ) . unwrap ( ) ) ,
8080 )
8181 . unwrap ( ) ;
8282 } ) ;
8383
84- server. join ( ) . unwrap ( ) ;
85- client. join ( ) . unwrap ( ) ;
84+ let server_http1 = thread:: spawn ( move || {
85+ // Configure a runtime for the server that runs everything on the current thread
86+ let rt = tokio:: runtime:: Builder :: new_current_thread ( )
87+ . enable_all ( )
88+ . build ( )
89+ . expect ( "build runtime" ) ;
90+
91+ // Combine it with a `LocalSet, which means it can spawn !Send futures...
92+ let local = tokio:: task:: LocalSet :: new ( ) ;
93+ local. block_on ( & rt, http1_server ( ) ) . unwrap ( ) ;
94+ } ) ;
95+
96+ let client_http1 = thread:: spawn ( move || {
97+ // Configure a runtime for the client that runs everything on the current thread
98+ let rt = tokio:: runtime:: Builder :: new_current_thread ( )
99+ . enable_all ( )
100+ . build ( )
101+ . expect ( "build runtime" ) ;
102+
103+ // Combine it with a `LocalSet, which means it can spawn !Send futures...
104+ let local = tokio:: task:: LocalSet :: new ( ) ;
105+ local
106+ . block_on (
107+ & rt,
108+ http1_client ( "http://localhost:3001" . parse :: < hyper:: Uri > ( ) . unwrap ( ) ) ,
109+ )
110+ . unwrap ( ) ;
111+ } ) ;
112+
113+ server_http2. join ( ) . unwrap ( ) ;
114+ client_http2. join ( ) . unwrap ( ) ;
115+
116+ server_http1. join ( ) . unwrap ( ) ;
117+ client_http1. join ( ) . unwrap ( ) ;
118+ }
119+
120+ async fn http1_server ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
121+ let addr = SocketAddr :: from ( ( [ 127 , 0 , 0 , 1 ] , 3001 ) ) ;
122+
123+ let listener = TcpListener :: bind ( addr) . await ?;
124+
125+ // For each connection, clone the counter to use in our service...
126+ let counter = Rc :: new ( Cell :: new ( 0 ) ) ;
127+
128+ loop {
129+ let ( stream, _) = listener. accept ( ) . await ?;
130+
131+ let io = TokioIo :: new ( stream) ;
132+
133+ let cnt = counter. clone ( ) ;
134+
135+ let service = service_fn ( move |_| {
136+ let prev = cnt. get ( ) ;
137+ cnt. set ( prev + 1 ) ;
138+ let value = cnt. get ( ) ;
139+ async move { Ok :: < _ , Error > ( Response :: new ( Body :: from ( format ! ( "Request #{}" , value) ) ) ) }
140+ } ) ;
141+
142+ tokio:: task:: spawn_local ( async move {
143+ if let Err ( err) = hyper:: server:: conn:: http1:: Builder :: new ( )
144+ . serve_connection ( io, service)
145+ . await
146+ {
147+ println ! ( "Error serving connection: {:?}" , err) ;
148+ }
149+ } ) ;
150+ }
151+ }
152+
153+ async fn http1_client ( url : hyper:: Uri ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
154+ let host = url. host ( ) . expect ( "uri has no host" ) ;
155+ let port = url. port_u16 ( ) . unwrap_or ( 80 ) ;
156+ let addr = format ! ( "{}:{}" , host, port) ;
157+ let stream = TcpStream :: connect ( addr) . await ?;
158+
159+ let io = TokioIo :: new ( stream) ;
160+
161+ let ( mut sender, conn) = hyper:: client:: conn:: http1:: handshake ( io) . await ?;
162+
163+ tokio:: task:: spawn_local ( async move {
164+ if let Err ( err) = conn. await {
165+ let mut stdout = io:: stdout ( ) ;
166+ stdout
167+ . write_all ( format ! ( "Connection failed: {:?}" , err) . as_bytes ( ) )
168+ . await
169+ . unwrap ( ) ;
170+ stdout. flush ( ) . await . unwrap ( ) ;
171+ }
172+ } ) ;
173+
174+ let authority = url. authority ( ) . unwrap ( ) . clone ( ) ;
175+
176+ // Make 4 requests
177+ for _ in 0 ..4 {
178+ let req = Request :: builder ( )
179+ . uri ( url. clone ( ) )
180+ . header ( hyper:: header:: HOST , authority. as_str ( ) )
181+ . body ( Body :: from ( "test" . to_string ( ) ) ) ?;
182+
183+ let mut res = sender. send_request ( req) . await ?;
184+
185+ let mut stdout = io:: stdout ( ) ;
186+ stdout
187+ . write_all ( format ! ( "Response: {}\n " , res. status( ) ) . as_bytes ( ) )
188+ . await
189+ . unwrap ( ) ;
190+ stdout
191+ . write_all ( format ! ( "Headers: {:#?}\n " , res. headers( ) ) . as_bytes ( ) )
192+ . await
193+ . unwrap ( ) ;
194+ stdout. flush ( ) . await . unwrap ( ) ;
195+
196+ // Print the response body
197+ while let Some ( next) = res. frame ( ) . await {
198+ let frame = next?;
199+ if let Some ( chunk) = frame. data_ref ( ) {
200+ stdout. write_all ( & chunk) . await . unwrap ( ) ;
201+ }
202+ }
203+ stdout. write_all ( b"\n -----------------\n " ) . await . unwrap ( ) ;
204+ stdout. flush ( ) . await . unwrap ( ) ;
205+ }
206+ Ok ( ( ) )
86207}
87208
88- async fn server ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
209+ async fn http2_server ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
89210 let mut stdout = io:: stdout ( ) ;
90211
91212 let addr: SocketAddr = ( [ 127 , 0 , 0 , 1 ] , 3000 ) . into ( ) ;
@@ -102,7 +223,7 @@ async fn server() -> Result<(), Box<dyn std::error::Error>> {
102223
103224 loop {
104225 let ( stream, _) = listener. accept ( ) . await ?;
105- let io = TokioIo :: new ( stream) ;
226+ let io = IOTypeNotSend :: new ( TokioIo :: new ( stream) ) ;
106227
107228 // For each connection, clone the counter to use in our service...
108229 let cnt = counter. clone ( ) ;
@@ -130,55 +251,7 @@ async fn server() -> Result<(), Box<dyn std::error::Error>> {
130251 }
131252}
132253
133- struct IOTypeNotSend {
134- _marker : PhantomData < * const ( ) > ,
135- stream : TokioIo < TcpStream > ,
136- }
137-
138- impl IOTypeNotSend {
139- fn new ( stream : TokioIo < TcpStream > ) -> Self {
140- Self {
141- _marker : PhantomData ,
142- stream,
143- }
144- }
145- }
146-
147- impl hyper:: rt:: Write for IOTypeNotSend {
148- fn poll_write (
149- mut self : Pin < & mut Self > ,
150- cx : & mut Context < ' _ > ,
151- buf : & [ u8 ] ,
152- ) -> Poll < Result < usize , std:: io:: Error > > {
153- Pin :: new ( & mut self . stream ) . poll_write ( cx, buf)
154- }
155-
156- fn poll_flush (
157- mut self : Pin < & mut Self > ,
158- cx : & mut Context < ' _ > ,
159- ) -> Poll < Result < ( ) , std:: io:: Error > > {
160- Pin :: new ( & mut self . stream ) . poll_flush ( cx)
161- }
162-
163- fn poll_shutdown (
164- mut self : Pin < & mut Self > ,
165- cx : & mut Context < ' _ > ,
166- ) -> Poll < Result < ( ) , std:: io:: Error > > {
167- Pin :: new ( & mut self . stream ) . poll_shutdown ( cx)
168- }
169- }
170-
171- impl hyper:: rt:: Read for IOTypeNotSend {
172- fn poll_read (
173- mut self : Pin < & mut Self > ,
174- cx : & mut Context < ' _ > ,
175- buf : hyper:: rt:: ReadBufCursor < ' _ > ,
176- ) -> Poll < std:: io:: Result < ( ) > > {
177- Pin :: new ( & mut self . stream ) . poll_read ( cx, buf)
178- }
179- }
180-
181- async fn client ( url : hyper:: Uri ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
254+ async fn http2_client ( url : hyper:: Uri ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
182255 let host = url. host ( ) . expect ( "uri has no host" ) ;
183256 let port = url. port_u16 ( ) . unwrap_or ( 80 ) ;
184257 let addr = format ! ( "{}:{}" , host, port) ;
@@ -250,3 +323,51 @@ where
250323 tokio:: task:: spawn_local ( fut) ;
251324 }
252325}
326+
327+ struct IOTypeNotSend {
328+ _marker : PhantomData < * const ( ) > ,
329+ stream : TokioIo < TcpStream > ,
330+ }
331+
332+ impl IOTypeNotSend {
333+ fn new ( stream : TokioIo < TcpStream > ) -> Self {
334+ Self {
335+ _marker : PhantomData ,
336+ stream,
337+ }
338+ }
339+ }
340+
341+ impl hyper:: rt:: Write for IOTypeNotSend {
342+ fn poll_write (
343+ mut self : Pin < & mut Self > ,
344+ cx : & mut Context < ' _ > ,
345+ buf : & [ u8 ] ,
346+ ) -> Poll < Result < usize , std:: io:: Error > > {
347+ Pin :: new ( & mut self . stream ) . poll_write ( cx, buf)
348+ }
349+
350+ fn poll_flush (
351+ mut self : Pin < & mut Self > ,
352+ cx : & mut Context < ' _ > ,
353+ ) -> Poll < Result < ( ) , std:: io:: Error > > {
354+ Pin :: new ( & mut self . stream ) . poll_flush ( cx)
355+ }
356+
357+ fn poll_shutdown (
358+ mut self : Pin < & mut Self > ,
359+ cx : & mut Context < ' _ > ,
360+ ) -> Poll < Result < ( ) , std:: io:: Error > > {
361+ Pin :: new ( & mut self . stream ) . poll_shutdown ( cx)
362+ }
363+ }
364+
365+ impl hyper:: rt:: Read for IOTypeNotSend {
366+ fn poll_read (
367+ mut self : Pin < & mut Self > ,
368+ cx : & mut Context < ' _ > ,
369+ buf : hyper:: rt:: ReadBufCursor < ' _ > ,
370+ ) -> Poll < std:: io:: Result < ( ) > > {
371+ Pin :: new ( & mut self . stream ) . poll_read ( cx, buf)
372+ }
373+ }
0 commit comments