Skip to content

Commit 5c9811d

Browse files
committed
Cleanup dead v2 shim.
Signed-off-by: Lantao Liu <[email protected]>
1 parent a17c809 commit 5c9811d

3 files changed

Lines changed: 102 additions & 40 deletions

File tree

runtime/v2/binary.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type binary struct {
5252
rtTasks *runtime.TaskList
5353
}
5454

55-
func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
55+
func (b *binary) Start(ctx context.Context, onClose func()) (_ *shim, err error) {
5656
args := []string{"-id", b.bundle.ID}
5757
if logrus.GetLevel() == logrus.DebugLevel {
5858
args = append(args, "-debug")
@@ -96,7 +96,7 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
9696
if err != nil {
9797
return nil, err
9898
}
99-
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() }))
99+
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
100100
return &shim{
101101
bundle: b.bundle,
102102
client: client,
@@ -147,9 +147,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
147147
if err := b.bundle.Delete(); err != nil {
148148
return nil, err
149149
}
150-
// remove self from the runtime task list
151-
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
152-
b.rtTasks.Delete(ctx, b.bundle.ID)
153150
return &runtime.Exit{
154151
Status: response.ExitStatus,
155152
Timestamp: response.ExitedAt,

runtime/v2/manager.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ func (m *TaskManager) ID() string {
113113

114114
// Create a new task
115115
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
116+
ns, err := namespaces.NamespaceRequired(ctx)
117+
if err != nil {
118+
return nil, err
119+
}
116120
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
117121
if err != nil {
118122
return nil, err
@@ -123,7 +127,15 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
123127
}
124128
}()
125129
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks)
126-
shim, err := b.Start(ctx)
130+
shim, err := b.Start(ctx, func() {
131+
log.G(ctx).WithField("id", id).Info("shim disconnected")
132+
_, err := m.tasks.Get(ctx, id)
133+
if err != nil {
134+
// Task was never started or was already successfully deleted
135+
return
136+
}
137+
cleanupAfterDeadShim(context.Background(), id, ns, m.events, b)
138+
})
127139
if err != nil {
128140
return nil, err
129141
}
@@ -219,23 +231,27 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
219231
bundle.Delete()
220232
continue
221233
}
222-
shim, err := loadShim(ctx, bundle, m.events, m.tasks)
234+
container, err := m.container(ctx, id)
223235
if err != nil {
224-
log.G(ctx).WithError(err).Errorf("cleanup dead shim %s", id)
225-
container, err := m.container(ctx, id)
226-
if err != nil {
227-
log.G(ctx).WithError(err).Errorf("loading dead container %s", id)
228-
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
229-
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
230-
}
231-
bundle.Delete()
232-
continue
236+
log.G(ctx).WithError(err).Errorf("loading container %s", id)
237+
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
238+
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
233239
}
234-
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks)
235-
if _, err := binaryCall.Delete(ctx); err != nil {
236-
log.G(ctx).WithError(err).Errorf("binary call to delete for %s", id)
237-
continue
240+
bundle.Delete()
241+
continue
242+
}
243+
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks)
244+
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
245+
log.G(ctx).WithField("id", id).Info("shim disconnected")
246+
_, err := m.tasks.Get(ctx, id)
247+
if err != nil {
248+
// Task was never started or was already successfully deleted
249+
return
238250
}
251+
cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall)
252+
})
253+
if err != nil {
254+
cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall)
239255
continue
240256
}
241257
m.tasks.Add(ctx, shim)

runtime/v2/shim.go

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,21 @@ import (
2424
"path/filepath"
2525
"time"
2626

27+
eventstypes "github.com/containerd/containerd/api/events"
2728
"github.com/containerd/containerd/api/types"
2829
tasktypes "github.com/containerd/containerd/api/types/task"
2930
"github.com/containerd/containerd/errdefs"
3031
"github.com/containerd/containerd/events/exchange"
3132
"github.com/containerd/containerd/identifiers"
3233
"github.com/containerd/containerd/log"
34+
"github.com/containerd/containerd/namespaces"
3335
"github.com/containerd/containerd/runtime"
3436
client "github.com/containerd/containerd/runtime/v2/shim"
3537
"github.com/containerd/containerd/runtime/v2/task"
3638
"github.com/containerd/ttrpc"
3739
ptypes "github.com/gogo/protobuf/types"
3840
"github.com/pkg/errors"
41+
"github.com/sirupsen/logrus"
3942
)
4043

4144
func loadAddress(path string) (string, error) {
@@ -46,7 +49,7 @@ func loadAddress(path string) (string, error) {
4649
return string(data), nil
4750
}
4851

49-
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList) (_ *shim, err error) {
52+
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
5053
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
5154
if err != nil {
5255
return nil, err
@@ -55,6 +58,11 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
5558
if err != nil {
5659
return nil, err
5760
}
61+
defer func() {
62+
if err != nil {
63+
conn.Close()
64+
}
65+
}()
5866
f, err := openShimLog(ctx, bundle)
5967
if err != nil {
6068
return nil, errors.Wrap(err, "open shim log pipe")
@@ -74,7 +82,12 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
7482
}
7583
}()
7684

77-
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() }))
85+
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
86+
defer func() {
87+
if err != nil {
88+
client.Close()
89+
}
90+
}()
7891
s := &shim{
7992
client: client,
8093
task: task.NewTaskClient(client),
@@ -88,6 +101,52 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
88101
return s, nil
89102
}
90103

104+
func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
105+
ctx = namespaces.WithNamespace(ctx, ns)
106+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
107+
defer cancel()
108+
109+
log.G(ctx).WithFields(logrus.Fields{
110+
"id": id,
111+
"namespace": ns,
112+
}).Warn("cleaning up after shim disconnected")
113+
response, err := binaryCall.Delete(ctx)
114+
if err != nil {
115+
log.G(ctx).WithError(err).WithFields(logrus.Fields{
116+
"id": id,
117+
"namespace": ns,
118+
}).Warn("failed to clean up after shim disconnected")
119+
}
120+
121+
var (
122+
pid uint32
123+
exitStatus uint32
124+
exitedAt time.Time
125+
)
126+
if response != nil {
127+
pid = response.Pid
128+
exitStatus = response.Status
129+
exitedAt = response.Timestamp
130+
} else {
131+
exitStatus = 255
132+
exitedAt = time.Now()
133+
}
134+
events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
135+
ContainerID: id,
136+
ID: id,
137+
Pid: pid,
138+
ExitStatus: exitStatus,
139+
ExitedAt: exitedAt,
140+
})
141+
142+
events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
143+
ContainerID: id,
144+
Pid: pid,
145+
ExitStatus: exitStatus,
146+
ExitedAt: exitedAt,
147+
})
148+
}
149+
91150
type shim struct {
92151
bundle *Bundle
93152
client *ttrpc.Client
@@ -119,19 +178,9 @@ func (s *shim) Shutdown(ctx context.Context) error {
119178
}
120179

121180
func (s *shim) waitShutdown(ctx context.Context) error {
122-
dead := make(chan struct{})
123-
go func() {
124-
if err := s.Shutdown(ctx); err != nil {
125-
log.G(ctx).WithError(err).Error("shim shutdown error")
126-
}
127-
close(dead)
128-
}()
129-
select {
130-
case <-time.After(3 * time.Second):
131-
return errors.New("failed to shutdown shim in time")
132-
case <-dead:
133-
return nil
134-
}
181+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
182+
defer cancel()
183+
return s.Shutdown(ctx)
135184
}
136185

137186
// ID of the shim/task
@@ -154,15 +203,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
154203
if err != nil {
155204
return nil, errdefs.FromGRPC(err)
156205
}
206+
// remove self from the runtime task list
207+
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
208+
s.rtTasks.Delete(ctx, s.ID())
157209
if err := s.waitShutdown(ctx); err != nil {
158-
return nil, err
210+
log.G(ctx).WithError(err).Error("failed to shutdown shim")
159211
}
160212
if err := s.bundle.Delete(); err != nil {
161-
return nil, err
213+
log.G(ctx).WithError(err).Error("failed to delete bundle")
162214
}
163-
// remove self from the runtime task list
164-
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
165-
s.rtTasks.Delete(ctx, s.ID())
166215
return &runtime.Exit{
167216
Status: response.ExitStatus,
168217
Timestamp: response.ExitedAt,

0 commit comments

Comments
 (0)