Skip to content

Commit d30d897

Browse files
committed
Cleanup v2 shim
Signed-off-by: Maksym Pavlenko <[email protected]>
1 parent 4282ec1 commit d30d897

8 files changed

Lines changed: 107 additions & 67 deletions

File tree

runtime/runtime.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,5 @@ type PlatformRuntime interface {
7272
// Add adds a task into runtime.
7373
Add(ctx context.Context, task Task) error
7474
// Delete remove a task.
75-
Delete(ctx context.Context, taskID string)
75+
Delete(ctx context.Context, taskID string) (*Exit, error)
7676
}

runtime/task.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ type Process interface {
4747
Start(ctx context.Context) error
4848
// Wait for the process to exit
4949
Wait(ctx context.Context) (*Exit, error)
50+
}
51+
52+
// ExecProcess is a process spawned in container via Task.Exec call.
53+
// The only difference from a regular `Process` is that exec process can delete self,
54+
// while task process requires slightly more complex logic and needs to be deleted through the task manager.
55+
type ExecProcess interface {
56+
Process
57+
5058
// Delete deletes the process
5159
Delete(ctx context.Context) (*Exit, error)
5260
}
@@ -56,23 +64,23 @@ type Task interface {
5664
Process
5765

5866
// PID of the process
59-
PID() uint32
67+
PID(ctx context.Context) (uint32, error)
6068
// Namespace that the task exists in
6169
Namespace() string
6270
// Pause pauses the container process
6371
Pause(ctx context.Context) error
6472
// Resume unpauses the container process
6573
Resume(ctx context.Context) error
6674
// Exec adds a process into the container
67-
Exec(ctx context.Context, id string, opts ExecOpts) (Process, error)
75+
Exec(ctx context.Context, id string, opts ExecOpts) (ExecProcess, error)
6876
// Pids returns all pids
6977
Pids(ctx context.Context) ([]ProcessInfo, error)
7078
// Checkpoint checkpoints a container to an image with live system data
7179
Checkpoint(ctx context.Context, path string, opts *types.Any) error
7280
// Update sets the provided resources to a running task
7381
Update(ctx context.Context, resources *types.Any, annotations map[string]string) error
7482
// Process returns a process within the task for the provided id
75-
Process(ctx context.Context, id string) (Process, error)
83+
Process(ctx context.Context, id string) (ExecProcess, error)
7684
// Stats returns runtime specific metrics for a task
7785
Stats(ctx context.Context) (*types.Any, error)
7886
}

runtime/v1/linux/runtime.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ import (
4343
"github.com/containerd/containerd/runtime"
4444
"github.com/containerd/containerd/runtime/linux/runctypes"
4545
v1 "github.com/containerd/containerd/runtime/v1"
46-
shim "github.com/containerd/containerd/runtime/v1/shim/v1"
47-
runc "github.com/containerd/go-runc"
46+
"github.com/containerd/containerd/runtime/v1/shim/v1"
47+
"github.com/containerd/go-runc"
4848
"github.com/containerd/typeurl"
4949
ptypes "github.com/gogo/protobuf/types"
5050
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -315,8 +315,20 @@ func (r *Runtime) Add(ctx context.Context, task runtime.Task) error {
315315
}
316316

317317
// Delete a runtime task
318-
func (r *Runtime) Delete(ctx context.Context, id string) {
318+
func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
319+
task, err := r.tasks.Get(ctx, id)
320+
if err != nil {
321+
return nil, err
322+
}
323+
324+
s := task.(*Task)
325+
exit, err := s.Delete(ctx)
326+
if err != nil {
327+
return nil, err
328+
}
329+
319330
r.tasks.Delete(ctx, id)
331+
return exit, nil
320332
}
321333

322334
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {

runtime/v1/linux/task.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ func (t *Task) Namespace() string {
8585
}
8686

8787
// PID of the task
88-
func (t *Task) PID() uint32 {
89-
return uint32(t.pid)
88+
func (t *Task) PID(_ctx context.Context) (uint32, error) {
89+
return uint32(t.pid), nil
9090
}
9191

9292
// Delete the task and return the exit status
@@ -226,7 +226,7 @@ func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
226226
}
227227

228228
// Exec creates a new process inside the task
229-
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
229+
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
230230
if err := identifiers.Validate(id); err != nil {
231231
return nil, errors.Wrapf(err, "invalid exec id")
232232
}
@@ -316,7 +316,7 @@ func (t *Task) Update(ctx context.Context, resources *types.Any, _ map[string]st
316316
}
317317

318318
// Process returns a specific process inside the task by the process id
319-
func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
319+
func (t *Task) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
320320
p := &Process{
321321
id: id,
322322
t: t,

runtime/v2/binary.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
gruntime "runtime"
2525
"strings"
2626

27-
"github.com/containerd/containerd/events/exchange"
2827
"github.com/containerd/containerd/log"
2928
"github.com/containerd/containerd/namespaces"
3029
"github.com/containerd/containerd/runtime"
@@ -36,14 +35,12 @@ import (
3635
"github.com/sirupsen/logrus"
3736
)
3837

39-
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
38+
func shimBinary(bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string) *binary {
4039
return &binary{
4140
bundle: bundle,
4241
runtime: runtime,
4342
containerdAddress: containerdAddress,
4443
containerdTTRPCAddress: containerdTTRPCAddress,
45-
events: events,
46-
rtTasks: rt,
4744
}
4845
}
4946

@@ -52,8 +49,6 @@ type binary struct {
5249
containerdAddress string
5350
containerdTTRPCAddress string
5451
bundle *Bundle
55-
events *exchange.Exchange
56-
rtTasks *runtime.TaskList
5752
}
5853

5954
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
@@ -123,11 +118,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
123118
}
124119
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
125120
return &shim{
126-
bundle: b.bundle,
127-
client: client,
128-
task: task.NewTaskClient(client),
129-
events: b.events,
130-
rtTasks: b.rtTasks,
121+
bundle: b.bundle,
122+
client: client,
123+
task: task.NewTaskClient(client),
131124
}, nil
132125
}
133126

runtime/v2/manager.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
162162
topts = opts.RuntimeOptions
163163
}
164164

165-
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
165+
b := shimBinary(bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress)
166166
shim, err := b.Start(ctx, topts, func() {
167167
log.G(ctx).WithField("id", id).Info("shim disconnected")
168168

@@ -185,7 +185,7 @@ func (m *TaskManager) deleteShim(shim *shim) {
185185
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
186186
defer cancel()
187187

188-
_, errShim := shim.Delete(dctx)
188+
_, errShim := shim.delete(dctx, m.tasks.Delete)
189189
if errShim != nil {
190190
if errdefs.IsDeadlineExceeded(errShim) {
191191
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
@@ -207,8 +207,19 @@ func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
207207
}
208208

209209
// Delete a runtime task
210-
func (m *TaskManager) Delete(ctx context.Context, id string) {
211-
m.tasks.Delete(ctx, id)
210+
func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
211+
task, err := m.tasks.Get(ctx, id)
212+
if err != nil {
213+
return nil, err
214+
}
215+
216+
shim := task.(*shim)
217+
exit, err := shim.delete(ctx, m.tasks.Delete)
218+
if err != nil {
219+
return nil, err
220+
}
221+
222+
return exit, err
212223
}
213224

214225
// Tasks lists all tasks
@@ -287,8 +298,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
287298
bundle.Delete()
288299
continue
289300
}
290-
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
291-
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
301+
binaryCall := shimBinary(bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress)
302+
shim, err := loadShim(ctx, bundle, func() {
292303
log.G(ctx).WithField("id", id).Info("shim disconnected")
293304

294305
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)

runtime/v2/shim.go

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func loadAddress(path string) (string, error) {
6262
return string(data), nil
6363
}
6464

65-
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
65+
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) {
6666
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
6767
if err != nil {
6868
return nil, err
@@ -117,15 +117,15 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
117117
}
118118
}()
119119
s := &shim{
120-
client: client,
121-
task: task.NewTaskClient(client),
122-
bundle: bundle,
123-
events: events,
124-
rtTasks: rt,
120+
client: client,
121+
task: task.NewTaskClient(client),
122+
bundle: bundle,
125123
}
126124
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
127125
defer cancel()
128-
if err := s.Connect(ctx); err != nil {
126+
127+
// Check connectivity
128+
if _, err := s.PID(ctx); err != nil {
129129
return nil, err
130130
}
131131
return s, nil
@@ -186,23 +186,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
186186
var _ runtime.Task = &shim{}
187187

188188
type shim struct {
189-
bundle *Bundle
190-
client *ttrpc.Client
191-
task task.TaskService
192-
taskPid int
193-
events *exchange.Exchange
194-
rtTasks *runtime.TaskList
195-
}
196-
197-
func (s *shim) Connect(ctx context.Context) error {
198-
response, err := s.task.Connect(ctx, &task.ConnectRequest{
199-
ID: s.ID(),
200-
})
201-
if err != nil {
202-
return err
203-
}
204-
s.taskPid = int(response.TaskPid)
205-
return nil
189+
bundle *Bundle
190+
client *ttrpc.Client
191+
task task.TaskService
206192
}
207193

208194
func (s *shim) Shutdown(ctx context.Context) error {
@@ -227,8 +213,15 @@ func (s *shim) ID() string {
227213
}
228214

229215
// PID of the task
230-
func (s *shim) PID() uint32 {
231-
return uint32(s.taskPid)
216+
func (s *shim) PID(ctx context.Context) (uint32, error) {
217+
response, err := s.task.Connect(ctx, &task.ConnectRequest{
218+
ID: s.ID(),
219+
})
220+
if err != nil {
221+
return 0, errdefs.FromGRPC(err)
222+
}
223+
224+
return response.TaskPid, nil
232225
}
233226

234227
func (s *shim) Namespace() string {
@@ -239,7 +232,7 @@ func (s *shim) Close() error {
239232
return s.client.Close()
240233
}
241234

242-
func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
235+
func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
243236
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
244237
ID: s.ID(),
245238
})
@@ -264,7 +257,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
264257
// So we should remove the record and prevent duplicate events from
265258
// ttrpc-callback-on-close.
266259
if shimErr == nil {
267-
s.rtTasks.Delete(ctx, s.ID())
260+
removeTask(ctx, s.ID())
268261
}
269262

270263
if err := s.waitShutdown(ctx); err != nil {
@@ -275,7 +268,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
275268

276269
// remove self from the runtime task list
277270
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
278-
s.rtTasks.Delete(ctx, s.ID())
271+
removeTask(ctx, s.ID())
279272
if err := s.bundle.Delete(); err != nil {
280273
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
281274
}
@@ -311,11 +304,12 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
311304
Options: m.Options,
312305
})
313306
}
314-
response, err := s.task.Create(ctx, request)
307+
308+
_, err := s.task.Create(ctx, request)
315309
if err != nil {
316310
return nil, errdefs.FromGRPC(err)
317311
}
318-
s.taskPid = int(response.Pid)
312+
319313
return s, nil
320314
}
321315

@@ -338,13 +332,12 @@ func (s *shim) Resume(ctx context.Context) error {
338332
}
339333

340334
func (s *shim) Start(ctx context.Context) error {
341-
response, err := s.task.Start(ctx, &task.StartRequest{
335+
_, err := s.task.Start(ctx, &task.StartRequest{
342336
ID: s.ID(),
343337
})
344338
if err != nil {
345339
return errdefs.FromGRPC(err)
346340
}
347-
s.taskPid = int(response.Pid)
348341
return nil
349342
}
350343

@@ -359,7 +352,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
359352
return nil
360353
}
361354

362-
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
355+
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
363356
if err := identifiers.Validate(id); err != nil {
364357
return nil, errors.Wrapf(err, "invalid exec id %s", id)
365358
}
@@ -422,14 +415,18 @@ func (s *shim) CloseIO(ctx context.Context) error {
422415
}
423416

424417
func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
418+
taskPid, err := s.PID(ctx)
419+
if err != nil {
420+
return nil, err
421+
}
425422
response, err := s.task.Wait(ctx, &task.WaitRequest{
426423
ID: s.ID(),
427424
})
428425
if err != nil {
429426
return nil, errdefs.FromGRPC(err)
430427
}
431428
return &runtime.Exit{
432-
Pid: uint32(s.taskPid),
429+
Pid: taskPid,
433430
Timestamp: response.ExitedAt,
434431
Status: response.ExitStatus,
435432
}, nil
@@ -468,7 +465,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
468465
return response.Stats, nil
469466
}
470467

471-
func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) {
468+
func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
472469
p := &process{
473470
id: id,
474471
shim: s,

0 commit comments

Comments
 (0)