Skip to content

Commit 3e2e778

Browse files
fuweidqiutongs
authored andcommitted
runtime/v2: manager supports server interceptor
Currently, the runc shimv2 commandline manager doesn't support ttrpc server's customized option, for example, the ttrpc server interceptor. This commit is to allow the task plugin can return the `UnaryServerInterceptor` option to the manager so that the task plugin can do enhancement before handling the incoming request, like API-level failpoint control. Signed-off-by: Wei Fu <[email protected]> (cherry picked from commit 822cc51) Signed-off-by: Qiutong Song <[email protected]>
1 parent cb935bf commit 3e2e778

7 files changed

Lines changed: 168 additions & 9 deletions

File tree

runtime/v2/shim/shim.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ type ttrpcService interface {
105105
RegisterTTRPC(*ttrpc.Server) error
106106
}
107107

108+
type ttrpcServerOptioner interface {
109+
ttrpcService
110+
111+
UnaryInterceptor() ttrpc.UnaryServerInterceptor
112+
}
113+
108114
type taskService struct {
109115
shimapi.TaskService
110116
}
@@ -366,6 +372,8 @@ func run(ctx context.Context, manager Manager, initFunc Init, name string, confi
366372
var (
367373
initialized = plugin.NewPluginSet()
368374
ttrpcServices = []ttrpcService{}
375+
376+
ttrpcUnaryInterceptors = []ttrpc.UnaryServerInterceptor{}
369377
)
370378
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
371379
for _, p := range plugins {
@@ -414,11 +422,17 @@ func run(ctx context.Context, manager Manager, initFunc Init, name string, confi
414422
if src, ok := instance.(ttrpcService); ok {
415423
logrus.WithField("id", id).Debug("registering ttrpc service")
416424
ttrpcServices = append(ttrpcServices, src)
425+
426+
}
427+
428+
if src, ok := instance.(ttrpcServerOptioner); ok {
429+
ttrpcUnaryInterceptors = append(ttrpcUnaryInterceptors, src.UnaryInterceptor())
417430
}
418431
}
419432

420-
server, err := newServer()
421-
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
433+
unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...)
434+
server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
435+
if err != nil {
422436
return fmt.Errorf("failed creating server: %w", err)
423437
}
424438

runtime/v2/shim/shim_darwin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package shim
1818

1919
import "github.com/containerd/ttrpc"
2020

21-
func newServer() (*ttrpc.Server, error) {
22-
return ttrpc.NewServer()
21+
func newServer(opts ...ttrpc.ServerOpt) (*ttrpc.Server, error) {
22+
return ttrpc.NewServer(opts...)
2323
}
2424

2525
func subreaper() error {

runtime/v2/shim/shim_freebsd.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package shim
1818

1919
import "github.com/containerd/ttrpc"
2020

21-
func newServer() (*ttrpc.Server, error) {
22-
return ttrpc.NewServer()
21+
func newServer(opts ...ttrpc.ServerOpt) (*ttrpc.Server, error) {
22+
return ttrpc.NewServer(opts...)
2323
}
2424

2525
func subreaper() error {

runtime/v2/shim/shim_linux.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import (
2121
"github.com/containerd/ttrpc"
2222
)
2323

24-
func newServer() (*ttrpc.Server, error) {
25-
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
24+
func newServer(opts ...ttrpc.ServerOpt) (*ttrpc.Server, error) {
25+
opts = append(opts, ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
26+
return ttrpc.NewServer(opts...)
2627
}
2728

2829
func subreaper() error {

runtime/v2/shim/shim_windows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func setupSignals(config Config) (chan os.Signal, error) {
3131
return nil, errors.New("not supported")
3232
}
3333

34-
func newServer() (*ttrpc.Server, error) {
34+
func newServer(opts ...ttrpc.ServerOpt) (*ttrpc.Server, error) {
3535
return nil, errors.New("not supported")
3636
}
3737

runtime/v2/shim/util.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
"github.com/containerd/containerd/namespaces"
31+
"github.com/containerd/ttrpc"
3132
"github.com/gogo/protobuf/proto"
3233
"github.com/gogo/protobuf/types"
3334
exec "golang.org/x/sys/execabs"
@@ -167,3 +168,28 @@ func ReadAddress(path string) (string, error) {
167168
}
168169
return string(data), nil
169170
}
171+
172+
// chainUnaryServerInterceptors creates a single ttrpc server interceptor from
173+
// a chain of many interceptors executed from first to last.
174+
func chainUnaryServerInterceptors(interceptors ...ttrpc.UnaryServerInterceptor) ttrpc.UnaryServerInterceptor {
175+
n := len(interceptors)
176+
177+
// force to use default interceptor in ttrpc
178+
if n == 0 {
179+
return nil
180+
}
181+
182+
return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) {
183+
currentMethod := method
184+
185+
for i := n - 1; i > 0; i-- {
186+
interceptor := interceptors[i]
187+
innerMethod := currentMethod
188+
189+
currentMethod = func(currentCtx context.Context, currentUnmarshal func(interface{}) error) (interface{}, error) {
190+
return interceptor(currentCtx, currentUnmarshal, info, innerMethod)
191+
}
192+
}
193+
return interceptors[0](ctx, unmarshal, info, currentMethod)
194+
}
195+
}

runtime/v2/shim/util_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package shim
18+
19+
import (
20+
"context"
21+
"path/filepath"
22+
"reflect"
23+
"testing"
24+
25+
"github.com/containerd/ttrpc"
26+
)
27+
28+
func TestChainUnaryServerInterceptors(t *testing.T) {
29+
methodInfo := &ttrpc.UnaryServerInfo{
30+
FullMethod: filepath.Join("/", t.Name(), "foo"),
31+
}
32+
33+
type callKey struct{}
34+
callValue := "init"
35+
callCtx := context.WithValue(context.Background(), callKey{}, callValue)
36+
37+
verifyCallCtxFn := func(ctx context.Context, key interface{}, expected interface{}) {
38+
got := ctx.Value(key)
39+
if !reflect.DeepEqual(expected, got) {
40+
t.Fatalf("[context(key:%s) expected %v, but got %v", key, expected, got)
41+
}
42+
}
43+
44+
verifyInfoFn := func(info *ttrpc.UnaryServerInfo) {
45+
if !reflect.DeepEqual(methodInfo, info) {
46+
t.Fatalf("[info] expected %+v, but got %+v", methodInfo, info)
47+
}
48+
}
49+
50+
origUnmarshaler := func(obj interface{}) error {
51+
v := obj.(*int64)
52+
*v *= 2
53+
return nil
54+
}
55+
56+
type firstKey struct{}
57+
firstValue := "from first"
58+
var firstUnmarshaler ttrpc.Unmarshaler
59+
first := func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) {
60+
verifyCallCtxFn(ctx, callKey{}, callValue)
61+
verifyInfoFn(info)
62+
63+
ctx = context.WithValue(ctx, firstKey{}, firstValue)
64+
65+
firstUnmarshaler = func(obj interface{}) error {
66+
if err := unmarshal(obj); err != nil {
67+
return err
68+
}
69+
70+
v := obj.(*int64)
71+
*v *= 2
72+
return nil
73+
}
74+
75+
return method(ctx, firstUnmarshaler)
76+
}
77+
78+
type secondKey struct{}
79+
secondValue := "from second"
80+
second := func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (interface{}, error) {
81+
verifyCallCtxFn(ctx, callKey{}, callValue)
82+
verifyCallCtxFn(ctx, firstKey{}, firstValue)
83+
verifyInfoFn(info)
84+
85+
v := int64(3) // should return 12
86+
if err := unmarshal(&v); err != nil {
87+
t.Fatalf("unexpected error %v", err)
88+
}
89+
if expected := int64(12); v != expected {
90+
t.Fatalf("expected int64(%v), but got %v", expected, v)
91+
}
92+
93+
ctx = context.WithValue(ctx, secondKey{}, secondValue)
94+
return method(ctx, unmarshal)
95+
}
96+
97+
methodFn := func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
98+
verifyCallCtxFn(ctx, callKey{}, callValue)
99+
verifyCallCtxFn(ctx, firstKey{}, firstValue)
100+
verifyCallCtxFn(ctx, secondKey{}, secondValue)
101+
102+
v := int64(2)
103+
if err := unmarshal(&v); err != nil {
104+
return nil, err
105+
}
106+
return v, nil
107+
}
108+
109+
interceptor := chainUnaryServerInterceptors(first, second)
110+
v, err := interceptor(callCtx, origUnmarshaler, methodInfo, methodFn)
111+
if err != nil {
112+
t.Fatalf("expected nil, but got %v", err)
113+
}
114+
115+
if expected := int64(8); v != expected {
116+
t.Fatalf("expected result is int64(%v), but got %v", expected, v)
117+
}
118+
}

0 commit comments

Comments
 (0)