Skip to content

Commit a364f44

Browse files
committed
Add support for request timeout propgation.
Adds a new field to the `Request` type which specifies a timeout (in nanoseconds) for the request. This is propagated on method dispatch as a context timeout. There was some discussion here on supporting a broader "metadata" field (similar to grpc) that can be used for other things, but we ended up with a dedicated field because it is lighter weight and expect it to be used pretty heavily as is.... metadata may be added in the future, but is not necessary for timeouts. Also discussed using a deadline vs a timeout in the request and decided to go with a timeout in order to deal with potential clock skew between the client and server. This also has the side-effect of eliminating the protocol/wire overhead from the request timeout. Signed-off-by: Brian Goff <[email protected]>
1 parent f51df44 commit a364f44

4 files changed

Lines changed: 61 additions & 5 deletions

File tree

client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"sync"
2626
"syscall"
27+
"time"
2728

2829
"github.com/gogo/protobuf/proto"
2930
"github.com/pkg/errors"
@@ -86,6 +87,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
8687
cresp = &Response{}
8788
)
8889

90+
if dl, ok := ctx.Deadline(); ok {
91+
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
92+
}
93+
8994
if err := c.dispatch(ctx, creq, cresp); err != nil {
9095
return err
9196
}
@@ -104,6 +109,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
104109
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
105110
errs := make(chan error, 1)
106111
call := &callRequest{
112+
ctx: ctx,
107113
req: req,
108114
resp: resp,
109115
errs: errs,

server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
414414
case request := <-requests:
415415
active++
416416
go func(id uint32) {
417+
ctx, cancel := getRequestContext(ctx, request.req)
418+
defer cancel()
419+
417420
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
418421
resp := &Response{
419422
Status: status.Proto(),
@@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
454457
}
455458
}
456459
}
460+
461+
var noopFunc = func() {}
462+
463+
func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
464+
cancel = noopFunc
465+
if req.TimeoutNano == 0 {
466+
return ctx, cancel
467+
}
468+
469+
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
470+
return ctx, cancel
471+
}

server_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"sync"
2626
"testing"
27+
"time"
2728

2829
"github.com/gogo/protobuf/proto"
2930
"github.com/pkg/errors"
@@ -57,7 +58,8 @@ func (tc *testingClient) Test(ctx context.Context, req *testPayload) (*testPaylo
5758
}
5859

5960
type testPayload struct {
60-
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
61+
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
62+
Deadline int64 `protobuf:"varint,2,opt,name=deadline,proto3"`
6163
}
6264

6365
func (r *testPayload) Reset() { *r = testPayload{} }
@@ -68,7 +70,11 @@ func (r *testPayload) ProtoMessage() {}
6870
type testingServer struct{}
6971

7072
func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
71-
return &testPayload{Foo: strings.Repeat(req.Foo, 2)}, nil
73+
tp := &testPayload{Foo: strings.Repeat(req.Foo, 2)}
74+
if dl, ok := ctx.Deadline(); ok {
75+
tp.Deadline = dl.UnixNano()
76+
}
77+
return tp, nil
7278
}
7379

7480
// registerTestingService mocks more of what is generated code. Unlike grpc, we
@@ -376,6 +382,34 @@ func TestUnixSocketHandshake(t *testing.T) {
376382
}
377383
}
378384

385+
func TestServerRequestTimeout(t *testing.T) {
386+
var (
387+
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Minute))
388+
server = mustServer(t)(NewServer())
389+
addr, listener = newTestListener(t)
390+
testImpl = &testingServer{}
391+
client, cleanup = newTestClient(t, addr)
392+
result testPayload
393+
)
394+
defer cancel()
395+
defer cleanup()
396+
defer listener.Close()
397+
398+
registerTestingService(server, testImpl)
399+
400+
go server.Serve(ctx, listener)
401+
defer server.Shutdown(ctx)
402+
403+
if err := client.Call(ctx, serviceName, "Test", &testPayload{}, &result); err != nil {
404+
t.Fatalf("unexpected error making call: %v", err)
405+
}
406+
407+
dl, _ := ctx.Deadline()
408+
if result.Deadline != dl.UnixNano() {
409+
t.Fatalf("expected deadline %v, actual: %v", dl, result.Deadline)
410+
}
411+
}
412+
379413
func BenchmarkRoundTrip(b *testing.B) {
380414
var (
381415
ctx = context.Background()

types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import (
2323
)
2424

2525
type Request struct {
26-
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
27-
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
28-
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
26+
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
27+
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
28+
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
29+
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
2930
}
3031

3132
func (r *Request) Reset() { *r = Request{} }

0 commit comments

Comments
 (0)