Skip to content

Commit bee4c1a

Browse files
committed
Add retry and non-blocking send for exit events
Signed-off-by: Michael Crosby <[email protected]>
1 parent 0d27d8f commit bee4c1a

2 files changed

Lines changed: 41 additions & 10 deletions

File tree

cmd/containerd-shim/shim_darwin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"os"
2323
"os/signal"
2424

25-
"github.com/containerd/containerd/runtime/v1/shim"
25+
"github.com/containerd/containerd/sys/reaper"
2626
runc "github.com/containerd/go-runc"
2727
"github.com/containerd/ttrpc"
2828
)
@@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) {
3434
signal.Notify(signals)
3535
// make sure runc is setup to use the monitor
3636
// for waiting on processes
37-
runc.Monitor = shim.Default
37+
runc.Monitor = reaper.Default
3838
return signals, nil
3939
}
4040

sys/reaper/reaper_unix.go

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,25 @@ const bufferSize = 2048
3636
// Reap should be called when the process receives an SIGCHLD. Reap will reap
3737
// all exited processes and close their wait channels
3838
func Reap() error {
39-
now := time.Now()
39+
var (
40+
now = time.Now()
41+
current []chan runc.Exit
42+
)
4043
exits, err := sys.Reap(false)
44+
4145
Default.Lock()
4246
for c := range Default.subscribers {
43-
for _, e := range exits {
44-
c <- runc.Exit{
45-
Timestamp: now,
46-
Pid: e.Pid,
47-
Status: e.Status,
48-
}
49-
}
47+
current = append(current, c)
5048
}
5149
Default.Unlock()
50+
51+
for _, e := range exits {
52+
go notify(runc.Exit{
53+
Timestamp: now,
54+
Pid: e.Pid,
55+
Status: e.Status,
56+
}, current)
57+
}
5258
return err
5359
}
5460

@@ -107,3 +113,28 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) {
107113
close(c)
108114
m.Unlock()
109115
}
116+
117+
func notify(e runc.Exit, subscribers []chan runc.Exit) {
118+
const timeout = 10 * time.Millisecond
119+
timer := time.NewTimer(timeout)
120+
timer.Stop()
121+
122+
for i := 0; i < 50; i++ {
123+
var failed []chan runc.Exit
124+
for _, s := range subscribers {
125+
timer.Reset(timeout)
126+
127+
select {
128+
case s <- e:
129+
case <-timer.C:
130+
failed = append(failed, s)
131+
}
132+
timer.Stop()
133+
}
134+
// all subscribers received the message
135+
if len(failed) == 0 {
136+
return
137+
}
138+
subscribers = failed
139+
}
140+
}

0 commit comments

Comments
 (0)