@@ -33,41 +33,59 @@ var ErrNoSuchProcess = errors.New("no such process")
3333
// bufferSize is the capacity of the exit channel created for each subscriber.
const bufferSize = 2048
3535
36+ type subscriber struct {
37+ sync.Mutex
38+ c chan runc.Exit
39+ closed bool
40+ }
41+
42+ func (s * subscriber ) close () {
43+ s .Lock ()
44+ if s .closed {
45+ s .Unlock ()
46+ return
47+ }
48+ close (s .c )
49+ s .closed = true
50+ s .Unlock ()
51+ }
52+
53+ func (s * subscriber ) do (fn func ()) {
54+ s .Lock ()
55+ fn ()
56+ s .Unlock ()
57+ }
58+
3659// Reap should be called when the process receives an SIGCHLD. Reap will reap
3760// all exited processes and close their wait channels
3861func Reap () error {
39- var (
40- now = time .Now ()
41- current []chan runc.Exit
42- )
62+ now := time .Now ()
4363 exits , err := sys .Reap (false )
44-
45- Default .Lock ()
46- for c := range Default .subscribers {
47- current = append (current , c )
48- }
49- Default .Unlock ()
50-
5164 for _ , e := range exits {
52- go notify (runc.Exit {
65+ done := Default . notify (runc.Exit {
5366 Timestamp : now ,
5467 Pid : e .Pid ,
5568 Status : e .Status ,
56- }, current )
69+ })
70+
71+ select {
72+ case <- done :
73+ case <- time .After (1 * time .Second ):
74+ }
5775 }
5876 return err
5977}
6078
6179// Default is the default monitor initialized for the package
6280var Default = & Monitor {
63- subscribers : make (map [chan runc.Exit ]struct {} ),
81+ subscribers : make (map [chan runc.Exit ]* subscriber ),
6482}
6583
6684// Monitor monitors the underlying system for process status changes
6785type Monitor struct {
6886 sync.Mutex
6987
70- subscribers map [chan runc.Exit ]struct {}
88+ subscribers map [chan runc.Exit ]* subscriber
7189}
7290
7391// Start starts the command and registers the process with the reaper
@@ -101,40 +119,84 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
101119func (m * Monitor ) Subscribe () chan runc.Exit {
102120 c := make (chan runc.Exit , bufferSize )
103121 m .Lock ()
104- m .subscribers [c ] = struct {}{}
122+ m .subscribers [c ] = & subscriber {
123+ c : c ,
124+ }
105125 m .Unlock ()
106126 return c
107127}
108128
109129// Unsubscribe to process exit changes
110130func (m * Monitor ) Unsubscribe (c chan runc.Exit ) {
111131 m .Lock ()
132+ s , ok := m .subscribers [c ]
133+ if ! ok {
134+ m .Unlock ()
135+ return
136+ }
137+ s .close ()
112138 delete (m .subscribers , c )
113- close (c )
114139 m .Unlock ()
115140}
116141
117- func notify ( e runc. Exit , subscribers [] chan runc.Exit ) {
118- const timeout = 10 * time . Millisecond
119- timer := time . NewTimer ( timeout )
120- timer . Stop ()
121-
122- for i := 0 ; i < 50 ; i ++ {
123- var failed [] chan runc. Exit
124- for _ , s := range subscribers {
125- timer . Reset ( timeout )
142+ func ( m * Monitor ) getSubscribers () map [ chan runc.Exit ] * subscriber {
143+ out := make ( map [ chan runc. Exit ] * subscriber )
144+ m . Lock ( )
145+ for k , v := range m . subscribers {
146+ out [ k ] = v
147+ }
148+ m . Unlock ()
149+ return out
150+ }
126151
127- select {
128- case s <- e :
129- case <- timer .C :
130- failed = append (failed , s )
152+ func (m * Monitor ) notify (e runc.Exit ) chan struct {} {
153+ const timeout = 1 * time .Millisecond
154+ var (
155+ done = make (chan struct {}, 1 )
156+ timer = time .NewTimer (timeout )
157+ success = make (map [chan runc.Exit ]struct {})
158+ )
159+ stop (timer , true )
160+
161+ go func () {
162+ defer close (done )
163+
164+ for {
165+ var (
166+ failed int
167+ subscribers = m .getSubscribers ()
168+ )
169+ for _ , s := range subscribers {
170+ s .do (func () {
171+ if s .closed {
172+ return
173+ }
174+ if _ , ok := success [s .c ]; ok {
175+ return
176+ }
177+ timer .Reset (timeout )
178+ recv := true
179+ select {
180+ case s .c <- e :
181+ success [s .c ] = struct {}{}
182+ case <- timer .C :
183+ recv = false
184+ failed ++
185+ }
186+ stop (timer , recv )
187+ })
188+ }
189+ // all subscribers received the message
190+ if failed == 0 {
191+ return
131192 }
132- timer .Stop ()
133- }
134- // all subscribers received the message
135- if len (failed ) == 0 {
136- return
137193 }
138- subscribers = failed
194+ }()
195+ return done
196+ }
197+
// stop stops timer and, if needed, drains its channel so that the timer can
// be re-armed with Reset without a stale tick being observed.
//
// recv must be true when the caller has not received from timer.C since the
// timer was last armed; in that case, if Stop reports that the timer was no
// longer active, the pending tick is received and discarded. Pass false when
// the tick has already been consumed, since there is then nothing to drain
// and a receive would block.
func stop(timer *time.Timer, recv bool) {
	if !timer.Stop() && recv {
		<-timer.C
	}
}
0 commit comments