Skip to content

Commit f63d289

Browse files
authored
Merge pull request #2939 from jterry75/bug_publishstart
Implement the Runtime v2 Shim async task model for runhcs
2 parents acdb225 + 6468619 commit f63d289

6 files changed

Lines changed: 98 additions & 42 deletions

File tree

runtime/v2/README.md

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,27 @@ Filesystems are provided by the containerd snapshotters.
149149

150150
### Events
151151

152-
The shim MUST publish a `runtime.TaskExitEventTopic` when the container exits.
153-
If the shim collects Out of Memory events, it SHOULD also publish a `runtime.TaskOOMEventTopic`.
152+
The Runtime v2 supports an async event model. In order for the an upstream caller (such as Docker) to get these events in the correct order a Runtime v2 shim MUST implement the following events where `Compliance=MUST`. This avoids race conditions between the shim and shim client where for example a call to `Start` can signal a `TaskExitEventTopic` before even returning the results from the `Start` call. With these guarantees of a Runtime v2 shim a call to `Start` is required to have published the async event `TaskStartEventTopic` before the shim can publish the `TaskExitEventTopic`.
153+
154+
#### Tasks
155+
156+
| Topic | Compliance | Description |
157+
| ----- | ---------- | ----------- |
158+
| `runtime.TaskCreateEventTopic` | MUST | When a task is successfully created |
159+
| `runtime.TaskStartEventTopic` | MUST (follow `TaskCreateEventTopic`) | When a task is successfully started |
160+
| `runtime.TaskExitEventTopic` | MUST (follow `TaskStartEventTopic`) | When a task exits expected or unexpected |
161+
| `runtime.TaskDeleteEventTopic` | MUST (follow `TaskExitEventTopic` or `TaskCreateEventTopic` if never started) | When a task is removed from a shim |
162+
| `runtime.TaskPausedEventTopic` | SHOULD | When a task is successfully paused |
163+
| `runtime.TaskResumedEventTopic` | SHOULD (follow `TaskPausedEventTopic`) | When a task is successfully resumed |
164+
| `runtime.TaskCheckpointedEventTopic` | SHOULD | When a task is checkpointed |
165+
| `runtime.TaskOOMEventTopic` | SHOULD | If the shim collects Out of Memory events |
166+
167+
#### Execs
168+
169+
| Topic | Compliance | Description |
170+
| ----- | ---------- | ----------- |
171+
| `runtime.TaskExecAddedEventTopic` | MUST (follow `TaskCreateEventTopic` ) | When an exec is successfully added |
172+
| `runtime.TaskExecStartedEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec is successfully started |
154173

155174
### Other
156175

runtime/v2/binary.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
gruntime "runtime"
2525
"strings"
2626

27-
eventstypes "github.com/containerd/containerd/api/events"
2827
"github.com/containerd/containerd/events/exchange"
2928
"github.com/containerd/containerd/log"
3029
"github.com/containerd/containerd/runtime"
@@ -152,13 +151,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
152151
// remove self from the runtime task list
153152
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
154153
b.rtTasks.Delete(ctx, b.bundle.ID)
155-
// shim will send the exit event
156-
b.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
157-
ContainerID: b.bundle.ID,
158-
ExitStatus: response.ExitStatus,
159-
ExitedAt: response.ExitedAt,
160-
Pid: response.Pid,
161-
})
162154
return &runtime.Exit{
163155
Status: response.ExitStatus,
164156
Timestamp: response.ExitedAt,

runtime/v2/process.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package v2
1919
import (
2020
"context"
2121

22-
eventstypes "github.com/containerd/containerd/api/events"
2322
tasktypes "github.com/containerd/containerd/api/types/task"
2423
"github.com/containerd/containerd/errdefs"
2524
"github.com/containerd/containerd/runtime"
@@ -114,18 +113,13 @@ func (p *process) CloseIO(ctx context.Context) error {
114113

115114
// Start the process
116115
func (p *process) Start(ctx context.Context) error {
117-
response, err := p.shim.task.Start(ctx, &task.StartRequest{
116+
_, err := p.shim.task.Start(ctx, &task.StartRequest{
118117
ID: p.shim.ID(),
119118
ExecID: p.id,
120119
})
121120
if err != nil {
122121
return errdefs.FromGRPC(err)
123122
}
124-
p.shim.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventstypes.TaskExecStarted{
125-
ContainerID: p.shim.ID(),
126-
Pid: response.Pid,
127-
ExecID: p.id,
128-
})
129123
return nil
130124
}
131125

runtime/v2/runhcs/process.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ func newProcess(ctx context.Context, s *service, id string, pid uint32, pr *pipe
5959

6060
func waitForProcess(ctx context.Context, process *process, p *os.Process, s *service) {
6161
pid := uint32(p.Pid)
62+
process.startedWg.Add(1)
63+
6264
// Store the default non-exited value for calls to stat
6365
process.exit.Store(&processExit{
6466
pid: pid,
@@ -87,9 +89,11 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser
8789
// Wait for the relay
8890
process.relay.wait()
8991

90-
// close the client io, and free upstream waiters
91-
process.close()
92+
// Wait for the started event to fire if it hasn't already
93+
process.startedWg.Wait()
9294

95+
// We publish the exit before freeing upstream so that the exit event always
96+
// happens before any delete event.
9397
s.publisher.Publish(
9498
ctx,
9599
runtime.TaskExitEventTopic,
@@ -100,6 +104,9 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser
100104
ExitStatus: uint32(status),
101105
ExitedAt: now,
102106
})
107+
108+
// close the client io, and free upstream waiters
109+
process.close()
103110
}
104111

105112
func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRelay, bundle, stdin, stdout, stderr string, terminal bool) (*process, error) {
@@ -114,6 +121,8 @@ func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRel
114121
relay: pr,
115122
waitBlock: make(chan struct{}),
116123
}
124+
process.startedWg.Add(1)
125+
117126
// Store the default non-exited value for calls to stat
118127
process.exit.Store(&processExit{
119128
pid: 0, // This is updated when the call to Start happens and the state is overwritten in waitForProcess.
@@ -139,7 +148,8 @@ type process struct {
139148

140149
// started track if the process has ever been started and will not be reset
141150
// for the lifetime of the process object.
142-
started bool
151+
started bool
152+
startedWg sync.WaitGroup
143153

144154
waitBlock chan struct{}
145155
// exit holds the exit value for all calls to `stat`. By default a

runtime/v2/runhcs/service.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@ import (
3737

3838
winio "github.com/Microsoft/go-winio"
3939
"github.com/Microsoft/hcsshim/pkg/go-runhcs"
40+
eventstypes "github.com/containerd/containerd/api/events"
4041
containerd_types "github.com/containerd/containerd/api/types"
4142
"github.com/containerd/containerd/api/types/task"
4243
"github.com/containerd/containerd/errdefs"
4344
"github.com/containerd/containerd/events"
4445
"github.com/containerd/containerd/log"
4546
"github.com/containerd/containerd/mount"
4647
"github.com/containerd/containerd/namespaces"
48+
"github.com/containerd/containerd/runtime"
4749
"github.com/containerd/containerd/runtime/v2/runhcs/options"
4850
"github.com/containerd/containerd/runtime/v2/shim"
4951
taskAPI "github.com/containerd/containerd/runtime/v2/task"
@@ -593,6 +595,22 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
593595
}
594596
s.processes[r.ID] = process
595597

598+
s.publisher.Publish(ctx,
599+
runtime.TaskCreateEventTopic,
600+
&eventstypes.TaskCreate{
601+
ContainerID: process.id,
602+
Bundle: process.bundle,
603+
Rootfs: r.Rootfs,
604+
IO: &eventstypes.TaskIO{
605+
Stdin: r.Stdin,
606+
Stdout: r.Stdout,
607+
Stderr: r.Stderr,
608+
Terminal: r.Terminal,
609+
},
610+
Checkpoint: "",
611+
Pid: uint32(pid),
612+
})
613+
596614
return &taskAPI.CreateTaskResponse{
597615
Pid: uint32(pid),
598616
}, nil
@@ -711,9 +729,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
711729
time.Sleep(1 * time.Second)
712730
}
713731
}
714-
stat := p.stat()
732+
733+
pid := p.stat().pid
734+
if r.ExecID != "" {
735+
s.publisher.Publish(ctx,
736+
runtime.TaskExecStartedEventTopic,
737+
&eventstypes.TaskExecStarted{
738+
ContainerID: p.cid,
739+
ExecID: p.id,
740+
Pid: pid,
741+
})
742+
} else {
743+
s.publisher.Publish(ctx,
744+
runtime.TaskStartEventTopic,
745+
&eventstypes.TaskStart{
746+
ContainerID: p.id,
747+
Pid: pid,
748+
})
749+
}
750+
751+
p.startedWg.Done()
752+
715753
return &taskAPI.StartResponse{
716-
Pid: stat.pid,
754+
Pid: pid,
717755
}, nil
718756
}
719757

@@ -750,6 +788,14 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
750788
s.mu.Lock()
751789
delete(s.processes, p.id)
752790
s.mu.Unlock()
791+
792+
s.publisher.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
793+
ContainerID: p.id,
794+
Pid: exit.pid,
795+
ExitStatus: exit.exitStatus,
796+
ExitedAt: exit.exitedAt,
797+
})
798+
753799
return &taskAPI.DeleteResponse{
754800
ExitedAt: exit.exitedAt,
755801
ExitStatus: exit.exitStatus,
@@ -780,6 +826,10 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
780826
return nil, err
781827
}
782828

829+
s.publisher.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
830+
r.ID,
831+
})
832+
783833
return empty, nil
784834
}
785835

@@ -799,6 +849,10 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
799849
return nil, err
800850
}
801851

852+
s.publisher.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
853+
r.ID,
854+
})
855+
802856
return empty, nil
803857
}
804858

@@ -910,6 +964,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
910964
}
911965
s.processes[r.ExecID] = process
912966

967+
s.publisher.Publish(ctx,
968+
runtime.TaskExecAddedEventTopic,
969+
&eventstypes.TaskExecAdded{
970+
ContainerID: process.cid,
971+
ExecID: process.id,
972+
})
973+
913974
return empty, nil
914975
}
915976

runtime/v2/shim.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"path/filepath"
2525
"time"
2626

27-
eventstypes "github.com/containerd/containerd/api/events"
2827
"github.com/containerd/containerd/api/types"
2928
tasktypes "github.com/containerd/containerd/api/types/task"
3029
"github.com/containerd/containerd/errdefs"
@@ -163,12 +162,6 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
163162
// remove self from the runtime task list
164163
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
165164
s.rtTasks.Delete(ctx, s.ID())
166-
s.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
167-
ContainerID: s.ID(),
168-
ExitStatus: response.ExitStatus,
169-
ExitedAt: response.ExitedAt,
170-
Pid: response.Pid,
171-
})
172165
return &runtime.Exit{
173166
Status: response.ExitStatus,
174167
Timestamp: response.ExitedAt,
@@ -212,9 +205,6 @@ func (s *shim) Pause(ctx context.Context) error {
212205
}); err != nil {
213206
return errdefs.FromGRPC(err)
214207
}
215-
s.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
216-
ContainerID: s.ID(),
217-
})
218208
return nil
219209
}
220210

@@ -224,9 +214,6 @@ func (s *shim) Resume(ctx context.Context) error {
224214
}); err != nil {
225215
return errdefs.FromGRPC(err)
226216
}
227-
s.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
228-
ContainerID: s.ID(),
229-
})
230217
return nil
231218
}
232219

@@ -238,10 +225,6 @@ func (s *shim) Start(ctx context.Context) error {
238225
return errdefs.FromGRPC(err)
239226
}
240227
s.taskPid = int(response.Pid)
241-
s.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
242-
ContainerID: s.ID(),
243-
Pid: uint32(s.taskPid),
244-
})
245228
return nil
246229
}
247230

@@ -340,9 +323,6 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any)
340323
if _, err := s.task.Checkpoint(ctx, request); err != nil {
341324
return errdefs.FromGRPC(err)
342325
}
343-
s.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
344-
ContainerID: s.ID(),
345-
})
346326
return nil
347327
}
348328

0 commit comments

Comments
 (0)