Skip to content

Commit 2763639

Browse files
committed
Try to preserve exit event order
Signed-off-by: Michael Crosby <[email protected]>
1 parent bee4c1a commit 2763639

1 file changed

Lines changed: 98 additions & 36 deletions

File tree

sys/reaper/reaper_unix.go

Lines changed: 98 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -33,41 +33,59 @@ var ErrNoSuchProcess = errors.New("no such process")
3333

3434
const bufferSize = 2048
3535

36+
type subscriber struct {
37+
sync.Mutex
38+
c chan runc.Exit
39+
closed bool
40+
}
41+
42+
func (s *subscriber) close() {
43+
s.Lock()
44+
if s.closed {
45+
s.Unlock()
46+
return
47+
}
48+
close(s.c)
49+
s.closed = true
50+
s.Unlock()
51+
}
52+
53+
func (s *subscriber) do(fn func()) {
54+
s.Lock()
55+
fn()
56+
s.Unlock()
57+
}
58+
3659
// Reap should be called when the process receives a SIGCHLD. Reap will reap
3760
// all exited processes and close their wait channels
3861
func Reap() error {
39-
var (
40-
now = time.Now()
41-
current []chan runc.Exit
42-
)
62+
now := time.Now()
4363
exits, err := sys.Reap(false)
44-
45-
Default.Lock()
46-
for c := range Default.subscribers {
47-
current = append(current, c)
48-
}
49-
Default.Unlock()
50-
5164
for _, e := range exits {
52-
go notify(runc.Exit{
65+
done := Default.notify(runc.Exit{
5366
Timestamp: now,
5467
Pid: e.Pid,
5568
Status: e.Status,
56-
}, current)
69+
})
70+
71+
select {
72+
case <-done:
73+
case <-time.After(1 * time.Second):
74+
}
5775
}
5876
return err
5977
}
6078

6179
// Default is the default monitor initialized for the package
6280
var Default = &Monitor{
63-
subscribers: make(map[chan runc.Exit]struct{}),
81+
subscribers: make(map[chan runc.Exit]*subscriber),
6482
}
6583

6684
// Monitor monitors the underlying system for process status changes
6785
type Monitor struct {
6886
sync.Mutex
6987

70-
subscribers map[chan runc.Exit]struct{}
88+
subscribers map[chan runc.Exit]*subscriber
7189
}
7290

7391
// Start starts the command and registers the process with the reaper
@@ -101,40 +119,84 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
101119
func (m *Monitor) Subscribe() chan runc.Exit {
102120
c := make(chan runc.Exit, bufferSize)
103121
m.Lock()
104-
m.subscribers[c] = struct{}{}
122+
m.subscribers[c] = &subscriber{
123+
c: c,
124+
}
105125
m.Unlock()
106126
return c
107127
}
108128

109129
// Unsubscribe to process exit changes
110130
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
111131
m.Lock()
132+
s, ok := m.subscribers[c]
133+
if !ok {
134+
m.Unlock()
135+
return
136+
}
137+
s.close()
112138
delete(m.subscribers, c)
113-
close(c)
114139
m.Unlock()
115140
}
116141

117-
func notify(e runc.Exit, subscribers []chan runc.Exit) {
118-
const timeout = 10 * time.Millisecond
119-
timer := time.NewTimer(timeout)
120-
timer.Stop()
121-
122-
for i := 0; i < 50; i++ {
123-
var failed []chan runc.Exit
124-
for _, s := range subscribers {
125-
timer.Reset(timeout)
142+
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
143+
out := make(map[chan runc.Exit]*subscriber)
144+
m.Lock()
145+
for k, v := range m.subscribers {
146+
out[k] = v
147+
}
148+
m.Unlock()
149+
return out
150+
}
126151

127-
select {
128-
case s <- e:
129-
case <-timer.C:
130-
failed = append(failed, s)
152+
func (m *Monitor) notify(e runc.Exit) chan struct{} {
153+
const timeout = 1 * time.Millisecond
154+
var (
155+
done = make(chan struct{}, 1)
156+
timer = time.NewTimer(timeout)
157+
success = make(map[chan runc.Exit]struct{})
158+
)
159+
stop(timer, true)
160+
161+
go func() {
162+
defer close(done)
163+
164+
for {
165+
var (
166+
failed int
167+
subscribers = m.getSubscribers()
168+
)
169+
for _, s := range subscribers {
170+
s.do(func() {
171+
if s.closed {
172+
return
173+
}
174+
if _, ok := success[s.c]; ok {
175+
return
176+
}
177+
timer.Reset(timeout)
178+
recv := true
179+
select {
180+
case s.c <- e:
181+
success[s.c] = struct{}{}
182+
case <-timer.C:
183+
recv = false
184+
failed++
185+
}
186+
stop(timer, recv)
187+
})
188+
}
189+
// all subscribers received the message
190+
if failed == 0 {
191+
return
131192
}
132-
timer.Stop()
133-
}
134-
// all subscribers received the message
135-
if len(failed) == 0 {
136-
return
137193
}
138-
subscribers = failed
194+
}()
195+
return done
196+
}
197+
198+
func stop(timer *time.Timer, recv bool) {
199+
if !timer.Stop() && recv {
200+
<-timer.C
139201
}
140202
}

0 commit comments

Comments
 (0)