Skip to content

logthrdest: support autoscaling partitions#855

Merged
alltilla merged 5 commits intoaxoflow:mainfrom
MrAnno:partition-stats
Dec 3, 2025
Merged

logthrdest: support autoscaling partitions#855
alltilla merged 5 commits intoaxoflow:mainfrom
MrAnno:partition-stats

Conversation

@MrAnno
Copy link
Contributor

@MrAnno MrAnno commented Nov 24, 2025

When worker-partition-key() is used to categorize messages into different batches,
the messages are - by default - hashed into workers, which prevents them from being distributed across workers
efficiently, based on load.

The new worker-partition-autoscaling(yes) option uses a 1-minute statistic to help distribute
high-traffic partitions among multiple workers, allowing each worker to maximize its batch size.

When using this autoscaling option, it is recommended to oversize the number of workers: set it higher than the
expected number of partitions.


Upper limit on the partitions table and falling back to hashing when misconfigured?

@MrAnno MrAnno marked this pull request as draft November 24, 2025 14:32
@github-actions
Copy link
Contributor

github-actions bot commented Nov 24, 2025

This Pull Request introduces config grammar changes

axoflow/65c732858e01325a29dec2576cba7c5a408421ee -> MrAnno/partition-stats

Details
--- a/destination
+++ b/destination

 axosyslog-otlp(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 bigquery(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 clickhouse(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 google-pubsub-grpc(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 http(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 kafka-c(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 loki(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 mongodb(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 opentelemetry(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 redis(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 syslog-ng-otlp(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

@alltilla alltilla self-requested a review November 25, 2025 12:15
@MrAnno MrAnno force-pushed the partition-stats branch 2 times, most recently from c2b6677 to bb9258b Compare November 26, 2025 11:49
@MrAnno MrAnno marked this pull request as ready for review November 26, 2025 23:04
MrAnno added a commit to MrAnno/axosyslog that referenced this pull request Nov 26, 2025
Signed-off-by: László Várady <[email protected]>
@MrAnno
Copy link
Contributor Author

MrAnno commented Nov 26, 2025

http(
      url("http://localhost:8080")
      method("POST")
      batch-lines(10000)
      batch-timeout(2000)
      workers(10)
      worker-partition-key("$PROGRAM")
      flush-on-worker-key-change(yes)
      worker-partition-autoscaling(yes)
);

syslogng_output_batch_size_events_bucket{le="2"} 32
syslogng_output_batch_size_events_bucket{le="8"} 3
syslogng_output_batch_size_events_bucket{le="16"} 3
syslogng_output_batch_size_events_bucket{le="32"} 1
syslogng_output_batch_size_events_bucket{le="256"} 3
syslogng_output_batch_size_events_bucket{le="512"} 2
syslogng_output_batch_size_events_bucket{le="2048"} 1
syslogng_output_batch_size_events_bucket{le="4096"} 35
syslogng_output_batch_size_events_bucket{le="8192"} 494

syslogng_memory_queue_processed_events_total{worker="7"} 52375
syslogng_memory_queue_processed_events_total{worker="8"} 52384
syslogng_memory_queue_processed_events_total{worker="9"} 52385
syslogng_memory_queue_processed_events_total{worker="0"} 52390
syslogng_memory_queue_processed_events_total{worker="1"} 52378
syslogng_memory_queue_processed_events_total{worker="2"} 52507
syslogng_memory_queue_processed_events_total{worker="3"} 1197297
syslogng_memory_queue_processed_events_total{worker="4"} 1254968
syslogng_memory_queue_processed_events_total{worker="5"} 1207284
syslogng_memory_queue_processed_events_total{worker="6"} 1159332

MrAnno added a commit to MrAnno/axosyslog that referenced this pull request Nov 26, 2025
Signed-off-by: László Várady <[email protected]>
MrAnno added a commit to MrAnno/axosyslog that referenced this pull request Nov 26, 2025
Signed-off-by: László Várady <[email protected]>
@MrAnno MrAnno requested a review from sodomelle November 26, 2025 23:40
@MrAnno
Copy link
Contributor Author

MrAnno commented Nov 27, 2025

My valgrind is broken, I'll ask someone to help me check memory stuff together.

Copy link
Member

@alltilla alltilla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only optional comments, we can merge this as is if you want.


/* partition_stats_lock must be held when calling this method */
static inline gboolean
_remove_if_partition_expired(Partition **p, GHashTableIter *iter, const struct timespec *now)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick]

I think p can be a non-output variable?

part->worker_idx = current_worker_idx;
gdouble partition_ratio = _get_partition_rate_ratio(part->rate, total_rate);
gdouble partition_workers = partition_ratio * free_workers;
part->num_of_workers = (gint) (0 + floor(partition_workers));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional]

This floor() call can have nearly a whole worker as a rounding error. I know that we accumulate it and give them to one partition, but I think we have other options too, that spread the error better between the remaining partitions. I have opened a PR with my suggested algorithm, kindly consider it.

MrAnno#3

@alltilla alltilla merged commit c5d7dcd into axoflow:main Dec 3, 2025
25 of 26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants