Skip to content

Commit 3ecaff2

Browse files
committed
kgo txn: handle UNKNOWN_SERVER_ERROR more widely
This error is technically not retryable, but in the context of transactions, at worst, we will eventually see a better more direct error. Redpanda returns this error a bit right now (although this is being reduced with 22.3), but we may as well be more resilient anyway.
1 parent eb6e3b5 commit 3ecaff2

File tree

1 file changed

+62
-15
lines changed

1 file changed

+62
-15
lines changed

pkg/kgo/txn.go

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -261,13 +261,29 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
261261
kip447 := false
262262
if wantCommit && !failed {
263263
isAbortableCommitErr := func(err error) bool {
264+
// ILLEGAL_GENERATION: rebalance began and completed
265+
// before we committed.
266+
//
267+
// REBALANCE_IN_PREGRESS: rebalance began, abort.
268+
//
269+
// COORDINATOR_NOT_AVAILABLE,
270+
// COORDINATOR_LOAD_IN_PROGRESS,
271+
// NOT_COORDINATOR: request failed too many times
272+
//
273+
// CONCURRENT_TRANSACTIONS: Kafka not harmonized,
274+
// we can just abort.
275+
//
276+
// UNKNOWN_SERVER_ERROR: technically should not happen,
277+
// but we can just abort. Redpanda returns this in
278+
// certain versions.
264279
switch {
265-
case errors.Is(err, kerr.IllegalGeneration), // rebalance begun & completed before we committed
266-
errors.Is(err, kerr.RebalanceInProgress), // in rebalance, abort & retry later
267-
errors.Is(err, kerr.CoordinatorNotAvailable), // req failed too many times (same for next two)
280+
case errors.Is(err, kerr.IllegalGeneration),
281+
errors.Is(err, kerr.RebalanceInProgress),
282+
errors.Is(err, kerr.CoordinatorNotAvailable),
268283
errors.Is(err, kerr.CoordinatorLoadInProgress),
269284
errors.Is(err, kerr.NotCoordinator),
270-
errors.Is(err, kerr.ConcurrentTransactions): // kafka not harmonized, we can just abort or retry / eventually abort
285+
errors.Is(err, kerr.ConcurrentTransactions),
286+
errors.Is(err, kerr.UnknownServerError):
271287
return true
272288
}
273289
return false
@@ -376,14 +392,34 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
376392
"will_try_commit", willTryCommit,
377393
)
378394

379-
retried := false // just in case, we use this to avoid looping
380-
retryUnattempted:
395+
// We have a few potential retryable errors from EndTransaction.
396+
// OperationNotAttempted will be returned at most once.
397+
//
398+
// UnknownServerError should not be returned, but some brokers do:
399+
// technically this is fatal, but there is no downside to retrying
400+
// (even retrying a commit) and seeing if we are successful or if we
401+
// get a better error.
402+
var tries int
403+
retry:
381404
endTxnErr := s.cl.EndTransaction(ctx, TransactionEndTry(willTryCommit))
382-
if errors.Is(endTxnErr, kerr.OperationNotAttempted) && !retried {
383-
willTryCommit = false
384-
retried = true
385-
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort")
386-
goto retryUnattempted
405+
tries++
406+
if endTxnErr != nil && tries < 10 {
407+
switch {
408+
case errors.Is(endTxnErr, kerr.OperationNotAttempted):
409+
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort")
410+
willTryCommit = false
411+
goto retry
412+
413+
case errors.Is(endTxnErr, kerr.UnknownServerError):
414+
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
415+
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
416+
select {
417+
case <-after.C: // context canceled; we will see when we retry
418+
case <-s.cl.ctx.Done():
419+
after.Stop()
420+
}
421+
goto retry
422+
}
387423
}
388424

389425
if !willTryCommit || endTxnErr != nil {
@@ -856,10 +892,15 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
856892
})
857893

858894
// If the returned error is still a Kafka error, this is fatal and we
859-
// need to fail our producer ID we loaded above. ConcurrentTransactions
860-
// could be
895+
// need to fail our producer ID we loaded above.
896+
//
897+
// UNKNOWN_SERVER_ERROR can theoretically be returned (not all brokers
898+
// do). This technically is fatal, but we do not really know whether it
899+
// is. We can just return this error and let the caller decide to
900+
// continue, if the caller does continue, we will try something and
901+
// eventually then receive our proper transactional error, if any.
861902
var ke *kerr.Error
862-
if errors.As(err, &ke) && !ke.Retriable {
903+
if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code {
863904
cl.failProducerID(id, epoch, err)
864905
}
865906

@@ -1054,8 +1095,14 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
10541095

10551096
// If the returned error is still a Kafka error, this is fatal and we
10561097
// need to fail our producer ID we created just above.
1098+
//
1099+
// We special case UNKNOWN_SERVER_ERROR, because we do not really know
1100+
// if this is fatal. If it is, we will catch it later on a better
1101+
// error. Some brokers send this when things fail internally, we can
1102+
// just abort our commit and see if things are still bad in
1103+
// EndTransaction.
10571104
var ke *kerr.Error
1058-
if errors.As(err, &ke) && !ke.Retriable {
1105+
if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code {
10591106
cl.failProducerID(id, epoch, err)
10601107
}
10611108

0 commit comments

Comments
 (0)