Skip to content

Commit b2cc93d

Browse files
committed
http2: add a MinConcurrentConns field to Transport for improving throughput
1 parent 078779b commit b2cc93d

File tree

3 files changed

+95
-8
lines changed

3 files changed

+95
-8
lines changed

http2/client_conn_pool.go

+35-8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"crypto/tls"
1111
"net/http"
1212
"sync"
13+
"math/rand"
14+
"time"
1315
)
1416

1517
// ClientConnPool manages a pool of HTTP/2 client connections.
@@ -84,27 +86,52 @@ func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMis
8486
}
8587
return cc, nil
8688
}
89+
8790
p.mu.Lock()
88-
for _, cc := range p.conns[addr] {
89-
if st := cc.idleState(); st.canTakeNewRequest {
90-
if p.shouldTraceGetConn(st) {
91-
traceGetConn(req, addr)
91+
defer p.mu.Unlock()
92+
93+
ccList := p.conns[addr]
94+
minCon := int(p.t.MinConcurrentConns)
95+
96+
if len(ccList) >= minCon {
97+
if minCon > 0 {
98+
rndSrc := rand.NewSource(time.Now().UnixNano())
99+
rnd := rand.New(rndSrc)
100+
startOffset := rnd.Intn(minCon)
101+
// find an available connection randomly among MinConcurrentConns connections for spreading streams
102+
for i := 0; i < minCon; i++ {
103+
idx := (i + startOffset) % minCon
104+
if p.takeNewRequest(req, addr, ccList[idx]) {
105+
return ccList[idx], nil
106+
}
107+
}
108+
}
109+
// find an available connection sequentially among the rests
110+
for _, cc := range ccList[minCon:] {
111+
if p.takeNewRequest(req, addr, cc) {
112+
return cc, nil
92113
}
93-
p.mu.Unlock()
94-
return cc, nil
95114
}
96115
}
97116
if !dialOnMiss {
98-
p.mu.Unlock()
99117
return nil, ErrNoCachedConn
100118
}
101119
traceGetConn(req, addr)
102120
call := p.getStartDialLocked(addr)
103-
p.mu.Unlock()
104121
<-call.done
105122
return call.res, call.err
106123
}
107124

125+
func (p *clientConnPool) takeNewRequest(req *http.Request, addr string, cc *ClientConn) bool {
126+
if st := cc.idleState(); st.canTakeNewRequest {
127+
if p.shouldTraceGetConn(st) {
128+
traceGetConn(req, addr)
129+
}
130+
return true
131+
}
132+
return false
133+
}
134+
108135
// dialCall is an in-flight Transport dial call to a host.
109136
type dialCall struct {
110137
p *clientConnPool

http2/transport.go

+7
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ type Transport struct {
108108
// waiting for their turn.
109109
StrictMaxConcurrentStreams bool
110110

111+
// MinConcurrentConns is the minimum number of TCP connections.
112+
// ClientConnPool tries to maintain MinConcurrentConns
113+
// TCP connections at least. If 0, ClientConnPool creates a
114+
// TCP connection only when needed. The default value is 0.
115+
// Increase this value if you need high throughput.
116+
MinConcurrentConns uint32
117+
111118
// t1, if non-nil, is the standard library Transport using
112119
// this transport. Its settings are used (but not its
113120
// RoundTrip method, etc).

http2/transport_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -4557,3 +4557,56 @@ func TestClientConnTooIdle(t *testing.T) {
45574557
}
45584558
}
45594559
}
4560+
4561+
func TestTransportMinConcurrentConns(t *testing.T) {
4562+
st := newServerTester(t,
4563+
func(w http.ResponseWriter, r *http.Request) {
4564+
io.WriteString(w, "test")
4565+
},
4566+
func(s *Server) {
4567+
s.MaxConcurrentStreams = 100
4568+
s.IdleTimeout = 30 * time.Second
4569+
},
4570+
optOnlyServer,
4571+
)
4572+
defer st.Close()
4573+
4574+
req, err := http.NewRequest("GET", st.ts.URL, nil)
4575+
if err != nil {
4576+
t.Fatalf("NewRequest: %v", err)
4577+
}
4578+
addr := authorityAddr(req.URL.Scheme, req.URL.Host)
4579+
4580+
MinConcurrentConns := 5
4581+
4582+
tr := &Transport{
4583+
TLSClientConfig: tlsConfigInsecure,
4584+
MinConcurrentConns: uint32(MinConcurrentConns),
4585+
}
4586+
defer tr.CloseIdleConnections()
4587+
4588+
// execute a dummy request
4589+
for i := 0; i < 10; i++ {
4590+
_, err := tr.RoundTrip(req)
4591+
if err != nil {
4592+
t.Fatalf("%v", err)
4593+
}
4594+
}
4595+
4596+
// check client connection pool
4597+
cp, ok := tr.connPool().(*clientConnPool)
4598+
if !ok {
4599+
t.Fatalf("Conn pool is %T; want *clientConnPool", tr.connPool())
4600+
}
4601+
4602+
cp.mu.Lock()
4603+
defer cp.mu.Unlock()
4604+
4605+
conns, ok := cp.conns[addr]
4606+
if !ok {
4607+
t.Fatalf("cannot find connections for %s", addr)
4608+
}
4609+
if len(conns) != MinConcurrentConns {
4610+
t.Fatalf("the number of connections should be %d, but got %d", MinConcurrentConns, len(conns))
4611+
}
4612+
}

0 commit comments

Comments
 (0)