logthrdestdrv: Add round-robin support within the same partition key #852

Merged
MrAnno merged 2 commits into axoflow:main from bazsi:logthrdestdrv-partition-rr on Nov 19, 2025

@bazsi bazsi commented Nov 16, 2025

This should allow the same worker-partition-key() to use multiple worker threads. Previously, the workaround was to include
the round-robin value in the worker-partition-key() itself, but that causes batches to be closed early because the key keeps changing.

With this patch, the round-robin value can be omitted from the key, so batching should improve. At the same time, the fanout logic moves into the C code. This should also improve our use of worker threads, because the bucket index is not mixed into the hash; it is simply used as an index to select the right worker.
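
To illustrate the difference described above, here is a minimal sketch of selecting a worker from a hashed partition key plus a bucket index that is applied after hashing. The helper names `hash_key()` and `select_worker()`, the FNV-1a hash, and the modulo arithmetic are assumptions made for illustration; they are not taken from the patch and the actual code in LogThreadedDestDriver may differ.

```c
#include <stddef.h>
#include <stdint.h>

/* FNV-1a, used here only as a stand-in hash for the partition key.
 * Assumption: the real driver may hash the key differently. */
uint32_t
hash_key(const char *key)
{
  uint32_t h = 2166136261u;

  for (const unsigned char *p = (const unsigned char *) key; *p; p++)
    {
      h ^= *p;
      h *= 16777619u;
    }
  return h;
}

/* Hypothetical helper: the key alone decides the base worker, and the
 * bucket index is added afterwards as a plain offset. The key itself
 * never changes, so a batch keyed on it does not have to be closed. */
size_t
select_worker(const char *key, unsigned bucket_index, size_t num_workers)
{
  size_t base = hash_key(key) % num_workers;

  return (base + bucket_index) % num_workers;
}
```

With 8 workers and 3 buckets, the same key would land on three adjacent workers in this sketch, while messages with that key still share one partition key for batching purposes.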

This patch adds a new worker-partition-buckets() option along with the
existing worker-partition-key(). This takes a template that tells
the threaded fanout algorithm how many different buckets we should use
for the same partition key.

For example, if worker-partition-key() is $PROGRAM and a lot of data is
generated by the same $PROGRAM, we may want to spread that data across
multiple threads instead of just one.

The template specified to worker-partition-buckets() should expand to an
integer that specifies how many different buckets we want to use.
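
As a rough sketch of what "should expand to an integer" could mean in practice, the helper below parses a positive decimal integer from a length-delimited string, which is the shape a template expansion result usually has. The name `parse_bucket_count()` is hypothetical, and rejecting zero, non-digits and overflow is an assumption; the patch does not state how invalid expansions are handled.

```c
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: parse a positive decimal integer from a buffer
 * of the given length (the buffer need not be NUL-terminated).
 * Returns false for empty input, non-digit characters, zero, or a
 * value that does not fit into an int; *buckets is untouched then. */
bool
parse_bucket_count(const char *str, size_t len, int *buckets)
{
  long long value = 0;

  if (len == 0)
    return false;

  for (size_t i = 0; i < len; i++)
    {
      if (str[i] < '0' || str[i] > '9')
        return false;

      value = value * 10 + (str[i] - '0');
      if (value > INT_MAX)
        return false;
    }

  if (value == 0)
    return false;

  *buckets = (int) value;
  return true;
}
```

A caller could treat a failed parse as "one bucket", which would keep the behaviour identical to using worker-partition-key() alone; that fallback is also an assumption.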

The current algorithm in LogThreadedDestDriver is to use the entropy
in the receive timestamp to spread out the messages to different workers.
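
One way to picture "using the entropy in the receive timestamp" is to take the sub-second part of the timestamp modulo the bucket count. The sketch below does exactly that; the function name `bucket_from_recv_usec()` and the choice of the microsecond field are assumptions for illustration only, since the description does not say which part of the timestamp is used.

```c
#include <stdint.h>

/* Hypothetical helper: derive a bucket index in [0, buckets) from the
 * microsecond part of a message's receive timestamp. With zero or one
 * bucket there is nothing to spread, so the index is always 0. */
unsigned
bucket_from_recv_usec(uint32_t recv_usec, unsigned buckets)
{
  if (buckets <= 1)
    return 0;

  return recv_usec % buckets;
}
```

The appeal of this kind of approach is that it needs no shared counter between threads: each message carries its own source of variation.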

Signed-off-by: Balazs Scheidler <[email protected]>
@github-actions

This Pull Request introduces config grammar changes

axoflow/de428f4b5aa0e1c8a3da05d174a5d7bb6250aab0 -> bazsi/logthrdestdrv-partition-rr

--- a/destination
+++ b/destination

 axosyslog-otlp(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 bigquery(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 clickhouse(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 google-pubsub-grpc(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 http(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 kafka-c(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 loki(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 mongodb(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 opentelemetry(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 redis(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

 syslog-ng-otlp(
+    partition-buckets(<template-content>)
+    worker-partition-buckets(<template-content>)
 )

const gchar *p = buckets_string;
gint p_len = buckets_len;

if (!scan_positive_int(&p, &p_len, p_len, &buckets))

Contributor commented:

What would the user choose as worker-partition-buckets()? Does it have to be evaluated for each message or can it be a constant in certain cases?

Member (Author) replied:

It could be a constant, but we have no means to propagate the value through the pipeline other than the incoming message itself.

We could potentially use manual overrides or lookup tables from the config, but I think we should rather try to size these values automatically.

@MrAnno MrAnno left a comment

We'll need a news entry if the meaning behind worker-partition-buckets() is finalized.

@MrAnno MrAnno left a comment

I'll merge this and discuss my questions separately.

@MrAnno MrAnno merged commit 2afc405 into axoflow:main Nov 19, 2025
22 checks passed