Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ type PlatformRuntime interface {
// Add adds a task into runtime.
Add(ctx context.Context, task Task) error
// Delete remove a task.
Delete(ctx context.Context, taskID string)
Delete(ctx context.Context, taskID string) (*Exit, error)
}
14 changes: 11 additions & 3 deletions runtime/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type Process interface {
Start(ctx context.Context) error
// Wait for the process to exit
Wait(ctx context.Context) (*Exit, error)
}

// ExecProcess is a process spawned in container via Task.Exec call.
// The only difference from a regular `Process` is that exec process can delete self,
// while task process requires slightly more complex logic and needs to be deleted through the task manager.
type ExecProcess interface {
Process

// Delete deletes the process
Delete(ctx context.Context) (*Exit, error)
}
Expand All @@ -56,23 +64,23 @@ type Task interface {
Process

// PID of the process
PID() uint32
PID(ctx context.Context) (uint32, error)
// Namespace that the task exists in
Namespace() string
// Pause pauses the container process
Pause(ctx context.Context) error
// Resume unpauses the container process
Resume(ctx context.Context) error
// Exec adds a process into the container
Exec(ctx context.Context, id string, opts ExecOpts) (Process, error)
Exec(ctx context.Context, id string, opts ExecOpts) (ExecProcess, error)
// Pids returns all pids
Pids(ctx context.Context) ([]ProcessInfo, error)
// Checkpoint checkpoints a container to an image with live system data
Checkpoint(ctx context.Context, path string, opts *types.Any) error
// Update sets the provided resources to a running task
Update(ctx context.Context, resources *types.Any, annotations map[string]string) error
// Process returns a process within the task for the provided id
Process(ctx context.Context, id string) (Process, error)
Process(ctx context.Context, id string) (ExecProcess, error)
// Stats returns runtime specific metrics for a task
Stats(ctx context.Context) (*types.Any, error)
}
Expand Down
18 changes: 15 additions & 3 deletions runtime/v1/linux/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
v1 "github.com/containerd/containerd/runtime/v1"
shim "github.com/containerd/containerd/runtime/v1/shim/v1"
runc "github.com/containerd/go-runc"
"github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/go-runc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -315,8 +315,20 @@ func (r *Runtime) Add(ctx context.Context, task runtime.Task) error {
}

// Delete a runtime task
func (r *Runtime) Delete(ctx context.Context, id string) {
func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := r.tasks.Get(ctx, id)
if err != nil {
return nil, err
}

s := task.(*Task)
exit, err := s.Delete(ctx)
if err != nil {
return nil, err
}

r.tasks.Delete(ctx, id)
return exit, nil
}

func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
Expand Down
8 changes: 4 additions & 4 deletions runtime/v1/linux/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func (t *Task) Namespace() string {
}

// PID of the task
func (t *Task) PID() uint32 {
return uint32(t.pid)
func (t *Task) PID(_ctx context.Context) (uint32, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo? _ctx

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, context is not used in v1 runtime, but needs to be passed in order to implement the interface. So it's marked with _ prefix to be ignored by linters.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right; I usually just use _ (no variable name) for that

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look like rust style. go use _

return uint32(t.pid), nil
}

// Delete the task and return the exit status
Expand Down Expand Up @@ -226,7 +226,7 @@ func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
}

// Exec creates a new process inside the task
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id")
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func (t *Task) Update(ctx context.Context, resources *types.Any, _ map[string]st
}

// Process returns a specific process inside the task by the process id
func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
func (t *Task) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &Process{
id: id,
t: t,
Expand Down
15 changes: 4 additions & 11 deletions runtime/v2/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
gruntime "runtime"
"strings"

"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
Expand All @@ -36,14 +35,12 @@ import (
"github.com/sirupsen/logrus"
)

func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
func shimBinary(bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string) *binary {
return &binary{
bundle: bundle,
runtime: runtime,
containerdAddress: containerdAddress,
containerdTTRPCAddress: containerdTTRPCAddress,
events: events,
rtTasks: rt,
}
}

Expand All @@ -52,8 +49,6 @@ type binary struct {
containerdAddress string
containerdTTRPCAddress string
bundle *Bundle
events *exchange.Exchange
rtTasks *runtime.TaskList
}

func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
Expand Down Expand Up @@ -123,11 +118,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
}
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
return &shim{
bundle: b.bundle,
client: client,
task: task.NewTaskClient(client),
events: b.events,
rtTasks: b.rtTasks,
bundle: b.bundle,
client: client,
task: task.NewTaskClient(client),
}, nil
}

Expand Down
23 changes: 17 additions & 6 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
topts = opts.RuntimeOptions
}

b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
b := shimBinary(bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress)
shim, err := b.Start(ctx, topts, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")

Expand All @@ -185,7 +185,7 @@ func (m *TaskManager) deleteShim(shim *shim) {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()

_, errShim := shim.Delete(dctx)
_, errShim := shim.delete(dctx, m.tasks.Delete)
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
Expand All @@ -207,8 +207,19 @@ func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
}

// Delete a runtime task
func (m *TaskManager) Delete(ctx context.Context, id string) {
m.tasks.Delete(ctx, id)
func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := m.tasks.Get(ctx, id)
if err != nil {
return nil, err
}

shim := task.(*shim)
exit, err := shim.delete(ctx, m.tasks.Delete)
if err != nil {
return nil, err
}

return exit, err
}

// Tasks lists all tasks
Expand Down Expand Up @@ -287,8 +298,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
bundle.Delete()
continue
}
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
binaryCall := shimBinary(bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress)
shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")

cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
Expand Down
69 changes: 33 additions & 36 deletions runtime/v2/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func loadAddress(path string) (string, error) {
return string(data), nil
}

func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) {
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
if err != nil {
return nil, err
Expand Down Expand Up @@ -117,15 +117,15 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
}
}()
s := &shim{
client: client,
task: task.NewTaskClient(client),
bundle: bundle,
events: events,
rtTasks: rt,
client: client,
task: task.NewTaskClient(client),
bundle: bundle,
}
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel()
if err := s.Connect(ctx); err != nil {

// Check connectivity
if _, err := s.PID(ctx); err != nil {
return nil, err
}
return s, nil
Expand Down Expand Up @@ -186,23 +186,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
var _ runtime.Task = &shim{}

type shim struct {
bundle *Bundle
client *ttrpc.Client
task task.TaskService
taskPid int
events *exchange.Exchange
rtTasks *runtime.TaskList
}

func (s *shim) Connect(ctx context.Context) error {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return err
}
s.taskPid = int(response.TaskPid)
return nil
bundle *Bundle
client *ttrpc.Client
task task.TaskService
}

func (s *shim) Shutdown(ctx context.Context) error {
Expand All @@ -227,8 +213,15 @@ func (s *shim) ID() string {
}

// PID of the task
func (s *shim) PID() uint32 {
return uint32(s.taskPid)
func (s *shim) PID(ctx context.Context) (uint32, error) {
Comment thread
mxpv marked this conversation as resolved.
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}

return response.TaskPid, nil
}

func (s *shim) Namespace() string {
Expand All @@ -239,7 +232,7 @@ func (s *shim) Close() error {
return s.client.Close()
}

func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(),
})
Expand All @@ -264,7 +257,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
// So we should remove the record and prevent duplicate events from
// ttrpc-callback-on-close.
if shimErr == nil {
s.rtTasks.Delete(ctx, s.ID())
removeTask(ctx, s.ID())
}

if err := s.waitShutdown(ctx); err != nil {
Expand All @@ -275,7 +268,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {

// remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
s.rtTasks.Delete(ctx, s.ID())
removeTask(ctx, s.ID())
if err := s.bundle.Delete(); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
}
Expand Down Expand Up @@ -311,11 +304,12 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
Options: m.Options,
})
}
response, err := s.task.Create(ctx, request)

_, err := s.task.Create(ctx, request)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
s.taskPid = int(response.Pid)

return s, nil
}

Expand All @@ -338,13 +332,12 @@ func (s *shim) Resume(ctx context.Context) error {
}

func (s *shim) Start(ctx context.Context) error {
response, err := s.task.Start(ctx, &task.StartRequest{
_, err := s.task.Start(ctx, &task.StartRequest{
ID: s.ID(),
})
if err != nil {
return errdefs.FromGRPC(err)
}
s.taskPid = int(response.Pid)
return nil
}

Expand All @@ -359,7 +352,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
return nil
}

func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id %s", id)
}
Expand Down Expand Up @@ -422,14 +415,18 @@ func (s *shim) CloseIO(ctx context.Context) error {
}

func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
taskPid, err := s.PID(ctx)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just record the taskPid? s.PID always send request to shim to get taskPID here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pid is task service specific thing. The goal of this PR is to make shim stateless and unaware about services shim instance provides. This way we can add new services on top of shims independently - task service, sandbox api, port forwarding, pulling inside sandbox, etc (lots of context described in #5742). Pid will move to task service back, once we decouple shim v2 task manager into shim manager + task service.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. thanks for the comment~

if err != nil {
return nil, err
}
response, err := s.task.Wait(ctx, &task.WaitRequest{
ID: s.ID(),
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return &runtime.Exit{
Pid: uint32(s.taskPid),
Pid: taskPid,
Timestamp: response.ExitedAt,
Status: response.ExitStatus,
}, nil
Expand Down Expand Up @@ -468,7 +465,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
return response.Stats, nil
}

func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) {
func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &process{
id: id,
shim: s,
Expand Down
Loading