Skip to content

Commit bc94455

Browse files
authored
Merge pull request #3206 from Random-Liu/cleanup-after-deadshim-v2
Cleanup dead v2 shim.
2 parents cfbbda9 + 5c9811d commit bc94455

3 files changed

Lines changed: 102 additions & 40 deletions

File tree

runtime/v2/binary.go

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ type binary struct {
5252
rtTasks *runtime.TaskList
5353
}
5454

55-
func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
55+
func (b *binary) Start(ctx context.Context, onClose func()) (_ *shim, err error) {
5656
args := []string{"-id", b.bundle.ID}
5757
if logrus.GetLevel() == logrus.DebugLevel {
5858
args = append(args, "-debug")
@@ -101,7 +101,7 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
101101
if err != nil {
102102
return nil, err
103103
}
104-
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() }))
104+
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
105105
return &shim{
106106
bundle: b.bundle,
107107
client: client,
@@ -152,9 +152,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
152152
if err := b.bundle.Delete(); err != nil {
153153
return nil, err
154154
}
155-
// remove self from the runtime task list
156-
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
157-
b.rtTasks.Delete(ctx, b.bundle.ID)
158155
return &runtime.Exit{
159156
Status: response.ExitStatus,
160157
Timestamp: response.ExitedAt,

runtime/v2/manager.go

Lines changed: 31 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,10 @@ func (m *TaskManager) ID() string {
113113

114114
// Create a new task
115115
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
116+
ns, err := namespaces.NamespaceRequired(ctx)
117+
if err != nil {
118+
return nil, err
119+
}
116120
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
117121
if err != nil {
118122
return nil, err
@@ -123,7 +127,15 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
123127
}
124128
}()
125129
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks)
126-
shim, err := b.Start(ctx)
130+
shim, err := b.Start(ctx, func() {
131+
log.G(ctx).WithField("id", id).Info("shim disconnected")
132+
_, err := m.tasks.Get(ctx, id)
133+
if err != nil {
134+
// Task was never started or was already successfully deleted
135+
return
136+
}
137+
cleanupAfterDeadShim(context.Background(), id, ns, m.events, b)
138+
})
127139
if err != nil {
128140
return nil, err
129141
}
@@ -219,23 +231,27 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
219231
bundle.Delete()
220232
continue
221233
}
222-
shim, err := loadShim(ctx, bundle, m.events, m.tasks)
234+
container, err := m.container(ctx, id)
223235
if err != nil {
224-
log.G(ctx).WithError(err).Errorf("cleanup dead shim %s", id)
225-
container, err := m.container(ctx, id)
226-
if err != nil {
227-
log.G(ctx).WithError(err).Errorf("loading dead container %s", id)
228-
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
229-
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
230-
}
231-
bundle.Delete()
232-
continue
236+
log.G(ctx).WithError(err).Errorf("loading container %s", id)
237+
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
238+
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
233239
}
234-
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks)
235-
if _, err := binaryCall.Delete(ctx); err != nil {
236-
log.G(ctx).WithError(err).Errorf("binary call to delete for %s", id)
237-
continue
240+
bundle.Delete()
241+
continue
242+
}
243+
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks)
244+
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
245+
log.G(ctx).WithField("id", id).Info("shim disconnected")
246+
_, err := m.tasks.Get(ctx, id)
247+
if err != nil {
248+
// Task was never started or was already successfully deleted
249+
return
238250
}
251+
cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall)
252+
})
253+
if err != nil {
254+
cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall)
239255
continue
240256
}
241257
m.tasks.Add(ctx, shim)

runtime/v2/shim.go

Lines changed: 69 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -24,18 +24,21 @@ import (
2424
"path/filepath"
2525
"time"
2626

27+
eventstypes "github.com/containerd/containerd/api/events"
2728
"github.com/containerd/containerd/api/types"
2829
tasktypes "github.com/containerd/containerd/api/types/task"
2930
"github.com/containerd/containerd/errdefs"
3031
"github.com/containerd/containerd/events/exchange"
3132
"github.com/containerd/containerd/identifiers"
3233
"github.com/containerd/containerd/log"
34+
"github.com/containerd/containerd/namespaces"
3335
"github.com/containerd/containerd/runtime"
3436
client "github.com/containerd/containerd/runtime/v2/shim"
3537
"github.com/containerd/containerd/runtime/v2/task"
3638
"github.com/containerd/ttrpc"
3739
ptypes "github.com/gogo/protobuf/types"
3840
"github.com/pkg/errors"
41+
"github.com/sirupsen/logrus"
3942
)
4043

4144
func loadAddress(path string) (string, error) {
@@ -46,7 +49,7 @@ func loadAddress(path string) (string, error) {
4649
return string(data), nil
4750
}
4851

49-
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList) (_ *shim, err error) {
52+
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
5053
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
5154
if err != nil {
5255
return nil, err
@@ -55,6 +58,11 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
5558
if err != nil {
5659
return nil, err
5760
}
61+
defer func() {
62+
if err != nil {
63+
conn.Close()
64+
}
65+
}()
5866
f, err := openShimLog(ctx, bundle)
5967
if err != nil {
6068
return nil, errors.Wrap(err, "open shim log pipe")
@@ -79,7 +87,12 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
7987
}
8088
}()
8189

82-
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() }))
90+
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
91+
defer func() {
92+
if err != nil {
93+
client.Close()
94+
}
95+
}()
8396
s := &shim{
8497
client: client,
8598
task: task.NewTaskClient(client),
@@ -93,6 +106,52 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
93106
return s, nil
94107
}
95108

109+
func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
110+
ctx = namespaces.WithNamespace(ctx, ns)
111+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
112+
defer cancel()
113+
114+
log.G(ctx).WithFields(logrus.Fields{
115+
"id": id,
116+
"namespace": ns,
117+
}).Warn("cleaning up after shim disconnected")
118+
response, err := binaryCall.Delete(ctx)
119+
if err != nil {
120+
log.G(ctx).WithError(err).WithFields(logrus.Fields{
121+
"id": id,
122+
"namespace": ns,
123+
}).Warn("failed to clean up after shim disconnected")
124+
}
125+
126+
var (
127+
pid uint32
128+
exitStatus uint32
129+
exitedAt time.Time
130+
)
131+
if response != nil {
132+
pid = response.Pid
133+
exitStatus = response.Status
134+
exitedAt = response.Timestamp
135+
} else {
136+
exitStatus = 255
137+
exitedAt = time.Now()
138+
}
139+
events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
140+
ContainerID: id,
141+
ID: id,
142+
Pid: pid,
143+
ExitStatus: exitStatus,
144+
ExitedAt: exitedAt,
145+
})
146+
147+
events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
148+
ContainerID: id,
149+
Pid: pid,
150+
ExitStatus: exitStatus,
151+
ExitedAt: exitedAt,
152+
})
153+
}
154+
96155
type shim struct {
97156
bundle *Bundle
98157
client *ttrpc.Client
@@ -124,19 +183,9 @@ func (s *shim) Shutdown(ctx context.Context) error {
124183
}
125184

126185
func (s *shim) waitShutdown(ctx context.Context) error {
127-
dead := make(chan struct{})
128-
go func() {
129-
if err := s.Shutdown(ctx); err != nil {
130-
log.G(ctx).WithError(err).Error("shim shutdown error")
131-
}
132-
close(dead)
133-
}()
134-
select {
135-
case <-time.After(3 * time.Second):
136-
return errors.New("failed to shutdown shim in time")
137-
case <-dead:
138-
return nil
139-
}
186+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
187+
defer cancel()
188+
return s.Shutdown(ctx)
140189
}
141190

142191
// ID of the shim/task
@@ -159,15 +208,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
159208
if err != nil && !errdefs.IsNotFound(err) {
160209
return nil, errdefs.FromGRPC(err)
161210
}
211+
// remove self from the runtime task list
212+
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
213+
s.rtTasks.Delete(ctx, s.ID())
162214
if err := s.waitShutdown(ctx); err != nil {
163-
return nil, err
215+
log.G(ctx).WithError(err).Error("failed to shutdown shim")
164216
}
165217
if err := s.bundle.Delete(); err != nil {
166-
return nil, err
218+
log.G(ctx).WithError(err).Error("failed to delete bundle")
167219
}
168-
// remove self from the runtime task list
169-
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
170-
s.rtTasks.Delete(ctx, s.ID())
171220
return &runtime.Exit{
172221
Status: response.ExitStatus,
173222
Timestamp: response.ExitedAt,

0 commit comments

Comments
 (0)