Skip to content

Commit d134fe7

Browse files
Merge pull request #41 from crosbymichael/interceptors
Add client and server unary interceptors
2 parents a5bd8ce + de8faac commit d134fe7

6 files changed

Lines changed: 241 additions & 19 deletions

File tree

client.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,36 +36,48 @@ import (
3636
// closed.
3737
var ErrClosed = errors.New("ttrpc: closed")
3838

39+
// Client for a ttrpc server
3940
type Client struct {
4041
codec codec
4142
conn net.Conn
4243
channel *channel
4344
calls chan *callRequest
4445

45-
closed chan struct{}
46-
closeOnce sync.Once
47-
closeFunc func()
48-
done chan struct{}
49-
err error
46+
closed chan struct{}
47+
closeOnce sync.Once
48+
closeFunc func()
49+
done chan struct{}
50+
err error
51+
interceptor UnaryClientInterceptor
5052
}
5153

54+
// ClientOpts configures a client
5255
type ClientOpts func(c *Client)
5356

57+
// WithOnClose sets the close func whenever the client's Close() method is called
5458
func WithOnClose(onClose func()) ClientOpts {
5559
return func(c *Client) {
5660
c.closeFunc = onClose
5761
}
5862
}
5963

64+
// WithUnaryClientInterceptor sets the provided client interceptor
65+
func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
66+
return func(c *Client) {
67+
c.interceptor = i
68+
}
69+
}
70+
6071
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
6172
c := &Client{
62-
codec: codec{},
63-
conn: conn,
64-
channel: newChannel(conn),
65-
calls: make(chan *callRequest),
66-
closed: make(chan struct{}),
67-
done: make(chan struct{}),
68-
closeFunc: func() {},
73+
codec: codec{},
74+
conn: conn,
75+
channel: newChannel(conn),
76+
calls: make(chan *callRequest),
77+
closed: make(chan struct{}),
78+
done: make(chan struct{}),
79+
closeFunc: func() {},
80+
interceptor: defaultClientInterceptor,
6981
}
7082

7183
for _, o := range opts {
@@ -107,7 +119,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
107119
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
108120
}
109121

110-
if err := c.dispatch(ctx, creq, cresp); err != nil {
122+
info := &UnaryClientInfo{
123+
FullMethod: fullPath(service, method),
124+
}
125+
if err := c.interceptor(ctx, creq, cresp, info, c.dispatch); err != nil {
111126
return err
112127
}
113128

config.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ package ttrpc
1919
import "github.com/pkg/errors"
2020

2121
type serverConfig struct {
22-
handshaker Handshaker
22+
handshaker Handshaker
23+
interceptor UnaryServerInterceptor
2324
}
2425

26+
// ServerOpt for configuring a ttrpc server
2527
type ServerOpt func(*serverConfig) error
2628

2729
// WithServerHandshaker can be passed to NewServer to ensure that the
@@ -37,3 +39,14 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
3739
return nil
3840
}
3941
}
42+
43+
// WithUnaryServerInterceptor sets the provided interceptor on the server
44+
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
45+
return func(c *serverConfig) error {
46+
if c.interceptor != nil {
47+
return errors.New("only one interceptor allowed per server")
48+
}
49+
c.interceptor = i
50+
return nil
51+
}
52+
}

example/cmd/main.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
context "context"
21+
"encoding/json"
22+
"errors"
23+
"log"
24+
"net"
25+
"os"
26+
27+
ttrpc "github.com/containerd/ttrpc"
28+
"github.com/containerd/ttrpc/example"
29+
"github.com/gogo/protobuf/types"
30+
)
31+
32+
const socket = "example-ttrpc-server"
33+
34+
func main() {
35+
if err := handle(); err != nil {
36+
log.Fatal(err)
37+
}
38+
}
39+
40+
func handle() error {
41+
command := os.Args[1]
42+
switch command {
43+
case "server":
44+
return server()
45+
case "client":
46+
return client()
47+
default:
48+
return errors.New("invalid command")
49+
}
50+
}
51+
52+
func serverIntercept(ctx context.Context, um ttrpc.Unmarshaler, i *ttrpc.UnaryServerInfo, m ttrpc.Method) (interface{}, error) {
53+
log.Println("server interceptor")
54+
dumpMetadata(ctx)
55+
return m(ctx, um)
56+
}
57+
58+
func clientIntercept(ctx context.Context, req *ttrpc.Request, resp *ttrpc.Response, i *ttrpc.UnaryClientInfo, invoker ttrpc.Invoker) error {
59+
log.Println("client interceptor")
60+
dumpMetadata(ctx)
61+
return invoker(ctx, req, resp)
62+
}
63+
64+
func dumpMetadata(ctx context.Context) {
65+
md, ok := ttrpc.GetMetadata(ctx)
66+
if !ok {
67+
panic("no metadata")
68+
}
69+
if err := json.NewEncoder(os.Stdout).Encode(md); err != nil {
70+
panic(err)
71+
}
72+
}
73+
74+
func server() error {
75+
s, err := ttrpc.NewServer(
76+
ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()),
77+
ttrpc.WithUnaryServerInterceptor(serverIntercept),
78+
)
79+
if err != nil {
80+
return err
81+
}
82+
defer s.Close()
83+
example.RegisterExampleService(s, &exampleServer{})
84+
85+
l, err := net.Listen("unix", socket)
86+
if err != nil {
87+
return err
88+
}
89+
defer func() {
90+
l.Close()
91+
os.Remove(socket)
92+
}()
93+
return s.Serve(context.Background(), l)
94+
}
95+
96+
func client() error {
97+
conn, err := net.Dial("unix", socket)
98+
if err != nil {
99+
return err
100+
}
101+
defer conn.Close()
102+
103+
tc := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(clientIntercept))
104+
client := example.NewExampleClient(tc)
105+
106+
r := &example.Method1Request{
107+
Foo: os.Args[2],
108+
Bar: os.Args[3],
109+
}
110+
111+
ctx := context.Background()
112+
md := ttrpc.Metadata{}
113+
md.Set("name", "koye")
114+
ctx = ttrpc.WithMetadata(ctx, md)
115+
116+
resp, err := client.Method1(ctx, r)
117+
if err != nil {
118+
return err
119+
}
120+
return json.NewEncoder(os.Stdout).Encode(resp)
121+
}
122+
123+
type exampleServer struct {
124+
}
125+
126+
func (s *exampleServer) Method1(ctx context.Context, r *example.Method1Request) (*example.Method1Response, error) {
127+
return &example.Method1Response{
128+
Foo: r.Foo,
129+
Bar: r.Bar,
130+
}, nil
131+
}
132+
133+
func (s *exampleServer) Method2(ctx context.Context, r *example.Method1Request) (*types.Empty, error) {
134+
return &types.Empty{}, nil
135+
}

interceptor.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ttrpc
18+
19+
import "context"
20+
21+
// UnaryServerInfo provides information about the server request
22+
type UnaryServerInfo struct {
23+
FullMethod string
24+
}
25+
26+
// UnaryClientInfo provides information about the client request
27+
type UnaryClientInfo struct {
28+
FullMethod string
29+
}
30+
31+
// Unmarshaler contains the server request data and allows it to be unmarshaled
32+
// into a concrete type
33+
type Unmarshaler func(interface{}) error
34+
35+
// Invoker invokes the client's request and response from the ttrpc server
36+
type Invoker func(context.Context, *Request, *Response) error
37+
38+
// UnaryServerInterceptor specifies the interceptor function for server request/response
39+
type UnaryServerInterceptor func(context.Context, Unmarshaler, *UnaryServerInfo, Method) (interface{}, error)
40+
41+
// UnaryClientInterceptor specifies the interceptor function for client request/response
42+
type UnaryClientInterceptor func(context.Context, *Request, *Response, *UnaryClientInfo, Invoker) error
43+
44+
func defaultServerInterceptor(ctx context.Context, unmarshal Unmarshaler, info *UnaryServerInfo, method Method) (interface{}, error) {
45+
return method(ctx, unmarshal)
46+
}
47+
48+
func defaultClientInterceptor(ctx context.Context, req *Request, resp *Response, _ *UnaryClientInfo, invoker Invoker) error {
49+
return invoker(ctx, req, resp)
50+
}

server.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,13 @@ func NewServer(opts ...ServerOpt) (*Server, error) {
5353
return nil, err
5454
}
5555
}
56+
if config.interceptor == nil {
57+
config.interceptor = defaultServerInterceptor
58+
}
5659

5760
return &Server{
5861
config: config,
59-
services: newServiceSet(),
62+
services: newServiceSet(config.interceptor),
6063
done: make(chan struct{}),
6164
listeners: make(map[net.Listener]struct{}),
6265
connections: make(map[*serverConn]struct{}),

services.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ type ServiceDesc struct {
3737
}
3838

3939
type serviceSet struct {
40-
services map[string]ServiceDesc
40+
services map[string]ServiceDesc
41+
interceptor UnaryServerInterceptor
4142
}
4243

43-
func newServiceSet() *serviceSet {
44+
func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet {
4445
return &serviceSet{
45-
services: make(map[string]ServiceDesc),
46+
services: make(map[string]ServiceDesc),
47+
interceptor: interceptor,
4648
}
4749
}
4850

@@ -84,7 +86,11 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin
8486
return nil
8587
}
8688

87-
resp, err := method(ctx, unmarshal)
89+
info := &UnaryServerInfo{
90+
FullMethod: fullPath(serviceName, methodName),
91+
}
92+
93+
resp, err := s.interceptor(ctx, unmarshal, info, method)
8894
if err != nil {
8995
return nil, err
9096
}

0 commit comments

Comments
 (0)