Skip to content

Commit 627d39a

Browse files
committed
kgo: fix / improve handling deleted topics while regex consuming
* The topic name was never actually saved on an internal struct, so it was impossible to stop consuming a regex-consumed topic.
* If all topics were deleted, the conditional (len(latest)) never fired, so the deleted topics could not be purged internally.
* We now avoid spam-reloading metadata if consuming runs into UnknownTopicOrPartition -- we still reload, just not with an immediate trigger.

Closes #523.
1 parent b13e4c4 commit 627d39a

File tree

5 files changed

+101
-24
lines changed

5 files changed

+101
-24
lines changed

pkg/kgo/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ func (cl *Client) OptValues(opt any) []any {
281281
return []any{cfg.hooks}
282282
case namefn(ConcurrentTransactionsBackoff):
283283
return []any{cfg.txnBackoff}
284+
case namefn(considerMissingTopicDeletedAfter):
285+
return []any{cfg.missingTopicDelete}
284286

285287
case namefn(DefaultProduceTopic):
286288
return []any{cfg.defaultProduceTopic}

pkg/kgo/config.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type cfg struct {
121121
recordTimeout time.Duration
122122
manualFlushing bool
123123
txnBackoff time.Duration
124+
missingTopicDelete time.Duration
124125

125126
partitioner Partitioner
126127

@@ -480,8 +481,9 @@ func defaultCfg() cfg {
480481
maxBrokerWriteBytes: 100 << 20, // Kafka socket.request.max.bytes default is 100<<20
481482
maxBrokerReadBytes: 100 << 20,
482483

483-
metadataMaxAge: 5 * time.Minute,
484-
metadataMinAge: 5 * time.Second / 2,
484+
metadataMaxAge: 5 * time.Minute,
485+
metadataMinAge: 5 * time.Second,
486+
missingTopicDelete: 15 * time.Second,
485487

486488
//////////////
487489
// producer //
@@ -787,7 +789,7 @@ func MetadataMaxAge(age time.Duration) Opt {
787789
}
788790

789791
// MetadataMinAge sets the minimum time between metadata queries, overriding
790-
// the default 2.5s. You may want to raise or lower this to reduce the number of
792+
// the default 5s. You may want to raise or lower this to reduce the number of
791793
// metadata queries the client will make. Notably, if metadata detects an error
792794
// in any topic or partition, it triggers itself to update as soon as allowed.
793795
func MetadataMinAge(age time.Duration) Opt {
@@ -831,6 +833,13 @@ func ConcurrentTransactionsBackoff(backoff time.Duration) Opt {
831833
return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }}
832834
}
833835

836+
// considerMissingTopicDeletedAfter sets the amount of time a topic can be
837+
// missing from metadata responses _after_ loading it at least once before it
838+
// is considered deleted.
839+
func considerMissingTopicDeletedAfter(t time.Duration) Opt {
840+
return clientOpt{func(cfg *cfg) { cfg.missingTopicDelete = t }}
841+
}
842+
834843
////////////////////////////
835844
// PRODUCER CONFIGURATION //
836845
////////////////////////////

pkg/kgo/consumer_direct_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kgo
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sort"
78
"sync/atomic"
@@ -319,3 +320,48 @@ func TestPauseIssue489(t *testing.T) {
319320
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
320321
}
321322
}
323+
324+
func TestIssue523(t *testing.T) {
325+
t.Parallel()
326+
327+
t1, cleanup := tmpTopicPartitions(t, 1)
328+
defer cleanup()
329+
g1, gcleanup := tmpGroup(t)
330+
defer gcleanup()
331+
332+
cl, _ := NewClient(
333+
getSeedBrokers(),
334+
DefaultProduceTopic(t1),
335+
ConsumeTopics(".*"+t1+".*"),
336+
ConsumeRegex(),
337+
ConsumerGroup(g1),
338+
MetadataMinAge(100*time.Millisecond),
339+
FetchMaxWait(time.Second),
340+
KeepRetryableFetchErrors(),
341+
)
342+
defer cl.Close()
343+
344+
if err := cl.ProduceSync(context.Background(), StringRecord("foo")).FirstErr(); err != nil {
345+
t.Fatal(err)
346+
}
347+
348+
cl.PollFetches(context.Background())
349+
350+
cleanup() // delete the topic
351+
352+
start := time.Now()
353+
for {
354+
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
355+
fs := cl.PollFetches(ctx)
356+
cancel()
357+
if errors.Is(fs.Err0(), context.DeadlineExceeded) {
358+
break
359+
}
360+
if time.Since(start) > 40*time.Second { // missing topic delete is 15s by default
361+
t.Fatalf("still repeatedly requesting metadata after 20s")
362+
}
363+
if fs.Err0() != nil {
364+
time.Sleep(time.Second)
365+
}
366+
}
367+
}

pkg/kgo/metadata.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -341,13 +341,11 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
341341
}
342342
groupExternal.updateLatest(latest)
343343

344-
const maxMissTime = 15 * time.Second
345-
346344
// If we are consuming with regex and fetched all topics, the metadata
347345
// may have returned topics the consumer is not yet tracking. We ensure
348346
// that we will store the topics at the end of our metadata update.
349347
tpsConsumerLoad := tpsConsumer.load()
350-
if all && len(latest) > 0 {
348+
if all {
351349
allTopics := make([]string, 0, len(latest))
352350
for topic := range latest {
353351
allTopics = append(allTopics, topic)
@@ -356,16 +354,16 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
356354
defer tpsConsumer.storeData(tpsConsumerLoad)
357355

358356
// For regex consuming, if a topic is not returned in the
359-
// response and for at least maxMissTime from when we first
360-
// discovered it, we assume the topic has been deleted and
361-
// purge it. We allow for maxMissTime because (in testing
362-
// locally) Kafka can originally broadcast a newly created
363-
// topic exists and then fail to broadcast that info again for
364-
// a while.
357+
// response and for at least missingTopicDelete from when we
358+
// first discovered it, we assume the topic has been deleted
359+
// and purge it. We allow for missingTopicDelete because (in
360+
// testing locally) Kafka can originally broadcast a newly
361+
// created topic exists and then fail to broadcast that info
362+
// again for a while.
365363
var purgeTopics []string
366364
for topic, tps := range tpsConsumerLoad {
367365
if _, ok := latest[topic]; !ok {
368-
if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime {
366+
if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > cl.cfg.missingTopicDelete {
369367
purgeTopics = append(purgeTopics, td.topic)
370368
} else {
371369
retryWhy.add(topic, -1, errMissingTopic)
@@ -445,7 +443,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
445443
var bumpFail []string
446444
for _, tps := range missingProduceTopics {
447445
if all {
448-
if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > maxMissTime {
446+
if td := tps.load(); td.when != 0 && time.Since(time.Unix(td.when, 0)) > cl.cfg.missingTopicDelete {
449447
bumpFail = append(bumpFail, td.topic)
450448
} else {
451449
retryWhy.add(td.topic, -1, errMissingTopic)
@@ -683,6 +681,7 @@ func (cl *Client) mergeTopicPartitions(
683681

684682
lv.loadErr = r.loadErr
685683
lv.isInternal = r.isInternal
684+
lv.topic = r.topic
686685
if lv.when == 0 {
687686
lv.when = r.when
688687
}
@@ -876,6 +875,18 @@ type kerrOrString struct {
876875
s string
877876
}
878877

878+
func (m *multiUpdateWhy) isOnly(err error) bool {
879+
if m == nil {
880+
return false
881+
}
882+
for e := range *m {
883+
if !errors.Is(err, e.k) {
884+
return false
885+
}
886+
}
887+
return true
888+
}
889+
879890
func (m *multiUpdateWhy) add(t string, p int32, err error) {
880891
if err == nil {
881892
return

pkg/kgo/source.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -680,8 +680,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
680680
reloadOffsets listOrEpochLoads
681681
preferreds cursorPreferreds
682682
allErrsStripped bool
683-
updateMeta bool
684-
updateWhy string
683+
updateWhy multiUpdateWhy
685684
handled = make(chan struct{})
686685
)
687686

@@ -692,7 +691,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
692691
// Processing the response only needs the source's nodeID and client.
693692
go func() {
694693
defer close(handled)
695-
fetch, reloadOffsets, preferreds, allErrsStripped, updateMeta, updateWhy = s.handleReqResp(br, req, resp)
694+
fetch, reloadOffsets, preferreds, allErrsStripped, updateWhy = s.handleReqResp(br, req, resp)
696695
}()
697696

698697
select {
@@ -772,8 +771,21 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
772771
s.session.bumpEpoch(resp.SessionID)
773772
}
774773

775-
if updateMeta && !reloadOffsets.loadWithSessionNow(consumerSession, updateWhy) {
776-
s.cl.triggerUpdateMetadataNow(updateWhy)
774+
// If we have a reason to update (per-partition fetch errors), and the
775+
// reason is not just unknown topic or partition, then we immediately
776+
// update metadata. We avoid updating for unknown because it _likely_
777+
// means the topic does not exist and reloading is wasteful. We only
778+
// trigger a metadata update if we have no reload offsets. Having
779+
// reload offsets *always* triggers a metadata update.
780+
if updateWhy != nil {
781+
why := updateWhy.reason("fetch had inner topic errors")
782+
if !reloadOffsets.loadWithSessionNow(consumerSession, why) {
783+
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
784+
s.cl.triggerUpdateMetadata(false, why)
785+
} else {
786+
s.cl.triggerUpdateMetadataNow(why)
787+
}
788+
}
777789
}
778790

779791
if fetch.hasErrorsOrRecords() {
@@ -808,12 +820,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
808820
reloadOffsets listOrEpochLoads,
809821
preferreds cursorPreferreds,
810822
allErrsStripped bool,
811-
updateMeta bool,
812-
why string,
823+
updateWhy multiUpdateWhy,
813824
) {
814825
f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))}
815826
var (
816-
updateWhy multiUpdateWhy
817827
debugWhyStripped multiUpdateWhy
818828
numErrsStripped int
819829
kip320 = s.cl.supportsOffsetForLeaderEpoch()
@@ -878,7 +888,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
878888

879889
fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks)
880890
if fp.Err != nil {
881-
updateMeta = true
882891
updateWhy.add(topic, partition, fp.Err)
883892
}
884893

@@ -1024,7 +1033,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
10241033
s.cl.cfg.logger.Log(LogLevelDebug, "fetch stripped partitions", "why", debugWhyStripped.reason(""))
10251034
}
10261035

1027-
return f, reloadOffsets, preferreds, req.numOffsets == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors")
1036+
return f, reloadOffsets, preferreds, req.numOffsets == numErrsStripped, updateWhy
10281037
}
10291038

10301039
// processRespPartition processes all records in all potentially compressed

0 commit comments

Comments (0)