@@ -30,6 +30,7 @@ import (
30
30
"k8s.io/apimachinery/pkg/util/clock"
31
31
"k8s.io/apimachinery/pkg/util/sets"
32
32
"k8s.io/apimachinery/pkg/util/strategicpatch"
33
+ "k8s.io/client-go/util/flowcontrol"
33
34
)
34
35
35
36
const (
@@ -39,6 +40,13 @@ const (
39
40
// more than 10 times in a 10 minute period, aggregate the event
40
41
defaultAggregateMaxEvents = 10
41
42
defaultAggregateIntervalInSeconds = 600
43
+
44
+ // by default, allow a source to send 25 events about an object
45
+ // but control the refill rate to 1 new event every 5 minutes
46
+ // this helps control the long-tail of events for things that are always
47
+ // unhealthy
48
+ defaultSpamBurst = 25
49
+ defaultSpamQPS = 1. / 300.
42
50
)
43
51
44
52
// getEventKey builds unique event key based on source, involvedObject, reason, message
@@ -59,6 +67,20 @@ func getEventKey(event *v1.Event) string {
59
67
"" )
60
68
}
61
69
70
+ // getSpamKey builds unique event key based on source, involvedObject
71
+ func getSpamKey (event * v1.Event ) string {
72
+ return strings .Join ([]string {
73
+ event .Source .Component ,
74
+ event .Source .Host ,
75
+ event .InvolvedObject .Kind ,
76
+ event .InvolvedObject .Namespace ,
77
+ event .InvolvedObject .Name ,
78
+ string (event .InvolvedObject .UID ),
79
+ event .InvolvedObject .APIVersion ,
80
+ },
81
+ "" )
82
+ }
83
+
62
84
// EventFilterFunc decides whether an event is dropped: it returns true if the event should be skipped, false if it should be processed
63
85
type EventFilterFunc func (event * v1.Event ) bool
64
86
@@ -67,6 +89,69 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
67
89
return false
68
90
}
69
91
92
+ // EventSourceObjectSpamFilter throttles the number of events that a single
93
+ // source may emit about a single object; its embedded lock is taken by Filter around all cache access.
94
+ type EventSourceObjectSpamFilter struct {
95
+ sync.RWMutex
96
+
97
+ // cache maps a spam key (see getSpamKey) to the spamRecord that holds that key's rate limiter
98
+ cache * lru.Cache
99
+
100
+ // burst is the token bucket size handed to each rate limiter: the number of events allowed per source + object before refill is required
101
+ burst int
102
+
103
+ // qps is the refill rate of each token bucket, in events per second
104
+ qps float32
105
+
106
+ // clock is handed to every rate limiter created by Filter, which allows tests to drive time explicitly
107
+ clock clock.Clock
108
+ }
109
+
110
+ // NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
111
+ func NewEventSourceObjectSpamFilter (lruCacheSize , burst int , qps float32 , clock clock.Clock ) * EventSourceObjectSpamFilter {
112
+ return & EventSourceObjectSpamFilter {
113
+ cache : lru .New (lruCacheSize ),
114
+ burst : burst ,
115
+ qps : qps ,
116
+ clock : clock ,
117
+ }
118
+ }
119
+
120
+ // spamRecord is the per-key value the spam filter stores in its cache (one record per getSpamKey result).
121
+ type spamRecord struct {
122
+ // rateLimiter is the token bucket consulted for every event with this key; it is nil until Filter creates it on first use
123
+ rateLimiter flowcontrol.RateLimiter
124
+ }
125
+
126
+ // Filter controls that a given source+object are not exceeding the allowed rate.
127
+ func (f * EventSourceObjectSpamFilter ) Filter (event * v1.Event ) bool {
128
+ var record spamRecord
129
+
130
+ // controls our cached information about this event (source+object)
131
+ eventKey := getSpamKey (event )
132
+
133
+ // do we have a record of similar events in our cache?
134
+ f .Lock ()
135
+ defer f .Unlock ()
136
+ value , found := f .cache .Get (eventKey )
137
+ if found {
138
+ record = value .(spamRecord )
139
+ }
140
+
141
+ // verify we have a rate limiter for this record
142
+ if record .rateLimiter == nil {
143
+ record .rateLimiter = flowcontrol .NewTokenBucketRateLimiterWithClock (f .qps , f .burst , f .clock )
144
+ }
145
+
146
+ // ensure we have available rate
147
+ filter := ! record .rateLimiter .TryAccept ()
148
+
149
+ // update the cache
150
+ f .cache .Add (eventKey , record )
151
+
152
+ return filter
153
+ }
154
+
70
155
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
71
156
// It returns a tuple of the following:
72
157
// aggregateKey - key that identifies the aggregate group to bucket this event
@@ -337,18 +422,20 @@ type EventCorrelateResult struct {
337
422
// prior to interacting with the API server to record the event.
338
423
//
339
424
// The default behavior is as follows:
340
- // * No events are filtered from being recorded
341
425
// * Aggregation is performed if a similar event is recorded 10 times
342
426
// in a 10 minute rolling interval. A similar event is an event that varies only by
343
427
// the Event.Message field. Rather than recording the precise event, aggregation
344
428
// will create a new event whose message reports that it has combined events with
345
429
// the same reason.
346
430
// * Events are incrementally counted if the exact same event is encountered multiple
347
431
// times.
432
+ // * A source may burst 25 events about an object, but has a refill rate budget
433
+ // per object of 1 event every 5 minutes to control long-tail of spam.
348
434
func NewEventCorrelator (clock clock.Clock ) * EventCorrelator {
349
435
cacheSize := maxLruCacheEntries
436
+ spamFilter := NewEventSourceObjectSpamFilter (cacheSize , defaultSpamBurst , defaultSpamQPS , clock )
350
437
return & EventCorrelator {
351
- filterFunc : DefaultEventFilterFunc ,
438
+ filterFunc : spamFilter . Filter ,
352
439
aggregator : NewEventAggregator (
353
440
cacheSize ,
354
441
EventAggregatorByReasonFunc ,
@@ -363,11 +450,14 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
363
450
364
451
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
365
452
func (c * EventCorrelator ) EventCorrelate (newEvent * v1.Event ) (* EventCorrelateResult , error ) {
366
- if c . filterFunc ( newEvent ) {
367
- return & EventCorrelateResult { Skip : true }, nil
453
+ if newEvent == nil {
454
+ return nil , fmt . Errorf ( "event is nil" )
368
455
}
369
456
aggregateEvent , ckey := c .aggregator .EventAggregate (newEvent )
370
457
observedEvent , patch , err := c .logger .eventObserve (aggregateEvent , ckey )
458
+ if c .filterFunc (observedEvent ) {
459
+ return & EventCorrelateResult {Skip : true }, nil
460
+ }
371
461
return & EventCorrelateResult {Event : observedEvent , Patch : patch }, err
372
462
}
373
463
0 commit comments