Skip to content

Commit 8ec3095

Browse files
authored
Fix: frequent 503 errors when connecting to a Service experiencing high Pod churn (envoyproxy#4754)
* Revert "fix: some status updates are discarded by the status updater (envoyproxy#4337)"
  This reverts commit 14830c7.
  Signed-off-by: Huabing Zhao <[email protected]>
* store update events and process them later
  Signed-off-by: Huabing Zhao <[email protected]>
* rename method
  Signed-off-by: Huabing Zhao <[email protected]>
* add release note
  Signed-off-by: Huabing Zhao <[email protected]>
---------
Signed-off-by: Huabing Zhao <[email protected]>
1 parent cda2dcb commit 8ec3095

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

internal/provider/kubernetes/status_updater.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kubernetes
77

88
import (
99
"context"
10+
"errors"
1011
"time"
1112

1213
"github.com/go-logr/logr"
@@ -56,14 +57,25 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object {
5657
type UpdateHandler struct {
5758
log logr.Logger
5859
client client.Client
60+
sendUpdates chan struct{}
5961
updateChannel chan Update
62+
writer *UpdateWriter
6063
}
6164

6265
func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
66+
sendUpdates := make(chan struct{})
67+
updateChannel := make(chan Update, 100)
6368
return &UpdateHandler{
6469
log: log,
6570
client: client,
66-
updateChannel: make(chan Update, 100),
71+
sendUpdates: sendUpdates,
72+
updateChannel: updateChannel,
73+
writer: &UpdateWriter{
74+
log: log,
75+
enabled: sendUpdates,
76+
updateChannel: updateChannel,
77+
eventsBeforeEnabled: make(chan Update, 1000),
78+
},
6779
}
6880
}
6981

@@ -127,6 +139,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
127139
u.log.Info("started status update handler")
128140
defer u.log.Info("stopped status update handler")
129141

142+
// Enable Updaters to start sending updates to this handler.
143+
close(u.sendUpdates)
144+
u.writer.handleEventsReceivedBeforeEnabled()
145+
130146
for {
131147
select {
132148
case <-ctx.Done():
@@ -142,9 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
142158

143159
// Writer retrieves the interface that should be used to write to the UpdateHandler.
144160
func (u *UpdateHandler) Writer() Updater {
145-
return &UpdateWriter{
146-
updateChannel: u.updateChannel,
147-
}
161+
return u.writer
148162
}
149163

150164
// Updater describes an interface to send status updates somewhere.
@@ -154,13 +168,40 @@ type Updater interface {
154168

155169
// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
156170
type UpdateWriter struct {
171+
log logr.Logger
172+
enabled <-chan struct{}
157173
updateChannel chan<- Update
174+
// a temporary buffer to store events received before the Updater is enabled.
175+
// These events will be sent to the update channel once the Updater is enabled.
176+
eventsBeforeEnabled chan Update
158177
}
159178

160179
// Send sends the given Update off to the update channel for writing by the UpdateHandler.
161180
func (u *UpdateWriter) Send(update Update) {
162181
// Non-blocking receive to see if we should pass along update.
163-
u.updateChannel <- update
182+
select {
183+
case <-u.enabled:
184+
u.updateChannel <- update
185+
default:
186+
if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) {
187+
u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName)
188+
u.eventsBeforeEnabled <- update
189+
} else {
190+
// If the buffer is full, drop the event to avoid blocking the sender.
191+
u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName)
192+
}
193+
}
194+
}
195+
196+
// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel.
197+
func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() {
198+
go func() {
199+
for e := range u.eventsBeforeEnabled {
200+
u.log.Info("sending stored status update", "event", e.NamespacedName)
201+
u.updateChannel <- e
202+
}
203+
close(u.eventsBeforeEnabled)
204+
}()
164205
}
165206

166207
// isStatusEqual checks if two objects have equivalent status.

release-notes/current.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ bug fixes: |
2121
Fixed failure to update SecurityPolicy resources with the `backendRef` field specified
2222
Fixed Envoy rejecting TCP Listeners that have no attached TCPRoutes
2323
Fixed xDS translation failure when oidc tokenEndpoint and jwt remoteJWKS are specified in the same SecurityPolicy and use the same hostname
24+
Fixed frequent 503 errors when connecting to a Service experiencing high Pod churn
2425
2526
# Enhancements that improve performance.
2627
performance improvements: |

0 commit comments

Comments
 (0)