client_test.go
Outdated
| var called atomic.Int32 | ||
|
|
||
| seedBroker.setHandler(func(req *request) (res encoderWithHeader) { | ||
| require.EqualValues(t, 3, req.body.key(), "this test sends only Metadata requests") |
There was a problem hiding this comment.
Do not use require here. The danger is that this handler could be running on a separate goroutine from the one running the test.
Note the condition on https://pkg.go.dev/[email protected]#T.Fatal
Also consider that using a regular if block is more than adequate:
    if req.body.key() != 3 /* can we get a name in here instead? */ {
        t.Error("this test should only send Metadata requests")
        return
    }
There was a problem hiding this comment.
Would you prefer that I used
    if xxx {
        t.Error()
    }
or assert.EqualValues (rather than require)?
There was a problem hiding this comment.
Usually the former is more direct and clear. In this case, the if cond { t.Error() } pattern can provide a more useful test failure message than just “3 != 3”.
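A minimal sketch of the if cond { t.Error() } pattern discussed above, runnable outside "go test". Everything here is an assumption for illustration: the reporter interface, the recorder stand-in for *testing.T, the checkKey helper, and the metadataKey constant (Kafka's Metadata API key is 3) are hypothetical names, not sarama identifiers.

```go
package main

import (
	"fmt"
	"sync"
)

// reporter is the subset of *testing.T this sketch needs; *testing.T satisfies it.
type reporter interface {
	Errorf(format string, args ...any)
}

// metadataKey is a hypothetical named constant for Kafka API key 3 (Metadata).
const metadataKey int16 = 3

// checkKey records a non-fatal failure and tells the caller whether to go on.
// Unlike require/FailNow, it is safe to call from any goroutine.
func checkKey(t reporter, key int16) bool {
	if key != metadataKey {
		t.Errorf("this test should only send Metadata requests, got API key %d", key)
		return false
	}
	return true
}

// recorder stands in for *testing.T so the sketch runs as a plain program.
type recorder struct {
	mu   sync.Mutex
	msgs []string
}

func (r *recorder) Errorf(format string, args ...any) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.msgs = append(r.msgs, fmt.Sprintf(format, args...))
}

func main() {
	rec := &recorder{}
	var wg sync.WaitGroup
	for _, key := range []int16{3, 0} {
		wg.Add(1)
		go func(k int16) { // handler-like goroutine, not the test goroutine
			defer wg.Done()
			checkKey(rec, k)
		}(key)
	}
	wg.Wait()
	fmt.Println(len(rec.msgs), "failure(s) recorded")
}
```

In a real test, t itself would be passed as the reporter, and the failure message names the unexpected key instead of printing a bare comparison.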
metadata.go
Outdated
|
|
||
| // Ongoing returns true if the refresh is ongoing. | ||
| // You need to hold the lock to call this method. | ||
| func (r *currentRefresh) Ongoing() bool { |
There was a problem hiding this comment.
This convenience function provides no convenience.
The currentRefresh lock must already be held, so this method must not be called from outside that scope, which makes exporting it not useful.
And since there is otherwise no locking, we could just export r.ongoing directly instead. Otherwise, we can already access r.ongoing ourselves.
There was a problem hiding this comment.
You're right, sorry about this.
metadata.go
Outdated
| m.next.Unlock() | ||
| current.AddTopics(topics) | ||
| var ch = current.Start() | ||
| current.Unlock() |
There was a problem hiding this comment.
Requiring each code path to properly unlock this mutex explicitly is error prone.
Perhaps, we should compose all these functions into a ch, needToRetry := m.current.refreshTopics(topics)
There was a problem hiding this comment.
❗ very nice !
I hadn't thought about this. Let me do this, it's going to be clearer 👍
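A rough sketch of what composing the lock, AddTopics, and Start steps into one method could look like. Only the name refreshTopics and its two return values come from the suggestion above; the struct fields and the meaning given to needToRetry here are assumptions, not the code that was merged.

```go
package main

import (
	"fmt"
	"sync"
)

// currentRefresh is a simplified, hypothetical version of the type under review.
type currentRefresh struct {
	mu      sync.Mutex
	ongoing bool
	topics  []string
	done    chan struct{}
}

// refreshTopics takes the lock once and releases it with defer, so no code
// path can forget to unlock. Assumed semantics: if a refresh is already
// ongoing, the caller gets that refresh's channel and is told to retry
// afterwards; otherwise the topics are recorded and a new refresh starts.
func (r *currentRefresh) refreshTopics(topics []string) (ch chan struct{}, needToRetry bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.ongoing {
		return r.done, true
	}
	r.topics = append(r.topics, topics...)
	r.ongoing = true
	r.done = make(chan struct{})
	return r.done, false
}

func main() {
	r := &currentRefresh{}
	_, retry := r.refreshTopics([]string{"topic1"})
	fmt.Println("first caller must retry:", retry)
	_, retry = r.refreshTopics([]string{"topic2"})
	fmt.Println("second caller must retry:", retry)
}
```

The caller no longer touches the mutex at all, which is the point of the suggestion: the locking discipline lives in one place.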
metadata_test.go
Outdated
| // This one and the next one will be batched together because the first | ||
| // call is still ongoing when they run. | ||
| // So we will issue a single refresh call, for topic2 and topic3. | ||
| time.Sleep(10 * time.Millisecond) |
There was a problem hiding this comment.
I feel like we could use channels to gate execution, guaranteeing that the steps are taken in the expected sequence, without needing to use an arbitrary wait time “just to be sure”.
    first := make(chan struct{})
    refresh := newSingleFlightRefresher(func(topics []string) error {
        select {
        case <-first: // already closed
        default:
            close(first)
        }
        // There should never be two refreshes that run concurrently,
        // so it's safe to change the map with no lock.
        for _, topic := range topics {
            called[topic]++
        }
        time.Sleep(10 * time.Millisecond) // unsure how to guarantee the second and third refresh have looped.
        return nil
    })
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        assert.NoError(t, refresh([]string{"topic1"}))
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        <-first
        assert.NoError(t, refresh([]string{"topic3", "topic4"}))
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        <-first
        assert.NoError(t, refresh([]string{"topic3", "topic4"}))
    }()
    wg.Wait()
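One possible variation on the gating idea, offered only as a sketch: the select/default close in the suggestion is safe because refreshes never run concurrently, but wrapping the close in sync.Once makes the gate safe to open from any goroutine. The gate type and its methods are hypothetical helpers, not part of the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a hypothetical helper: Open may be called any number of times,
// from any goroutine, and Wait blocks until the first Open.
type gate struct {
	once sync.Once
	ch   chan struct{}
}

func newGate() *gate { return &gate{ch: make(chan struct{})} }

// Open closes the channel exactly once, releasing every waiter.
func (g *gate) Open() { g.once.Do(func() { close(g.ch) }) }

// Wait blocks until the gate has been opened.
func (g *gate) Wait() { <-g.ch }

func main() {
	g := newGate()
	order := make(chan string, 3)

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Wait() // followers cannot proceed before the gate opens
			order <- "follower"
		}()
	}

	order <- "first"
	g.Open()
	g.Open() // a second Open is a no-op instead of a panic
	wg.Wait()
	close(order)

	for step := range order {
		fmt.Println(step)
	}
}
```

Because "first" is queued before the gate opens, it always precedes both followers, with no sleep involved.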
|
Thank you for your very thoughtful and very precise review, @puellanivis. One possibly controversial change I made is the last commit, where I used the lower-level function you made me write ( About the naming of the configuration parameter, I wanted to discuss it with you. WDYT about
? I personally liked "single flight" since I think it captures my idea of piggybacking on the previous refresh, although it's a bit more complicated than usual singleflighting since we need to remember the topics. |
|
I think it’s ok to use the more internal Unsure which new configuration parameter would be appropriate. |
cupcicm
left a comment
There was a problem hiding this comment.
@puellanivis Would you prefer it if the feature were called "config.Metadata.DeduplicateRequests"?
metadata.go
Outdated
| } | ||
| if len(next.topics) > 0 { | ||
| r.AddTopics(next.topics) | ||
| next.topics = next.topics[:0] |
There was a problem hiding this comment.
OK, sorry. I am adding a clear() function to make it clearer what's happening.
@cupcicm I think SingleFlight is fine. As mentioned, it is already grouped under Metadata, it conveys “Use a single merged metadata request per flight (true) or parallel requests (false)”, and it is understood as evocative of golang.org/x/sync/singleflight (https://pkg.go.dev/golang.org/x/sync/singleflight). After some testing I will probably flip the default to true before cutting the next release, so the config would just remain as an opt-out anyway, if we even choose to offer it at all. |
|
OK. @puellanivis / @dnwe I have pushed an update that's exactly the same but with the signed commit, sorry. I think we are good then? I believe I have addressed all your comments? Please tell me if that's not the case. |
|
@puellanivis / @dnwe what are we missing to be able to merge this? Thanks! |
|
@epot nothing blocking, just been on holiday for most of July. I intend to merge this week and then do some post-merge testing with the main branch at scale |
|
Ok great, thanks for the update! |
|
@puellanivis, I tried to address all your comments. Manu asked me to take over his PR while he was on holidays, but I am not necessarily super familiar with everything he did so I hope I didn't mess anything up. Could you take another look please? |
|
@puellanivis I pushed again, let me know! |
|
OK, I cannot see anything to comment about. 👍 |
|
great! so what happens now? |
|
Please can you fix the DCO signoff? |
And a single flight refresher implementation that fixes this issue: IBM#3224 Signed-off-by: Manu C <[email protected]> Signed-off-by: epot <[email protected]>
It defaults to false, and when you set it to true, sarama will use a single flight refresher to avoid sending multiple metadata requests at the same time. This fixes this issue: IBM#3224 Signed-off-by: Manu C <[email protected]> Signed-off-by: epot <[email protected]>
1. remove var = 2. use assert in goroutines 3. unexport a bunch of fields 4. some other minor things. Signed-off-by: Manu C <[email protected]> Signed-off-by: epot <[email protected]>
Rather than the Refresh method. It's a lower level tests, but it allows us to write a precise test with no time.Sleep. Signed-off-by: Manu C <[email protected]> Signed-off-by: epot <[email protected]>
Signed-off-by: Manu C <[email protected]> Signed-off-by: epot <[email protected]>
Signed-off-by: epot <[email protected]>
Signed-off-by: epot <[email protected]>
|
Just did it @dnwe ! |
|
I noticed you triggered more tests and one is failing. Should I look into it or is it known as flaky? |
|
Triggered a re-run to see |
|
Looks good now :) |
dnwe
left a comment
There was a problem hiding this comment.
Thanks to all involved in both the debug, fix and review of this feature ❤️
Changes LGTM
|
Great! Thanks :) Do you know when you plan to release it? Is there some place I can subscribe to know when it happens? |
Enable #3225 by default (retaining the config option to permit an opt-out) Signed-off-by: Dominic Evans <[email protected]>
|
@dnwe sorry for the ping, did you see my message above? I am curious when this is going to be released. Thanks! |
|
@epot no worries, yes it will be released soon, should be this week – with this and the ApiVersions adherence, there's a reasonable amount of change included so I have a few different teams doing some pre-release testing before I cut the tag |
|
Note also that you can |
|
Yes, we are already doing that, but a bunch of customers were asking us when it would be released. Thanks a lot for the update! |
If other goroutines ask for another metadata refresh, we are going to accumulate all requested topics into a slice and we will queue another metadata request immediately after the current one is done.
It is possible to enable this behavior by setting config.Metadata.SingleFlight = true in your configuration.
I confirmed that this fixes #3224 in our setup.
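To illustrate the batching described above, here is a simplified sketch, assuming a design where the goroutine that starts a refresh also runs the queued follow-up refresh. batchingRefresher, queued, and demo are hypothetical names; this is not sarama's implementation, and unlike the real feature the first caller here only returns once the queue is drained.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batchingRefresher is a hypothetical stand-in for a single-flight refresher.
type batchingRefresher struct {
	mu      sync.Mutex
	running bool
	pending []string     // topics requested while a refresh is in flight
	waiters []chan error // callers waiting for the next refresh
	refresh func(topics []string) error
}

// Refresh runs at most one refresh at a time. Calls that arrive during a
// refresh are merged into one follow-up request issued right after it.
func (b *batchingRefresher) Refresh(topics []string) error {
	b.mu.Lock()
	if b.running {
		done := make(chan error, 1)
		b.pending = append(b.pending, topics...)
		b.waiters = append(b.waiters, done)
		b.mu.Unlock()
		return <-done
	}
	b.running = true
	b.mu.Unlock()

	err := b.refresh(topics)
	for {
		b.mu.Lock()
		if len(b.waiters) == 0 {
			b.running = false
			b.mu.Unlock()
			return err
		}
		batch, waiters := b.pending, b.waiters
		b.pending, b.waiters = nil, nil
		b.mu.Unlock()

		batchErr := b.refresh(batch)
		for _, w := range waiters {
			w <- batchErr
		}
	}
}

// queued reports how many callers are waiting for the next refresh.
func (b *batchingRefresher) queued() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.waiters)
}

// demo holds the first refresh open until two more callers have queued,
// then returns the topic list of every refresh that was issued.
func demo() [][]string {
	var calls [][]string // only touched by the single in-flight refresh
	started, release := make(chan struct{}), make(chan struct{})
	b := &batchingRefresher{}
	b.refresh = func(topics []string) error {
		calls = append(calls, topics)
		if len(calls) == 1 {
			close(started)
			<-release
		}
		return nil
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		b.Refresh([]string{"topic1"})
	}()
	<-started
	for _, topic := range []string{"topic2", "topic3"} {
		wg.Add(1)
		go func(topic string) {
			defer wg.Done()
			b.Refresh([]string{topic})
		}(topic)
	}
	for b.queued() < 2 {
		time.Sleep(time.Millisecond) // poll a condition, not a fixed wait
	}
	close(release)
	wg.Wait()
	return calls
}

func main() {
	for i, topics := range demo() {
		fmt.Printf("refresh %d: %d topic(s)\n", i+1, len(topics))
	}
}
```

Three callers result in two refreshes: one for topic1, then one merged request for topic2 and topic3 (in either order).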