feat: option to group metadata refreshes so only one is in-flight at a time #3225

Merged
dnwe merged 7 commits into IBM:main from warpstreamlabs:cupcicm/singleflight_metadata_requests
Jul 31, 2025

Conversation

@cupcicm
Contributor

@cupcicm cupcicm commented Jul 16, 2025

If other goroutines ask for another metadata refresh while one is already in flight, we accumulate all requested topics into a slice and queue another metadata request immediately after the current one is done.
To enable this behavior, set config.Metadata.SingleFlight = true in your configuration.

I confirmed that this fixes #3224 in our setup.
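
For readers who want to see the idea in isolation, here is a minimal sketch of the behavior described above, using only the standard library. It is not the code merged in this PR: the `refresher` type, its fields, and the `enqueue`, `next`, `queuedCallers` and `demo` helpers are all hypothetical names chosen for illustration. One simplification to note: the caller that starts a refresh also runs the queued follow-up batches before it returns.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// refresher is a hypothetical single-flight wrapper, not sarama's code.
type refresher struct {
	mu      sync.Mutex
	running bool         // true while a refresh is in flight
	pending []string     // topics requested during that refresh
	waiters []chan error // callers waiting on the queued refresh
	do      func(topics []string) error
}

// enqueue returns nil if the caller should run the refresh, else a channel to wait on.
func (r *refresher) enqueue(topics []string) chan error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.running {
		r.running = true
		return nil
	}
	ch := make(chan error, 1)
	r.pending = append(r.pending, topics...)
	r.waiters = append(r.waiters, ch)
	return ch
}

// next hands out the queued batch, or marks the refresher idle.
func (r *refresher) next() ([]string, []chan error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	topics, waiters := r.pending, r.waiters
	r.pending, r.waiters = nil, nil
	r.running = len(waiters) > 0
	return topics, waiters
}

// Refresh runs do at most once at a time and merges queued topics into one follow-up call.
func (r *refresher) Refresh(topics []string) error {
	if ch := r.enqueue(topics); ch != nil {
		return <-ch
	}
	err := r.do(topics)
	for {
		queued, waiters := r.next()
		if len(waiters) == 0 {
			return err
		}
		batchErr := r.do(queued)
		for _, ch := range waiters {
			ch <- batchErr
		}
	}
}

// queuedCallers reports how many callers are waiting (demo only).
func (r *refresher) queuedCallers() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.waiters)
}

// demo holds the first refresh open until two more callers have queued.
func demo() [][]string {
	var calls [][]string
	started, release := make(chan struct{}), make(chan struct{})
	r := &refresher{}
	r.do = func(topics []string) error {
		calls = append(calls, topics) // calls never overlap, so no lock
		if len(calls) == 1 {
			close(started)
			<-release
		}
		return nil
	}
	done := make(chan error, 3)
	go func() { done <- r.Refresh([]string{"topic1"}) }()
	<-started // topic1's refresh is now in flight
	go func() { done <- r.Refresh([]string{"topic2"}) }()
	go func() { done <- r.Refresh([]string{"topic3"}) }()
	for r.queuedCallers() < 2 {
		runtime.Gosched() // wait for topic2 and topic3 to queue
	}
	close(release)
	for i := 0; i < 3; i++ {
		<-done
	}
	return calls
}

func main() {
	fmt.Println(demo())
}
```

In this sketch three callers result in two underlying calls: one for topic1, then one merged call for topic2 and topic3 (in either order). Duplicate topics are not removed here.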

@cupcicm cupcicm mentioned this pull request Jul 16, 2025
@airlock-warpstreamlabs airlock-warpstreamlabs bot force-pushed the cupcicm/singleflight_metadata_requests branch 2 times, most recently from 3a42eda to 1ea2b72 Compare July 16, 2025 14:37
@airlock-warpstreamlabs airlock-warpstreamlabs bot force-pushed the cupcicm/singleflight_metadata_requests branch from 1ea2b72 to 6b3ce90 Compare July 16, 2025 14:53
client_test.go Outdated
var called atomic.Int32

seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
require.EqualValues(t, 3, req.body.key(), "this test sends only Metadata requests")
Collaborator

Do not use require here. The danger is that we could be running this on a separate goroutine from the one running the test.

Note the condition on https://pkg.go.dev/testing#T.Fatal

Also consider that using a regular if block is more than adequate:

if req.body.key() != 3 /* can we get a name in here instead? */ {
	t.Error("this test should only send Metadata requests")
	return
}

Contributor Author

Would you prefer that I used

if xxx {
  t.Error()
}

or assert.EqualValues (rather than require)?

Collaborator

Usually the former is more direct and clear. In this case, the if cond { t.Error() } pattern can provide a more useful test failure message than just “3 != 3”.
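
A rough sketch of the pattern discussed in this thread, with a named constant in place of the bare 3. Everything here is illustrative: `apiKeyMetadata`, `errorReporter`, `checkMetadataOnly` and `recorder` are hypothetical names, and `recorder` merely stands in for `*testing.T` so the snippet runs outside a test. The only facts relied on are that 3 is the Kafka API key for Metadata, that 0 is the key for Produce, and that `t.Errorf` (unlike `t.Fatal`/`t.FailNow`, which `require` uses) may be called from goroutines other than the one running the test.

```go
package main

import "fmt"

// apiKeyMetadata is the Kafka protocol API key for Metadata requests.
const apiKeyMetadata = 3

// errorReporter is the subset of *testing.T used here (assumed for the sketch).
type errorReporter interface {
	Errorf(format string, args ...any)
}

// checkMetadataOnly is a hypothetical helper: it reports a non-fatal
// failure and tells the handler to bail out, instead of calling FailNow.
func checkMetadataOnly(t errorReporter, key int16) bool {
	if key != apiKeyMetadata {
		t.Errorf("this test should only send Metadata requests, got API key %d", key)
		return false
	}
	return true
}

// recorder stands in for *testing.T so the example runs as a program.
type recorder struct{ msgs []string }

func (r *recorder) Errorf(format string, args ...any) {
	r.msgs = append(r.msgs, fmt.Sprintf(format, args...))
}

func main() {
	rec := &recorder{}
	done := make(chan bool)
	// Simulate a broker handler running on its own goroutine.
	go func() { done <- checkMetadataOnly(rec, 0) }() // 0 is the Produce API key
	fmt.Println(<-done, rec.msgs)
}
```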

metadata.go Outdated

// Ongoing returns true if the refresh is ongoing.
// You need to hold the lock to call this method.
func (r *currentRefresh) Ongoing() bool {
Collaborator

This convenience function provides no convenience.

The currentRefresh lock must already be held, so we need to restrict this function from being called outside of this scope, so exporting it is not useful.

And since there is otherwise no locking, we could just export r.ongoing directly instead. Otherwise, we already can access r.ongoing ourselves.

Contributor Author

You're right, sorry about this.

metadata.go Outdated
m.next.Unlock()
current.AddTopics(topics)
var ch = current.Start()
current.Unlock()
Collaborator

Requiring each code path to properly unlock this mutex explicitly is error prone.

Perhaps, we should compose all these functions into a ch, needToRetry := m.current.refreshTopics(topics)

Contributor Author

❗ Very nice!
I hadn't thought about this. Let me do it; it's going to be clearer 👍
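
To illustrate the locking concern raised in this thread, a small sketch in which the mutex is acquired and released inside a single method, so no caller code path can forget to unlock. The type and method names echo the ones in the review, but the fields and the single boolean return value are assumptions made for this example; the suggestion above returns `ch, needToRetry` instead.

```go
package main

import (
	"fmt"
	"sync"
)

// currentRefresh is a hypothetical stand-in for the type under review.
type currentRefresh struct {
	mu      sync.Mutex
	ongoing bool
	topics  []string
}

// refreshTopics records the topics and reports whether the caller must
// start the refresh itself (true) or one is already ongoing (false).
// Lock and Unlock both live here, paired by defer.
func (r *currentRefresh) refreshTopics(topics []string) (start bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topics = append(r.topics, topics...)
	if r.ongoing {
		return false
	}
	r.ongoing = true
	return true
}

func main() {
	var r currentRefresh
	fmt.Println(r.refreshTopics([]string{"topic1"})) // first caller starts the refresh
	fmt.Println(r.refreshTopics([]string{"topic2"})) // second caller only adds its topics
}
```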

metadata_test.go Outdated
// This one and the next one will be batched together because the first
// call is still ongoing when they run.
// So we will issue a single refresh call, for topic2 and topic3.
time.Sleep(10 * time.Millisecond)
Collaborator

I feel like we could use channels to gate execution, guaranteeing that the sequence of steps taken is the expected one, without needing to use an arbitrary wait time “just to be sure”.

first := make(chan struct{})
refresh := newSingleFlightRefresher(func(topics []string) error {
	select {
	case <-first: // already closed
	default:
		close(first)
	}
	// There should never be two refreshes that run concurrently,
	// so it's safe to change the map with no lock.
	for _, topic := range topics {
		called[topic]++
	}
	time.Sleep(10 * time.Millisecond) // unsure how to guarantee the second and third refresh have looped.
	return nil
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	assert.NoError(t, refresh([]string{"topic1"}))
}()
wg.Add(1)
go func() {
	defer wg.Done()
	<-first
	assert.NoError(t, refresh([]string{"topic3", "topic4"}))
}()
wg.Add(1)
go func() {
	defer wg.Done()
	<-first
	assert.NoError(t, refresh([]string{"topic3", "topic4"}))
}()
wg.Wait()

@airlock-warpstreamlabs airlock-warpstreamlabs bot force-pushed the cupcicm/singleflight_metadata_requests branch 2 times, most recently from 5f60a69 to e224461 Compare July 18, 2025 13:15
@cupcicm
Contributor Author

cupcicm commented Jul 18, 2025

Thank you for your very thoughtful and very precise review @puellanivis
I have (I think) addressed all your comments except the naming one.

One possibly controversial change I made is the last commit, where I used the lower-level function you made me write (refreshOrQueue) in the test. This resolves the race condition of "this request has been successfully enqueued but I haven't waited for the result to become available yet". If we allow ourselves to test this primitive (rather than the full Refresh()), we can test with no time.Sleep. If we don't, it's complicated (I think it's impossible, because there is no point where we can assert that we have enqueued the request without having to wait for the result).

About the naming of the configuration parameter, I wanted to discuss it with you. WDYT about

  1. config.Metadata.BatchRequests
  2. config.Metadata.DeduplicateRequests

? I personally liked "single flight", since I think it captures my idea of piggybacking on the previous refresh, although it's a bit more complicated than usual single-flighting since we need to remember the topics.

@puellanivis
Collaborator

I think it’s ok to use the more internal refreshOrQueue to remove race conditions, as long as the exported behavior relying upon that internal function is reasonably transparent. (Simple enough that we can just look at the code and see if it is valid or not.)

Unsure which new configuration parameter would be appropriate. config.Metadata.SingleFlight might not even be so bad, since my concern was mostly on just knowing that it applied to metadata requests only. 🤔 I probably would prefer less “deduplicate” as we are avoiding duplicate metadata refreshes, but we’re also merging in all outstanding topics together, not just removing duplicate ones. So maybe something to mean “batching” is not such a bad idea.

Contributor Author

@cupcicm cupcicm left a comment

@puellanivis Would you prefer if the feature is called "config.Metadata.DeduplicateRequests" ?

metadata.go Outdated
}
if len(next.topics) > 0 {
r.AddTopics(next.topics)
next.topics = next.topics[:0]
Contributor Author

OK, sorry. I am adding a clear() function to make it clearer what's happening.

@dnwe
Collaborator

dnwe commented Jul 21, 2025

@puellanivis Would you prefer if the feature is called "config.Metadata.DeduplicateRequests" ?

@cupcicm I think SingleFlight is fine, as mentioned it is already grouped under Metadata and conveys “Use a single merged metadata request per flight (true) or parallel requests (false)” and is understood as evocative of https://pkg.go.dev/golang.org/x/sync/singleflight

After some testing I will probably flip the default to true before cutting the next release so the config would just remain as an opt-out anyway – if we even choose to offer it at all

@airlock-warpstreamlabs airlock-warpstreamlabs bot force-pushed the cupcicm/singleflight_metadata_requests branch from edfcadb to 716d30d Compare July 22, 2025 09:49
@cupcicm
Contributor Author

cupcicm commented Jul 22, 2025

OK. @puellanivis / @dnwe I have pushed an update that's exactly the same but with the signed commit, sorry.

I think we are good then? I believe I have addressed all your comments? Please tell me if that's not the case.

@epot
Contributor

epot commented Jul 28, 2025

@puellanivis / @dnwe what are we missing to be able to merge this? Thanks!

@dnwe
Collaborator

dnwe commented Jul 28, 2025

@epot nothing blocking, just been on holiday for most of July. I intend to merge this week and then do some post-merge testing with the main branch at scale

@epot
Contributor

epot commented Jul 28, 2025

Ok great, thanks for the update!

Collaborator

@puellanivis puellanivis left a comment

Except for two minor concerns in the testing, everything else is just polishing.

PS: This means, once the test concerns are addressed, there’s no need to wait on any rereview.

@epot
Contributor

epot commented Jul 28, 2025

@puellanivis, I tried to address all your comments. Manu asked me to take over his PR while he was on holidays, but I am not necessarily super familiar with everything he did so I hope I didn't mess anything up. Could you take another look please?

Collaborator

@puellanivis puellanivis left a comment

Just two things.

@epot
Contributor

epot commented Jul 29, 2025

@puellanivis I pushed again, let me know!

@puellanivis
Collaborator

OK, I cannot see anything to comment about. 👍

@epot
Contributor

epot commented Jul 29, 2025

great! so what happens now?

@dnwe
Collaborator

dnwe commented Jul 29, 2025

Please can you fix the DCO signoff?

cupcicm added 5 commits July 30, 2025 09:04
And a single flight refresher implementation that fixes this
issue: IBM#3224

Signed-off-by: Manu C <[email protected]>
Signed-off-by: epot <[email protected]>
It defaults to false, and when you set it to true, sarama
will use a single flight refresher to avoid sending multiple
metadata requests at the same time.

This fixes this issue: IBM#3224

Signed-off-by: Manu C <[email protected]>
Signed-off-by: epot <[email protected]>
1. remove var =
2. use assert in goroutines
3. unexport a bunch of fields
4. some other minor things.

Signed-off-by: Manu C <[email protected]>
Signed-off-by: epot <[email protected]>
Rather than the Refresh method. It's a lower level tests, but it
allows us to write a precise test with no time.Sleep.

Signed-off-by: Manu C <[email protected]>
Signed-off-by: epot <[email protected]>
Signed-off-by: Manu C <[email protected]>
Signed-off-by: epot <[email protected]>
epot added 2 commits July 30, 2025 09:04
@airlock-warpstreamlabs airlock-warpstreamlabs bot force-pushed the cupcicm/singleflight_metadata_requests branch from a16a5d6 to 4b96b6c Compare July 30, 2025 07:04
@epot
Contributor

epot commented Jul 30, 2025

Just did it @dnwe !

@dnwe dnwe added the feat label Jul 30, 2025
@epot
Contributor

epot commented Jul 31, 2025

I noticed you triggered more tests and one is failing. Should I look into it or is it known as flaky?

@dnwe
Collaborator

dnwe commented Jul 31, 2025

Triggered a re-run to see

@epot
Contributor

epot commented Jul 31, 2025

Looks good now :)

@dnwe dnwe changed the title from "Adds a component that knows how to run a single concurrent metadata refresh" to "feat: option to group metadata refreshes so only one is in-flight at a time" Jul 31, 2025
Collaborator

@dnwe dnwe left a comment

Thanks to all involved in the debugging, fix and review of this feature ❤️

Changes LGTM

@dnwe dnwe merged commit e566ac6 into IBM:main Jul 31, 2025
21 of 22 checks passed
@epot
Contributor

epot commented Jul 31, 2025

Great! Thanks :) Do you know when you plan to release it? Is there some place I can subscribe to know when it happens?

dnwe added a commit that referenced this pull request Aug 6, 2025
Enable #3225 by default (retaining the
config option to permit an opt-out)

Signed-off-by: Dominic Evans <[email protected]>
@epot
Contributor

epot commented Aug 12, 2025

@dnwe sorry for the ping, did you see my message above? I am curious when this is going to be released. Thanks!

@dnwe
Collaborator

dnwe commented Aug 12, 2025

@epot no worries, yes it will be released soon, should be this week – with this and the ApiVersions adherence, there's a reasonable amount of change included so I have a few different teams doing some pre-release testing before I cut the tag

@dnwe
Collaborator

dnwe commented Aug 12, 2025

Note also that you can go get github.com/IBM/sarama@main at any time to use the latest main branch version in your app's go.mod/go.sum so you can start using this level of code right away too

@epot
Contributor

epot commented Aug 12, 2025

Yes, we are already doing that, but a bunch of customers were asking us when it would be released. Thanks a lot for the update!

Development

Successfully merging this pull request may close these issues.

Metadata request burst

5 participants