logthrdestdrv: Add round-robin support within the same partition key #852
MrAnno merged 2 commits into axoflow:main on Nov 19, 2025
Signed-off-by: Balazs Scheidler <[email protected]>
This patch adds a new worker-partition-buckets() option alongside the existing worker-partition-key(). It takes a template that tells the threaded fanout algorithm how many buckets to use for a single partition key. For example, if the partition key is $PROGRAM and a single $PROGRAM generates a lot of data, we may want to spread that traffic across multiple threads instead of just one. The template passed to worker-partition-buckets() should expand to an integer that specifies the number of buckets to use. The current algorithm in LogThreadedDestDriver uses the entropy in the receive timestamp to spread the messages across different workers.
Signed-off-by: Balazs Scheidler <[email protected]>
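To make the description above more concrete, here is a minimal standalone sketch of the two steps it mentions: turning the expanded worker-partition-buckets() value into an integer, and deriving a bucket index from the receive timestamp. The helper names `parse_bucket_count()` and `pick_bucket()` are hypothetical, the fallback to a single bucket on invalid input is an assumption, and plain `strtol()` is used here instead of the driver's own parsing code.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical helper: parse the expanded worker-partition-buckets() value.
 * Assumption: anything that is not a positive integer falls back to 1 bucket,
 * which keeps the one-worker-per-key behaviour. */
static int
parse_bucket_count(const char *expanded)
{
  char *end = NULL;

  errno = 0;
  long value = strtol(expanded, &end, 10);

  if (errno != 0 || end == expanded || *end != '\0' || value < 1 || value > INT_MAX)
    return 1;
  return (int) value;
}

/* Hypothetical helper: use the microsecond part of the receive timestamp
 * as a cheap source of entropy to choose one of the buckets. */
static int
pick_bucket(uint32_t recv_usec, int buckets)
{
  assert(buckets > 0);
  return (int) (recv_usec % (uint32_t) buckets);
}
```

With this shape, a template that expands to "4" lets messages sharing one partition key land in any of four buckets, while a non-numeric expansion behaves as if the option were not set.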
Contributor
This Pull Request introduces config grammar changes
axoflow/de428f4b5aa0e1c8a3da05d174a5d7bb6250aab0 -> bazsi/logthrdestdrv-partition-rr
--- a/destination
+++ b/destination
axosyslog-otlp(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
bigquery(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
clickhouse(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
google-pubsub-grpc(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
http(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
kafka-c(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
loki(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
mongodb(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
opentelemetry(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
redis(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
syslog-ng-otlp(
+ partition-buckets(<template-content>)
+ worker-partition-buckets(<template-content>)
)
MrAnno
reviewed
Nov 18, 2025
  const gchar *p = buckets_string;
  gint p_len = buckets_len;

  if (!scan_positive_int(&p, &p_len, p_len, &buckets))
Contributor
What would the user choose as worker-partition-buckets()? Does it have to be evaluated for each message or can it be a constant in certain cases?
Member
Author
Could be constant, but we have no means to propagate the value through the pipeline, other than using the incoming message.
We could potentially use manual overrides/lookup tables from the config, but I think we should rather try to size these values automatically.
MrAnno
reviewed
Nov 18, 2025
MrAnno
approved these changes
Nov 19, 2025
Contributor
MrAnno
left a comment
I'll merge this and discuss my questions separately.
This should allow messages with the same worker-partition-key() to use multiple worker threads. Previously, the workaround was to include
a round-robin value in the worker-partition-key(); however, that causes batches to be closed early, because the key keeps changing.
With this patch, the round-robin value can be omitted from the key, so batching should improve. At the same time, the fanout logic moves into the C code. Worker thread utilization should also improve, because the bucket index is not mixed into the hash; it is simply used as an index to select the right worker.
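One way to picture the worker selection described here is the sketch below. It is not the code from this PR: `hash_key()` (FNV-1a) is a stand-in for whatever hash the driver uses, `select_worker()` is a hypothetical name, and adding the bucket index as an offset to the hash-derived base worker is an assumption about how "an index to the right worker" could be realized.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in hash (FNV-1a); the driver's real hash function is not shown here. */
static uint32_t
hash_key(const char *key)
{
  uint32_t h = 2166136261u;

  for (const unsigned char *p = (const unsigned char *) key; *p; p++)
    {
      h ^= *p;
      h *= 16777619u;
    }
  return h;
}

/* Hypothetical selection: the partition key alone picks a base worker,
 * and the bucket index is applied afterwards as a plain offset.
 * The key never changes, so batches keyed on it are not closed early. */
static int
select_worker(const char *partition_key, int bucket, int num_workers)
{
  assert(num_workers > 0 && bucket >= 0);

  uint32_t base = hash_key(partition_key) % (uint32_t) num_workers;
  return (int) ((base + (uint32_t) bucket) % (uint32_t) num_workers);
}
```

Because the bucket is an offset rather than a hash input, a key with N buckets maps to N consecutive workers (wrapping around), which avoids two buckets of the same key colliding on one worker as long as N does not exceed the number of workers.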