Skip to content

Commit ae87730

Browse files
committed
Improve shim shutdown logic
Shims no longer call `os.Exit`; instead they cancel the context on shutdown so that the deferred cleanup for events and other resources actually runs. Signed-off-by: Michael Crosby <[email protected]>
1 parent 4ba756e commit ae87730

File tree

8 files changed

+88
-42
lines changed

8 files changed

+88
-42
lines changed

runtime/v2/example/example.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"os"
2424

2525
"github.com/containerd/containerd/errdefs"
26-
"github.com/containerd/containerd/events"
2726
"github.com/containerd/containerd/runtime/v2/shim"
2827
taskAPI "github.com/containerd/containerd/runtime/v2/task"
2928
ptypes "github.com/gogo/protobuf/types"
@@ -37,7 +36,7 @@ var (
3736
)
3837

3938
// New returns a new shim service
40-
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
39+
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
4140
return &service{}, nil
4241
}
4342

runtime/v2/runc/epoll.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ import (
2424

2525
"github.com/containerd/cgroups"
2626
eventstypes "github.com/containerd/containerd/api/events"
27-
"github.com/containerd/containerd/events"
2827
"github.com/containerd/containerd/runtime"
28+
"github.com/containerd/containerd/runtime/v2/shim"
2929
"github.com/sirupsen/logrus"
3030
"golang.org/x/sys/unix"
3131
)
3232

3333
// NewOOMEpoller returns an epoll implementation that listens to OOM events
3434
// from a container's cgroups.
35-
func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) {
35+
func NewOOMEpoller(publisher shim.Publisher) (*Epoller, error) {
3636
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
3737
if err != nil {
3838
return nil, err
@@ -49,7 +49,7 @@ type Epoller struct {
4949
mu sync.Mutex
5050

5151
fd int
52-
publisher events.Publisher
52+
publisher shim.Publisher
5353
set map[uintptr]*item
5454
}
5555

runtime/v2/runc/v1/service.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
eventstypes "github.com/containerd/containerd/api/events"
3434
"github.com/containerd/containerd/api/types/task"
3535
"github.com/containerd/containerd/errdefs"
36-
"github.com/containerd/containerd/events"
3736
"github.com/containerd/containerd/log"
3837
"github.com/containerd/containerd/mount"
3938
"github.com/containerd/containerd/namespaces"
@@ -58,28 +57,27 @@ var (
5857
)
5958

6059
// New returns a new shim service that can be used via GRPC
61-
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
60+
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
6261
ep, err := runc.NewOOMEpoller(publisher)
6362
if err != nil {
6463
return nil, err
6564
}
66-
ctx, cancel := context.WithCancel(ctx)
6765
go ep.Run(ctx)
6866
s := &service{
6967
id: id,
7068
context: ctx,
7169
events: make(chan interface{}, 128),
7270
ec: shim.Default.Subscribe(),
7371
ep: ep,
74-
cancel: cancel,
72+
cancel: shutdown,
7573
}
7674
go s.processExits()
7775
runcC.Monitor = shim.Default
7876
if err := s.initPlatform(); err != nil {
79-
cancel()
77+
shutdown()
8078
return nil, errors.Wrap(err, "failed to initialized platform behavior")
8179
}
82-
go s.forward(publisher)
80+
go s.forward(ctx, publisher)
8381
return s, nil
8482
}
8583

@@ -511,7 +509,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
511509

512510
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
513511
s.cancel()
514-
os.Exit(0)
512+
close(s.events)
515513
return empty, nil
516514
}
517515

@@ -619,15 +617,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
619617
return pids, nil
620618
}
621619

622-
func (s *service) forward(publisher events.Publisher) {
620+
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
621+
ns, _ := namespaces.Namespace(ctx)
622+
ctx = namespaces.WithNamespace(context.Background(), ns)
623623
for e := range s.events {
624-
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
624+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
625625
err := publisher.Publish(ctx, runc.GetTopic(e), e)
626626
cancel()
627627
if err != nil {
628628
logrus.WithError(err).Error("post event")
629629
}
630630
}
631+
publisher.Close()
631632
}
632633

633634
func (s *service) getContainer() (*runc.Container, error) {

runtime/v2/runc/v2/service.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
eventstypes "github.com/containerd/containerd/api/events"
3535
"github.com/containerd/containerd/api/types/task"
3636
"github.com/containerd/containerd/errdefs"
37-
"github.com/containerd/containerd/events"
3837
"github.com/containerd/containerd/log"
3938
"github.com/containerd/containerd/mount"
4039
"github.com/containerd/containerd/namespaces"
@@ -71,29 +70,28 @@ type spec struct {
7170
}
7271

7372
// New returns a new shim service that can be used via GRPC
74-
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
73+
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
7574
ep, err := runc.NewOOMEpoller(publisher)
7675
if err != nil {
7776
return nil, err
7877
}
79-
ctx, cancel := context.WithCancel(ctx)
8078
go ep.Run(ctx)
8179
s := &service{
8280
id: id,
8381
context: ctx,
8482
events: make(chan interface{}, 128),
8583
ec: shim.Default.Subscribe(),
8684
ep: ep,
87-
cancel: cancel,
85+
cancel: shutdown,
8886
containers: make(map[string]*runc.Container),
8987
}
9088
go s.processExits()
9189
runcC.Monitor = shim.Default
9290
if err := s.initPlatform(); err != nil {
93-
cancel()
91+
shutdown()
9492
return nil, errors.Wrap(err, "failed to initialized platform behavior")
9593
}
96-
go s.forward(publisher)
94+
go s.forward(ctx, publisher)
9795
return s, nil
9896
}
9997

@@ -570,7 +568,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
570568
return empty, nil
571569
}
572570
s.cancel()
573-
os.Exit(0)
571+
close(s.events)
574572
return empty, nil
575573
}
576574

@@ -689,15 +687,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
689687
return pids, nil
690688
}
691689

692-
func (s *service) forward(publisher events.Publisher) {
690+
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
691+
ns, _ := namespaces.Namespace(ctx)
692+
ctx = namespaces.WithNamespace(context.Background(), ns)
693693
for e := range s.events {
694-
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
694+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
695695
err := publisher.Publish(ctx, runc.GetTopic(e), e)
696696
cancel()
697697
if err != nil {
698698
logrus.WithError(err).Error("post event")
699699
}
700700
}
701+
publisher.Close()
701702
}
702703

703704
func (s *service) getContainer(id string) (*runc.Container, error) {

runtime/v2/runhcs/service.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import (
4141
containerd_types "github.com/containerd/containerd/api/types"
4242
"github.com/containerd/containerd/api/types/task"
4343
"github.com/containerd/containerd/errdefs"
44-
"github.com/containerd/containerd/events"
4544
"github.com/containerd/containerd/log"
4645
"github.com/containerd/containerd/mount"
4746
"github.com/containerd/containerd/namespaces"
@@ -129,12 +128,13 @@ func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) {
129128
}
130129

131130
// New returns a new runhcs shim service that can be used via GRPC
132-
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
131+
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
133132
return &service{
134133
context: ctx,
135134
id: id,
136135
processes: make(map[string]*process),
137136
publisher: publisher,
137+
shutdown: shutdown,
138138
}, nil
139139
}
140140

@@ -159,7 +159,8 @@ type service struct {
159159
id string
160160
processes map[string]*process
161161

162-
publisher events.Publisher
162+
publisher shim.Publisher
163+
shutdown func()
163164
}
164165

165166
func (s *service) newRunhcs() *runhcs.Runhcs {
@@ -1068,7 +1069,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
10681069
if s.debugListener != nil {
10691070
s.debugListener.Close()
10701071
}
1072+
s.publisher.Close()
1073+
s.shutdown()
10711074

1072-
os.Exit(0)
10731075
return empty, nil
10741076
}

runtime/v2/shim/shim.go

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23+
"io"
2324
"net"
2425
"os"
2526
"runtime"
2627
"runtime/debug"
2728
"strings"
29+
"sync"
2830
"time"
2931

3032
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
@@ -46,8 +48,14 @@ type Client struct {
4648
signals chan os.Signal
4749
}
4850

51+
// Publisher for events
52+
type Publisher interface {
53+
events.Publisher
54+
io.Closer
55+
}
56+
4957
// Init func for the creation of a shim server
50-
type Init func(context.Context, string, events.Publisher) (Shim, error)
58+
type Init func(context.Context, string, Publisher, func()) (Shim, error)
5159

5260
// Shim server interface
5361
type Shim interface {
@@ -156,24 +164,28 @@ func run(id string, initFunc Init, config Config) error {
156164
return err
157165
}
158166
}
159-
160-
publisher := &remoteEventsPublisher{
161-
address: fmt.Sprintf("%s.ttrpc", addressFlag),
162-
}
163-
conn, err := connect(publisher.address, dialer)
167+
address := fmt.Sprintf("%s.ttrpc", addressFlag)
168+
conn, err := connect(address, dialer)
164169
if err != nil {
165170
return err
166171
}
167-
defer conn.Close()
172+
publisher := &remoteEventsPublisher{
173+
address: address,
174+
conn: conn,
175+
closed: make(chan struct{}),
176+
}
177+
defer publisher.Close()
178+
168179
publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn))
169180
if namespaceFlag == "" {
170181
return fmt.Errorf("shim namespace cannot be empty")
171182
}
172183
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
173184
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
174185
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
186+
ctx, cancel := context.WithCancel(ctx)
175187

176-
service, err := initFunc(ctx, idFlag, publisher)
188+
service, err := initFunc(ctx, idFlag, publisher, cancel)
177189
if err != nil {
178190
return err
179191
}
@@ -183,7 +195,7 @@ func run(id string, initFunc Init, config Config) error {
183195
"pid": os.Getpid(),
184196
"namespace": namespaceFlag,
185197
})
186-
go handleSignals(logger, signals)
198+
go handleSignals(ctx, logger, signals)
187199
response, err := service.Cleanup(ctx)
188200
if err != nil {
189201
return err
@@ -210,7 +222,17 @@ func run(id string, initFunc Init, config Config) error {
210222
return err
211223
}
212224
client := NewShimClient(ctx, service, signals)
213-
return client.Serve()
225+
if err := client.Serve(); err != nil {
226+
if err != context.Canceled {
227+
return err
228+
}
229+
}
230+
select {
231+
case <-publisher.Done():
232+
return nil
233+
case <-time.After(5 * time.Second):
234+
return errors.New("publisher not closed")
235+
}
214236
}
215237
}
216238

@@ -254,7 +276,7 @@ func (s *Client) Serve() error {
254276
dumpStacks(logger)
255277
}
256278
}()
257-
return handleSignals(logger, s.signals)
279+
return handleSignals(s.context, logger, s.signals)
258280
}
259281

260282
// serve serves the ttrpc API over a unix socket at the provided path
@@ -291,7 +313,22 @@ func dumpStacks(logger *logrus.Entry) {
291313

292314
type remoteEventsPublisher struct {
293315
address string
316+
conn net.Conn
294317
client v1.EventsService
318+
closed chan struct{}
319+
closer sync.Once
320+
}
321+
322+
func (l *remoteEventsPublisher) Done() <-chan struct{} {
323+
return l.closed
324+
}
325+
326+
func (l *remoteEventsPublisher) Close() (err error) {
327+
l.closer.Do(func() {
328+
err = l.conn.Close()
329+
close(l.closed)
330+
})
331+
return err
295332
}
296333

297334
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {

runtime/v2/shim/shim_unix.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,14 @@ func serveListener(path string) (net.Listener, error) {
7171
return l, nil
7272
}
7373

74-
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
74+
func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
7575
logger.Info("starting signal loop")
7676

7777
for {
78-
for s := range signals {
78+
select {
79+
case <-ctx.Done():
80+
return ctx.Err()
81+
case s := <-signals:
7982
switch s {
8083
case unix.SIGCHLD:
8184
if err := Reap(); err != nil {

runtime/v2/shim/shim_windows.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,14 @@ func serveListener(path string) (net.Listener, error) {
104104
return l, nil
105105
}
106106

107-
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
107+
func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
108108
logger.Info("starting signal loop")
109109

110110
for {
111-
for s := range signals {
111+
select {
112+
case <-ctx.Done():
113+
return ctx.Err()
114+
case s := <-signals:
112115
switch s {
113116
case os.Interrupt:
114117
return nil

0 commit comments

Comments
 (0)