Skip to content

fix: stuck on the batch with zero records with long gap#3150

Closed
Sergey-Belyakov wants to merge 1 commit intoIBM:mainfrom
Sergey-Belyakov:fix_received_batch_with_zero_records
Closed

fix: stuck on the batch with zero records with long gap#3150
Sergey-Belyakov wants to merge 1 commit intoIBM:mainfrom
Sergey-Belyakov:fix_received_batch_with_zero_records

Conversation

@Sergey-Belyakov
Copy link
Copy Markdown

Hi! We found a bug. If a repeated fetch with an increased offset has returned the void again, then fetching is constantly trying to request a batch with the same offset. It is always necessary to increase the offset in such cases.

@Sergey-Belyakov Sergey-Belyakov force-pushed the fix_received_batch_with_zero_records branch from 252a40b to b2a6cdf Compare April 18, 2025 11:59
@dnwe
Copy link
Copy Markdown
Collaborator

dnwe commented May 13, 2025

@Sergey-Belyakov thanks for the PR, this is interesting!

Will take a look to see what the Java client behaviour is in this scenario to ensure we align

@kgatv2
Copy link
Copy Markdown

kgatv2 commented Jul 4, 2025

This bug has hit us multiple times in production - can it receive a review? (PR works for us when we apply it locally)

@dnwe
Copy link
Copy Markdown
Collaborator

dnwe commented Jul 7, 2025

@kgatv2 what is your backend cluster? We just need to be clear why we think the FetchResponse that isn't end-of-topic, has no error and no records – to ensure we're not just papering over some corruption in a third-party broker implementation and it is a genuine Apache Kafka broker behaviour (perhaps due to aborted transactions?) that causes these responses.

I guess I'm nervous about tossing in offset++ without understanding why. Corroborating links to other client implementations of the behaviour in apache/kafka, confluentinc/librdkafka, twmb/franz-go would also help give weight to this PR too

@kgatv2
Copy link
Copy Markdown

kgatv2 commented Jul 7, 2025

We are hosting our kafka topics on Aiven - https://aiven.io/kafka. Using 4 partitions for the topic that got "corrupted".

@sterligov
Copy link
Copy Markdown
Contributor

Hi, sorry for my English. I deleted my previous comment because I wasn't completely sure, but now I think I'm sure of what I'm saying.

It looks like we've encountered issue KAFKA-5443

I think the problem is how exactly LastRecordsBatchOffset is defined.

Let's see how it is defined:

b.LastRecordsBatchOffset, err = records.recordsOffset()

Now let's on recordsOffset function

sarama/records.go

Lines 186 to 199 in aa1a2c5

func (r *Records) recordsOffset() (*int64, error) {
switch r.recordsType {
case unknownRecords:
return nil, nil
case legacyRecords:
return nil, nil
case defaultRecords:
if r.RecordBatch == nil {
return nil, nil
}
return &r.RecordBatch.FirstOffset, nil
}
return nil, fmt.Errorf("unknown records type: %v", r.recordsType)
}

That is, in LastRecordsBatchOffset we store first offset of the records batch.
But when requesting, in order not to hang, we should request the last offset of the records batch + 1, that is FirstOffset + LastOffsetDelta + 1

This is exactly the implementation in other libraries

Let's see how it's done in the franz-go library:

https://github.com/twmb/franz-go/blob/669b18eeee83408f04b848f0a5069dcd86153413/pkg/kgo/source.go#L1586

https://github.com/twmb/franz-go/blob/669b18eeee83408f04b848f0a5069dcd86153413/pkg/kgo/source.go#L1643-L1658

And now let see how it's done in the Java library:

https://github.com/apache/kafka/blob/4b607616c764e7654fac200cdd260e74ef29fca3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java#L194

https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java#L127-L131

I think the recordsOffset function should look something like this:

func (r *Records) recordsOffset() (*int64, error) {
	switch r.recordsType {
	case unknownRecords:
		return nil, nil
	case legacyRecords:
		return nil, nil
	case defaultRecords:
		if r.RecordBatch == nil {
			return nil, nil
		}
		lastOffset := r.RecordBatch.LastOffset()
		return &lastOffset, nil
	}
	return nil, fmt.Errorf("unknown records type: %v", r.recordsType)
}

I just got this bug on our cluster.

Screenshot 2025-07-12 at 10 44 52

FirstOffset = 1273344
LastOffsetDelta = 96

We keep sending the request with LastRecordsBatchOffset + 1 = 1273345 over and over, and we keep getting the same result every time. But if we modify the recordsOffset function the way I described above, we no longer get stuck at that part of the code.

@dnwe What do you think about this?

@dnwe
Copy link
Copy Markdown
Collaborator

dnwe commented Jul 12, 2025

@sterligov thanks this is good analysis. I think you’re probably right and the original fix attempted under #2057 was put in the wrong place and should have used your delta increment rather than a +1

Are you happy to draft a PR up with your proposed fix (removing the old #2057 one) so we can run it through the FV?

@sterligov
Copy link
Copy Markdown
Contributor

sterligov commented Jul 20, 2025

@dnwe Hi, I tried to solve this problem in PR 3221.
Please pay attention to it when you have time.

dnwe added a commit that referenced this pull request Jul 31, 2025
This pull request addresses the known issue described in
[KAFKA-5443](https://issues.apache.org/jira/browse/KAFKA-5443), which
was initially analyzed in
[#2053](#2053). A prior attempt to fix
it was made in [#2057](#2057), but the
fix was applied incorrectly and did not fully resolve the root cause.

I’ve provided a more detailed explanation of the underlying issue and
the reasoning behind this fix in [this
comment](#3150 (comment)).

---------

Signed-off-by: Sterligov Denis <[email protected]>
Signed-off-by: Dominic Evans <[email protected]>
Co-authored-by: Sterligov Denis <[email protected]>
Co-authored-by: Dominic Evans <[email protected]>
@dnwe
Copy link
Copy Markdown
Collaborator

dnwe commented Jul 31, 2025

closing in favour of merged #3221 – thanks for raising the issue!

@dnwe dnwe closed this Jul 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants