@@ -7,6 +7,7 @@ package kubernetes
77
88import (
99 "context"
10+ "errors"
1011 "time"
1112
1213 "github.com/go-logr/logr"
@@ -56,14 +57,25 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object {
5657type UpdateHandler struct {
5758 log logr.Logger
5859 client client.Client
60+ sendUpdates chan struct {}
5961 updateChannel chan Update
62+ writer * UpdateWriter
6063}
6164
6265func NewUpdateHandler (log logr.Logger , client client.Client ) * UpdateHandler {
66+ sendUpdates := make (chan struct {})
67+ updateChannel := make (chan Update , 100 )
6368 return & UpdateHandler {
6469 log : log ,
6570 client : client ,
66- updateChannel : make (chan Update , 100 ),
71+ sendUpdates : sendUpdates ,
72+ updateChannel : updateChannel ,
73+ writer : & UpdateWriter {
74+ log : log ,
75+ enabled : sendUpdates ,
76+ updateChannel : updateChannel ,
77+ eventsBeforeEnabled : make (chan Update , 1000 ),
78+ },
6779 }
6880}
6981
@@ -127,6 +139,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
127139 u .log .Info ("started status update handler" )
128140 defer u .log .Info ("stopped status update handler" )
129141
142+ // Enable Updaters to start sending updates to this handler.
143+ close (u .sendUpdates )
144+ u .writer .handleEventsReceivedBeforeEnabled ()
145+
130146 for {
131147 select {
132148 case <- ctx .Done ():
@@ -142,9 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
142158
143159// Writer retrieves the interface that should be used to write to the UpdateHandler.
144160func (u * UpdateHandler ) Writer () Updater {
145- return & UpdateWriter {
146- updateChannel : u .updateChannel ,
147- }
161+ return u .writer
148162}
149163
150164// Updater describes an interface to send status updates somewhere.
@@ -154,13 +168,40 @@ type Updater interface {
154168
155169// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
156170type UpdateWriter struct {
171+ log logr.Logger
172+ enabled <- chan struct {}
157173 updateChannel chan <- Update
174+ // a temporary buffer to store events received before the Updater is enabled.
175+ // These events will be sent to the update channel once the Updater is enabled.
176+ eventsBeforeEnabled chan Update
158177}
159178
160179// Send sends the given Update off to the update channel for writing by the UpdateHandler.
161180func (u * UpdateWriter ) Send (update Update ) {
162181 // Non-blocking receive to see if we should pass along update.
163- u .updateChannel <- update
182+ select {
183+ case <- u .enabled :
184+ u .updateChannel <- update
185+ default :
186+ if len (u .eventsBeforeEnabled ) < cap (u .eventsBeforeEnabled ) {
187+ u .log .Info ("received a status update while disabled, storing for later" , "event" , update .NamespacedName )
188+ u .eventsBeforeEnabled <- update
189+ } else {
190+ // If the buffer is full, drop the event to avoid blocking the sender.
191+ u .log .Error (errors .New ("dropping status update, buffer full" ), "event" , update .NamespacedName )
192+ }
193+ }
194+ }
195+
196+ // handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel.
197+ func (u * UpdateWriter ) handleEventsReceivedBeforeEnabled () {
198+ go func () {
199+ for e := range u .eventsBeforeEnabled {
200+ u .log .Info ("sending stored status update" , "event" , e .NamespacedName )
201+ u .updateChannel <- e
202+ }
203+ close (u .eventsBeforeEnabled )
204+ }()
164205}
165206
166207// isStatusEqual checks if two objects have equivalent status.
0 commit comments