@@ -24,7 +24,7 @@ import (
2424 "time"
2525
2626 "github.com/containerd/containerd/sys"
27- runc "github.com/containerd/go-runc"
27+ "github.com/containerd/go-runc"
2828 "github.com/pkg/errors"
2929)
3030
@@ -38,30 +38,26 @@ const bufferSize = 2048
3838func Reap () error {
3939 now := time .Now ()
4040 exits , err := sys .Reap (false )
41- Default .Lock ()
42- for c := range Default .subscribers {
43- for _ , e := range exits {
44- c <- runc.Exit {
45- Timestamp : now ,
46- Pid : e .Pid ,
47- Status : e .Status ,
48- }
49- }
41+ for _ , e := range exits {
42+ Default .Notify (runc.Exit {
43+ Timestamp : now ,
44+ Pid : e .Pid ,
45+ Status : e .Status ,
46+ })
5047 }
51- Default .Unlock ()
5248 return err
5349}
5450
5551// Default is the default monitor initialized for the package
5652var Default = & Monitor {
57- subscribers : make (map [chan runc.Exit ]struct {} ),
53+ subscribers : make (map [chan runc.Exit ]bool ),
5854}
5955
6056// Monitor monitors the underlying system for process status changes
6157type Monitor struct {
6258 sync.Mutex
6359
64- subscribers map [chan runc.Exit ]struct {}
60+ subscribers map [chan runc.Exit ]bool
6561}
6662
6763// Start starts the command a registers the process with the reaper
@@ -95,7 +91,7 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
9591func (m * Monitor ) Subscribe () chan runc.Exit {
9692 c := make (chan runc.Exit , bufferSize )
9793 m .Lock ()
98- m .subscribers [c ] = struct {}{}
94+ m .subscribers [c ] = false
9995 m .Unlock ()
10096 return c
10197}
@@ -107,3 +103,29 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) {
107103 close (c )
108104 m .Unlock ()
109105}
106+
107+ // Notify to subscribers exit changes
108+ func (m * Monitor ) Notify (e runc.Exit ) {
109+ retry:
110+ m .Lock ()
111+ for c , v := range m .subscribers {
112+ // subscriber has receive this exit
113+ if v == true {
114+ continue
115+ }
116+
117+ select {
118+ case c <- e :
119+ m .subscribers [c ] = true
120+ case <- time .After (time .Millisecond ):
121+ m .Unlock ()
122+ goto retry
123+ }
124+ }
125+
126+ // reset subscriber's state
127+ for c := range m .subscribers {
128+ m .subscribers [c ] = false
129+ }
130+ m .Unlock ()
131+ }
0 commit comments