Skip to content

Comments

Attach RateLimiter using UpdateTaskQueueConfig api configration#8058

Merged
sivagirish81 merged 25 commits intotemporalio:mainfrom
sivagirish81:TQConfigAttachRateLimit
Aug 6, 2025
Merged

Attach RateLimiter using UpdateTaskQueueConfig api configration#8058
sivagirish81 merged 25 commits intotemporalio:mainfrom
sivagirish81:TQConfigAttachRateLimit

Conversation

@sivagirish81
Copy link
Contributor

@sivagirish81 sivagirish81 commented Jul 16, 2025

What changed?

  • Implemented a new RateLimit Manager interface.
  • Populated effective rate limit field in DescribeTaskQueue api response.
  • Added a test to check whether the api set rate limit is overriding the worker set rate limit

Why?

  • Configs set via updateTaskQueueConfig api need to override the worker set rate limit.
  • Report effective RPS as the min of api/worker and system dynamic configs.
  • Report source basis the effective RPS.

How did you test it?

  • built
  • run locally and tested manually
  • added new functional test(s)

Potential risks

N/A

@sivagirish81

This comment was marked as resolved.

@sivagirish81

This comment was marked as resolved.

@sivagirish81 sivagirish81 requested a review from dnr July 22, 2025 19:11

workflowFn := func(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
TaskQueue: activityTaskQueue, // Route activities to a dedicated task queue named `activityTaskQueue`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe reword the comment to mention why you're doing this instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func (s *TaskQueueSuite) runTestTaskQueueAPIRateLimitOverridesWorkerLimit(apiRPS float32, taskCount int, minGap time.Duration, maxGap time.Duration, activityTaskQueue string) {
// Set the burst as 1 to make sure not more than 1 task get's acknowledged at a time.
// Helps observe the backlog drain more easily.
s.OverrideDynamicConfig(dynamicconfig.MatchingMinTaskThrottlingBurstSize, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you set it to 0 to avoid the special case at the bottom of the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Burst is overriden regardless of the value we set here at time of ratelimitupdate.

minTaskThrottlingBurstSize := r.config.MinTaskThrottlingBurstSize()
	if burst < minTaskThrottlingBurstSize {
		burst = minTaskThrottlingBurstSize
	}

So we will need to enforce the special case at the bottom of the test - ignore the gap between the first and the second task and check for the gap between the remaining tasks.

@sivagirish81 sivagirish81 marked this pull request as ready for review July 23, 2025 20:08
@sivagirish81 sivagirish81 requested a review from a team as a code owner July 23, 2025 20:08
@sivagirish81 sivagirish81 requested review from dnr and stephanos July 23, 2025 20:14
@sivagirish81 sivagirish81 requested a review from dnr July 24, 2025 10:02
@sivagirish81 sivagirish81 requested a review from dnr July 25, 2025 01:38
@sivagirish81 sivagirish81 mentioned this pull request Aug 1, 2025
5 tasks
Copy link
Contributor

@dnr dnr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright, a few small comments. the partition count stuff could be handled in the next pr. what do you think about the others?

@sivagirish81 sivagirish81 requested a review from dnr August 4, 2025 06:10

// Start the activity worker
activityWorker := worker.New(s.SdkClient(), activityTaskQueue, worker.Options{
// Setting rate limit at worker level
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Setting rate limit at worker level
// Setting rate limit at worker level (this will be ignored in favor of the limit set through the api)

defer wfWorker.Stop()

// Launch workflows
for i := 0; i < taskCount; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for i := 0; i < taskCount; i++ {
for i := range taskCount {

@sivagirish81 sivagirish81 merged commit 25f5d78 into temporalio:main Aug 6, 2025
95 of 97 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants