@@ -183,6 +183,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
183183
184184 host := node .Addr ()
185185 request := ctx .proxyRequest .Request ()
186+ request .CloseBodyStream ()
186187 rewriteHost := string (request .Host ())
187188 upstreamHost := ctx .GetUpstreamHostHandler ()
188189 if upstreamHost != nil {
@@ -205,6 +206,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
205206
206207 beginTime := time .Now ()
207208 response := fasthttp .AcquireResponse ()
209+ //var client *fasthttp.HostClient
208210 ctx .response .responseError = fasthttp_client .ProxyTimeout (scheme , rewriteHost , node , request , response , timeout )
209211
210212 agent := newRequestAgent (& ctx .proxyRequest , host , scheme , response .Header , beginTime , time .Now ())
@@ -230,40 +232,46 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
230232 // 流式传输,非200状态码不考虑流式传输
231233 ctx .response .Response .SetStatusCode (response .StatusCode ())
232234 ctx .SetLabel ("stream_running" , "true" )
233-
234235 ctx .response .Response .SetBodyStreamWriter (func (w * bufio.Writer ) {
236+ reader := response .BodyStream ()
235237 defer func () {
238+ response .SetConnectionClose ()
236239 ctx .SetLabel ("stream_running" , "false" )
237240 ctx .FastFinish ()
238241 fasthttp .ReleaseResponse (response )
239242 }()
240- reader := response . BodyStream ()
243+
241244 buffer := make ([]byte , 4096 ) // 4KB 缓冲区
242245 for {
243246 n , err := reader .Read (buffer )
244- if n > 0 {
245- chunk := buffer [:n ]
246- chunk , err = ctx .proxyRequest .StreamBodyHandles (ctx , chunk )
247- if err != nil {
248- log .Errorf ("exec stream func error: %v" , err )
249- }
250-
251- n , err = w .Write (chunk )
252- if err != nil {
253- log .Errorf ("stream write error: %v" , err )
254- break
255- }
256- ctx .Response ().SetBody (chunk )
257-
258- w .Flush () // 实时发送数据
259- }
260247 if err != nil {
261248 if err == io .EOF {
262249 break
263250 }
264251 log .Errorf ("stream read error: %v" , err )
265252 break
266253 }
254+ chunk := buffer [:n ]
255+ chunk , err = ctx .proxyRequest .StreamBodyHandles (ctx , chunk )
256+ if err != nil {
257+ log .Errorf ("exec stream func error: %v" , err )
258+ }
259+
260+ n , err = w .Write (chunk )
261+ if err != nil {
262+ log .Errorf ("stream write error: %v" , err )
263+ response .SetConnectionClose ()
264+ break
265+ }
266+ ctx .Response ().SetBody (chunk )
267+
268+ err = w .Flush () // 实时发送数据
269+ if err != nil {
270+ // 停止读取上游数据
271+ log .Errorf ("stream flush error: %v" , err )
272+ response .SetConnectionClose ()
273+ break
274+ }
267275 }
268276 ctx .proxyRequest .ProxyBodyFinish (ctx )
269277 })
0 commit comments