Skip to content

Commit f02858b

Browse files
authored
Merge pull request #31 from cpuguy83/support_context_deadlines
Add support for request timeout propgation.
2 parents f51df44 + a364f44 commit f02858b

4 files changed

Lines changed: 61 additions & 5 deletions

File tree

client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"sync"
2626
"syscall"
27+
"time"
2728

2829
"github.com/gogo/protobuf/proto"
2930
"github.com/pkg/errors"
@@ -86,6 +87,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
8687
cresp = &Response{}
8788
)
8889

90+
if dl, ok := ctx.Deadline(); ok {
91+
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
92+
}
93+
8994
if err := c.dispatch(ctx, creq, cresp); err != nil {
9095
return err
9196
}
@@ -104,6 +109,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
104109
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
105110
errs := make(chan error, 1)
106111
call := &callRequest{
112+
ctx: ctx,
107113
req: req,
108114
resp: resp,
109115
errs: errs,

server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
414414
case request := <-requests:
415415
active++
416416
go func(id uint32) {
417+
ctx, cancel := getRequestContext(ctx, request.req)
418+
defer cancel()
419+
417420
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
418421
resp := &Response{
419422
Status: status.Proto(),
@@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
454457
}
455458
}
456459
}
460+
461+
var noopFunc = func() {}
462+
463+
func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
464+
cancel = noopFunc
465+
if req.TimeoutNano == 0 {
466+
return ctx, cancel
467+
}
468+
469+
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
470+
return ctx, cancel
471+
}

server_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"sync"
2626
"testing"
27+
"time"
2728

2829
"github.com/gogo/protobuf/proto"
2930
"github.com/pkg/errors"
@@ -57,7 +58,8 @@ func (tc *testingClient) Test(ctx context.Context, req *testPayload) (*testPaylo
5758
}
5859

5960
type testPayload struct {
60-
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
61+
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
62+
Deadline int64 `protobuf:"varint,2,opt,name=deadline,proto3"`
6163
}
6264

6365
func (r *testPayload) Reset() { *r = testPayload{} }
@@ -68,7 +70,11 @@ func (r *testPayload) ProtoMessage() {}
6870
type testingServer struct{}
6971

7072
func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
71-
return &testPayload{Foo: strings.Repeat(req.Foo, 2)}, nil
73+
tp := &testPayload{Foo: strings.Repeat(req.Foo, 2)}
74+
if dl, ok := ctx.Deadline(); ok {
75+
tp.Deadline = dl.UnixNano()
76+
}
77+
return tp, nil
7278
}
7379

7480
// registerTestingService mocks more of what is generated code. Unlike grpc, we
@@ -376,6 +382,34 @@ func TestUnixSocketHandshake(t *testing.T) {
376382
}
377383
}
378384

385+
func TestServerRequestTimeout(t *testing.T) {
386+
var (
387+
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Minute))
388+
server = mustServer(t)(NewServer())
389+
addr, listener = newTestListener(t)
390+
testImpl = &testingServer{}
391+
client, cleanup = newTestClient(t, addr)
392+
result testPayload
393+
)
394+
defer cancel()
395+
defer cleanup()
396+
defer listener.Close()
397+
398+
registerTestingService(server, testImpl)
399+
400+
go server.Serve(ctx, listener)
401+
defer server.Shutdown(ctx)
402+
403+
if err := client.Call(ctx, serviceName, "Test", &testPayload{}, &result); err != nil {
404+
t.Fatalf("unexpected error making call: %v", err)
405+
}
406+
407+
dl, _ := ctx.Deadline()
408+
if result.Deadline != dl.UnixNano() {
409+
t.Fatalf("expected deadline %v, actual: %v", dl, result.Deadline)
410+
}
411+
}
412+
379413
func BenchmarkRoundTrip(b *testing.B) {
380414
var (
381415
ctx = context.Background()

types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import (
2323
)
2424

2525
type Request struct {
26-
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
27-
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
28-
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
26+
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
27+
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
28+
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
29+
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
2930
}
3031

3132
func (r *Request) Reset() { *r = Request{} }

0 commit comments

Comments
 (0)