Skip to content

Commit b62fa1d

Browse files
Add client side event rate limiting
1 parent ab40f52 commit b62fa1d

File tree

4 files changed

+131
-15
lines changed

4 files changed

+131
-15
lines changed

staging/src/k8s.io/client-go/tools/record/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,6 @@ go_library(
5353
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
5454
"//vendor/k8s.io/client-go/rest:go_default_library",
5555
"//vendor/k8s.io/client-go/tools/reference:go_default_library",
56+
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
5657
],
5758
)

staging/src/k8s.io/client-go/tools/record/event_test.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,8 @@ func TestWriteEventError(t *testing.T) {
412412
},
413413
}
414414

415-
eventCorrelator := NewEventCorrelator(clock.RealClock{})
415+
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
416+
eventCorrelator := NewEventCorrelator(&clock)
416417
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
417418

418419
for caseName, ent := range table {
@@ -435,7 +436,8 @@ func TestWriteEventError(t *testing.T) {
435436
}
436437

437438
func TestUpdateExpiredEvent(t *testing.T) {
438-
eventCorrelator := NewEventCorrelator(clock.RealClock{})
439+
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
440+
eventCorrelator := NewEventCorrelator(&clock)
439441
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
440442

441443
var createdEvent *v1.Event
@@ -497,14 +499,15 @@ func TestLotsOfEvents(t *testing.T) {
497499
loggerCalled <- struct{}{}
498500
})
499501
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "eventTest"})
500-
ref := &v1.ObjectReference{
501-
Kind: "Pod",
502-
Name: "foo",
503-
Namespace: "baz",
504-
UID: "bar",
505-
APIVersion: "version",
506-
}
507502
for i := 0; i < maxQueuedEvents; i++ {
503+
// we want a unique object to stop spam filtering
504+
ref := &v1.ObjectReference{
505+
Kind: "Pod",
506+
Name: fmt.Sprintf("foo-%v", i),
507+
Namespace: "baz",
508+
UID: "bar",
509+
APIVersion: "version",
510+
}
508511
// we need to vary the reason to prevent aggregation
509512
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
510513
}

staging/src/k8s.io/client-go/tools/record/events_cache.go

+94-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/util/clock"
3131
"k8s.io/apimachinery/pkg/util/sets"
3232
"k8s.io/apimachinery/pkg/util/strategicpatch"
33+
"k8s.io/client-go/util/flowcontrol"
3334
)
3435

3536
const (
@@ -39,6 +40,13 @@ const (
3940
// more than 10 times in a 10 minute period, aggregate the event
4041
defaultAggregateMaxEvents = 10
4142
defaultAggregateIntervalInSeconds = 600
43+
44+
// by default, allow a source to send 25 events about an object
45+
// but control the refill rate to 1 new event every 5 minutes
46+
// this helps control the long-tail of events for things that are always
47+
// unhealthy
48+
defaultSpamBurst = 25
49+
defaultSpamQPS = 1. / 300.
4250
)
4351

4452
// getEventKey builds unique event key based on source, involvedObject, reason, message
@@ -59,6 +67,20 @@ func getEventKey(event *v1.Event) string {
5967
"")
6068
}
6169

70+
// getSpamKey builds unique event key based on source, involvedObject
71+
func getSpamKey(event *v1.Event) string {
72+
return strings.Join([]string{
73+
event.Source.Component,
74+
event.Source.Host,
75+
event.InvolvedObject.Kind,
76+
event.InvolvedObject.Namespace,
77+
event.InvolvedObject.Name,
78+
string(event.InvolvedObject.UID),
79+
event.InvolvedObject.APIVersion,
80+
},
81+
"")
82+
}
83+
6284
// EventFilterFunc is a function that returns true if the event should be skipped
6385
type EventFilterFunc func(event *v1.Event) bool
6486

@@ -67,6 +89,69 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
6789
return false
6890
}
6991

92+
// EventSourceObjectSpamFilter is responsible for throttling
93+
// the amount of events a source and object can produce.
94+
type EventSourceObjectSpamFilter struct {
95+
sync.RWMutex
96+
97+
// the cache that manages last synced state
98+
cache *lru.Cache
99+
100+
// burst is the amount of events we allow per source + object
101+
burst int
102+
103+
// qps is the refill rate of the token bucket in queries per second
104+
qps float32
105+
106+
// clock is used to allow for testing over a time interval
107+
clock clock.Clock
108+
}
109+
110+
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
111+
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
112+
return &EventSourceObjectSpamFilter{
113+
cache: lru.New(lruCacheSize),
114+
burst: burst,
115+
qps: qps,
116+
clock: clock,
117+
}
118+
}
119+
120+
// spamRecord holds data used to perform spam filtering decisions.
121+
type spamRecord struct {
122+
// rateLimiter controls the rate of events about this object
123+
rateLimiter flowcontrol.RateLimiter
124+
}
125+
126+
// Filter controls that a given source+object are not exceeding the allowed rate.
127+
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
128+
var record spamRecord
129+
130+
// controls our cached information about this event (source+object)
131+
eventKey := getSpamKey(event)
132+
133+
// do we have a record of similar events in our cache?
134+
f.Lock()
135+
defer f.Unlock()
136+
value, found := f.cache.Get(eventKey)
137+
if found {
138+
record = value.(spamRecord)
139+
}
140+
141+
// verify we have a rate limiter for this record
142+
if record.rateLimiter == nil {
143+
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
144+
}
145+
146+
// ensure we have available rate
147+
filter := !record.rateLimiter.TryAccept()
148+
149+
// update the cache
150+
f.cache.Add(eventKey, record)
151+
152+
return filter
153+
}
154+
70155
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
71156
// It returns a tuple of the following:
72157
// aggregateKey - key the identifies the aggregate group to bucket this event
@@ -337,18 +422,20 @@ type EventCorrelateResult struct {
337422
// prior to interacting with the API server to record the event.
338423
//
339424
// The default behavior is as follows:
340-
// * No events are filtered from being recorded
341425
// * Aggregation is performed if a similar event is recorded 10 times in a
342426
// in a 10 minute rolling interval. A similar event is an event that varies only by
343427
// the Event.Message field. Rather than recording the precise event, aggregation
344428
// will create a new event whose message reports that it has combined events with
345429
// the same reason.
346430
// * Events are incrementally counted if the exact same event is encountered multiple
347431
// times.
432+
// * A source may burst 25 events about an object, but has a refill rate budget
433+
// per object of 1 event every 5 minutes to control long-tail of spam.
348434
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
349435
cacheSize := maxLruCacheEntries
436+
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
350437
return &EventCorrelator{
351-
filterFunc: DefaultEventFilterFunc,
438+
filterFunc: spamFilter.Filter,
352439
aggregator: NewEventAggregator(
353440
cacheSize,
354441
EventAggregatorByReasonFunc,
@@ -363,11 +450,14 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
363450

364451
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
365452
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
366-
if c.filterFunc(newEvent) {
367-
return &EventCorrelateResult{Skip: true}, nil
453+
if newEvent == nil {
454+
return nil, fmt.Errorf("event is nil")
368455
}
369456
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
370457
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
458+
if c.filterFunc(observedEvent) {
459+
return &EventCorrelateResult{Skip: true}, nil
460+
}
371461
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
372462
}
373463

staging/src/k8s.io/client-go/tools/record/events_cache_test.go

+24-2
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ func TestEventCorrelator(t *testing.T) {
181181
newEvent v1.Event
182182
expectedEvent v1.Event
183183
intervalSeconds int
184+
expectedSkip bool
184185
}{
185186
"create-a-single-event": {
186187
previousEvents: []v1.Event{},
@@ -198,7 +199,13 @@ func TestEventCorrelator(t *testing.T) {
198199
previousEvents: makeEvents(defaultAggregateMaxEvents, duplicateEvent),
199200
newEvent: duplicateEvent,
200201
expectedEvent: setCount(duplicateEvent, defaultAggregateMaxEvents+1),
201-
intervalSeconds: 5,
202+
intervalSeconds: 30, // larger interval induces aggregation but not spam.
203+
},
204+
"the-same-event-is-spam-if-happens-too-frequently": {
205+
previousEvents: makeEvents(defaultSpamBurst+1, duplicateEvent),
206+
newEvent: duplicateEvent,
207+
expectedSkip: true,
208+
intervalSeconds: 1,
202209
},
203210
"create-many-unique-events": {
204211
previousEvents: makeUniqueEvents(30),
@@ -245,7 +252,10 @@ func TestEventCorrelator(t *testing.T) {
245252
if err != nil {
246253
t.Errorf("scenario %v: unexpected error playing back prevEvents %v", testScenario, err)
247254
}
248-
correlator.UpdateState(result.Event)
255+
// if we are skipping the event, we can avoid updating state
256+
if !result.Skip {
257+
correlator.UpdateState(result.Event)
258+
}
249259
}
250260

251261
// update the input to current clock value
@@ -257,6 +267,18 @@ func TestEventCorrelator(t *testing.T) {
257267
t.Errorf("scenario %v: unexpected error correlating input event %v", testScenario, err)
258268
}
259269

270+
// verify we did not get skip from filter function unexpectedly...
271+
if result.Skip != testInput.expectedSkip {
272+
t.Errorf("scenario %v: expected skip %v, but got %v", testScenario, testInput.expectedSkip, result.Skip)
273+
continue
274+
}
275+
276+
// we wanted to actually skip, so no event is needed to validate
277+
if testInput.expectedSkip {
278+
continue
279+
}
280+
281+
// validate event
260282
_, err = validateEvent(testScenario, result.Event, &testInput.expectedEvent, t)
261283
if err != nil {
262284
t.Errorf("scenario %v: unexpected error validating result %v", testScenario, err)

0 commit comments

Comments
 (0)