Add WakeupEvents support to perf buffers #524

brycekahle wants to merge 1 commit into cilium:master
Conversation
```diff
-func createPerfEvent(cpu, watermark int) (int, error) {
-	if watermark == 0 {
-		watermark = 1
+func createPerfEvent(cpu, wakeupBytes int, wakeupEvents int) (int, error) {
```
This should return an error if both bytes and events are set.
This should probably take a ReaderOptions instead of more ints.
```go
// TriggerAll forces all rings to be queued up for reading on the next call to Read.
// This is useful to forcefully read pending events that haven't met the threshold of WakeupEvents yet.
func (pr *Reader) TriggerAll() {
```
How do you want to use this? I've played with this idea in the past, but it was never clear to me how to make this square with Read() semantics. You can't avoid blocking in Read to get all the remaining samples without knowing how many samples there are.
That's a good point. We might need a new function that reads each ring until errEOR and returns all those records at once? That or a way to opt-out of the call to poller.Wait once len(pr.epollRings) == 0.
How do you interrupt that errEOR function though? It's turtles all the way down. Hence my question what your use case is, maybe there is some other way to solve it.
No need for interruption because it isn't blocking. It would essentially be this loop:
```go
for {
	if len(pr.epollRings) == 0 {
		return Record{}, io.EOF
	}
	// Start at the last available event. The order in which we
	// process them doesn't matter, and starting at the back allows
	// resizing epollRings to keep track of processed rings.
	record, err := readRecordFromRing(pr.epollRings[len(pr.epollRings)-1])
	if err == errEOR {
		// We've emptied the current ring buffer, process
		// the next one.
		pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
		continue
	}
	return record, err
}
```
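The shape of that drain loop can be checked standalone. In this sketch, `ring`, `readRecordFromRing`, and `errEOR` are local stand-ins for the package internals, not its real API:

```go
package main

import (
	"errors"
	"fmt"
)

// errEOR mimics the package's end-of-ring sentinel.
var errEOR = errors.New("end of ring")

// ring is a stand-in for a perf ring with queued records.
type ring struct{ records []string }

func readRecordFromRing(r *ring) (string, error) {
	if len(r.records) == 0 {
		return "", errEOR
	}
	rec := r.records[0]
	r.records = r.records[1:]
	return rec, nil
}

// drain reads the last ring until errEOR, shrinks the slice,
// and continues with the next, mirroring the loop above.
func drain(epollRings []*ring) []string {
	var out []string
	for len(epollRings) > 0 {
		rec, err := readRecordFromRing(epollRings[len(epollRings)-1])
		if err == errEOR {
			epollRings = epollRings[:len(epollRings)-1]
			continue
		}
		out = append(out, rec)
	}
	return out
}

func main() {
	rings := []*ring{{records: []string{"a", "b"}}, {records: []string{"c"}}}
	fmt.Println(drain(rings)) // [c a b]
}
```

Because the loop always works from the back of the slice, records come out ring by ring, not in global arrival order.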
Our use case is using wakeup_events to do a sort of batching. We observe a lot of CPU being used to wakeup for every event, so we want to wakeup every X. We have a synchronization point in time where we need to read all queued events, even if they haven't triggered the batch size X.
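That batching pattern can be sketched in userspace terms. This `batcher` type is purely illustrative (not part of the library): events accumulate until the batch size is reached, and an explicit flush at the synchronization point forces out the partial batch, the role a forced wakeup would play:

```go
package main

import "fmt"

// batcher collects items and flushes every n, mirroring batching
// on wakeup_events (illustrative only, not the library's API).
type batcher struct {
	n     int
	buf   []int
	flush func([]int)
}

func (b *batcher) add(v int) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.n {
		b.Flush()
	}
}

// Flush delivers pending items even if the batch size hasn't
// been reached, as needed at a synchronization point.
func (b *batcher) Flush() {
	if len(b.buf) == 0 {
		return
	}
	b.flush(b.buf)
	b.buf = nil
}

func main() {
	var batches [][]int
	b := &batcher{n: 3, flush: func(v []int) {
		batches = append(batches, append([]int(nil), v...))
	}}
	for i := 1; i <= 7; i++ {
		b.add(i)
	}
	b.Flush() // synchronization point: force out the partial batch
	fmt.Println(batches) // [[1 2 3] [4 5 6] [7]]
}
```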
> We have a synchronization point in time where we need to read all queued events, even if they haven't triggered the batch size X.
But you still keep reading afterwards, right? That's easier to solve than flush and read remaining.
I'd suggest adding e.g. `ReadAll(maxRecords int, timeoutMsec int, receive func(record Record)) error`, which would first poll all rings and then, if there's something to read in any of them, loop through all of the rings until they're empty.
Though... the tricky part is probably figuring out all the different ways this might need to be parameterized, and for some high-perf use-cases we might want to have thread-per-ring (LockOSThread'd and SetAffinity'd to the same cpu as the ring). So perhaps providing the low-level primitives to build this in any way you want plus few sane wrappers would be the way to go?
First, we should all be able to understand where the current bottlenecks lie, add benchmarks to the package and make incremental improvements from there. There are likely more than a few perf gains still on the table that would benefit all existing users, which is much preferred to adding more API to accommodate higher throughput. To me, this TriggerAll() approach feels like a hack that won't benefit the large majority. The current API is relatively straightforward, we should try to preserve that.
@brycekahle Could you share what exactly you're seeing in those CPU profiles, along with maybe some repro steps or benches? Without this, we can't reason about the problem to come up with a good overall solution.
The CPU usage isn't necessarily the fault of this library, but rather what we do with those events, which is why we batch them to reduce the frequency. Doing this batching in eBPF ends up costing us stack size and larger maps to store the batched data. Being able to do this batching directly in the perf buffer would be a huge improvement for us.
Let me think about the flush use case, to see if a better design shakes out.
I'm happy with WakeupEvents as it stands if that is of any use to you on its own?
```go
// The number of events required in any per CPU buffer before
// Read will process data. This is mutually exclusive with Watermark.
// The default is zero, which means Watermark will take precedence.
WakeupEvents int
```
> ... This is mutually exclusive with Watermark.
> The default is zero, which means Watermark will take precedence.
It would be much nicer if the caller was actually informed of this configuration conflict. There's no way to introspect how the reader is ultimately configured, so the caller won't be aware of one parameter being chosen over the other.
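One way to surface the conflict is to validate before creating the rings. In this standalone sketch, `ReaderOptions` is a local copy with only the two fields under discussion, and `validate` is a hypothetical helper, not part of the package:

```go
package main

import (
	"errors"
	"fmt"
)

// ReaderOptions carries just the two wakeup settings discussed here
// (a standalone stand-in, not the package's actual struct).
type ReaderOptions struct {
	Watermark    int
	WakeupEvents int
}

// validate rejects setting both, since perf_event_attr honours only
// one of wakeup_watermark / wakeup_events at a time.
func (ro ReaderOptions) validate() error {
	if ro.Watermark > 0 && ro.WakeupEvents > 0 {
		return errors.New("watermark and wakeup events are mutually exclusive")
	}
	return nil
}

func main() {
	both := ReaderOptions{Watermark: 1, WakeupEvents: 4}
	fmt.Println(both.validate()) // non-nil: conflicting options
	only := ReaderOptions{WakeupEvents: 4}
	fmt.Println(only.validate()) // <nil>
}
```

Returning an error up front tells the caller exactly which setting won, instead of one option being silently ignored.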
```diff
 func TestCreatePerfEvent(t *testing.T) {
-	fd, err := createPerfEvent(0, 1)
+	fd, err := createPerfEvent(0, 1, 0)
```
I would probably pass ReaderOptions here; this call site is quickly becoming inscrutable. :)
```diff
-func createPerfEvent(cpu, watermark int) (int, error) {
-	if watermark == 0 {
-		watermark = 1
+func createPerfEvent(cpu, wakeupBytes int, wakeupEvents int) (int, error) {
```
This should probably take a ReaderOptions instead of more ints.
```diff
-func newPerfEventRing(cpu, perCPUBuffer, watermark int) (*perfEventRing, error) {
-	if watermark >= perCPUBuffer {
+func newPerfEventRing(cpu, perCPUBuffer, wakeupBytes int, wakeupEvents int) (*perfEventRing, error) {
```
Idem, ReaderOptions would be nicer.
Closing for now. Feel free to reopen.
Adds support for `wakeup_events` as an alternative to `wakeup_watermark`. Adds a `TriggerAll` function to `perf.Reader` to allow reading of queued events that haven't triggered the threshold of `wakeup_events` yet.

Note: This depends on `ReadTimeout` from #523