@@ -261,13 +261,29 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
261261 kip447 := false
262262 if wantCommit && ! failed {
263263 isAbortableCommitErr := func (err error ) bool {
264+ // ILLEGAL_GENERATION: rebalance began and completed
265+ // before we committed.
266+ //
267+ // REBALANCE_IN_PROGRESS: rebalance began, abort.
268+ //
269+ // COORDINATOR_NOT_AVAILABLE,
270+ // COORDINATOR_LOAD_IN_PROGRESS,
271+ // NOT_COORDINATOR: request failed too many times
272+ //
273+ // CONCURRENT_TRANSACTIONS: Kafka not harmonized,
274+ // we can just abort.
275+ //
276+ // UNKNOWN_SERVER_ERROR: technically should not happen,
277+ // but we can just abort. Redpanda returns this in
278+ // certain versions.
264279 switch {
265- case errors .Is (err , kerr .IllegalGeneration ), // rebalance begun & completed before we committed
266- errors .Is (err , kerr .RebalanceInProgress ), // in rebalance, abort & retry later
267- errors .Is (err , kerr .CoordinatorNotAvailable ), // req failed too many times (same for next two)
280+ case errors .Is (err , kerr .IllegalGeneration ),
281+ errors .Is (err , kerr .RebalanceInProgress ),
282+ errors .Is (err , kerr .CoordinatorNotAvailable ),
268283 errors .Is (err , kerr .CoordinatorLoadInProgress ),
269284 errors .Is (err , kerr .NotCoordinator ),
270- errors .Is (err , kerr .ConcurrentTransactions ): // kafka not harmonized, we can just abort or retry / eventually abort
285+ errors .Is (err , kerr .ConcurrentTransactions ),
286+ errors .Is (err , kerr .UnknownServerError ):
271287 return true
272288 }
273289 return false
@@ -376,14 +392,34 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
376392 "will_try_commit" , willTryCommit ,
377393 )
378394
379- retried := false // just in case, we use this to avoid looping
380- retryUnattempted:
395+ // We have a few potential retryable errors from EndTransaction.
396+ // OperationNotAttempted will be returned at most once.
397+ //
398+ // UnknownServerError should not be returned, but some brokers do:
399+ // technically this is fatal, but there is no downside to retrying
400+ // (even retrying a commit) and seeing if we are successful or if we
401+ // get a better error.
402+ var tries int
403+ retry:
381404 endTxnErr := s .cl .EndTransaction (ctx , TransactionEndTry (willTryCommit ))
382- if errors .Is (endTxnErr , kerr .OperationNotAttempted ) && ! retried {
383- willTryCommit = false
384- retried = true
385- s .cl .cfg .logger .Log (LogLevelInfo , "end transaction with commit not attempted; retrying as abort" )
386- goto retryUnattempted
405+ tries ++
406+ if endTxnErr != nil && tries < 10 {
407+ switch {
408+ case errors .Is (endTxnErr , kerr .OperationNotAttempted ):
409+ s .cl .cfg .logger .Log (LogLevelInfo , "end transaction with commit not attempted; retrying as abort" )
410+ willTryCommit = false
411+ goto retry
412+
413+ case errors .Is (endTxnErr , kerr .UnknownServerError ):
414+ s .cl .cfg .logger .Log (LogLevelInfo , "end transaction with commit unknown server error; retrying" )
415+ after := time .NewTimer (s .cl .cfg .retryBackoff (tries ))
416+ select {
417+ case <- after .C : // backoff elapsed; if s.cl.ctx is done instead, we will see when we retry
418+ case <- s .cl .ctx .Done ():
419+ after .Stop ()
420+ }
421+ goto retry
422+ }
387423 }
388424
389425 if ! willTryCommit || endTxnErr != nil {
@@ -856,10 +892,15 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
856892 })
857893
858894 // If the returned error is still a Kafka error, this is fatal and we
859- // need to fail our producer ID we loaded above. ConcurrentTransactions
860- // could be
895+ // need to fail our producer ID we loaded above.
896+ //
897+ // UNKNOWN_SERVER_ERROR can theoretically be returned (not all brokers
898+ // do). This technically is fatal, but we do not really know whether it
899+ // is. We can just return this error and let the caller decide to
900+ // continue. If the caller does continue, we will try something and
901+ // eventually receive our proper transactional error, if any.
861902 var ke * kerr.Error
862- if errors .As (err , & ke ) && ! ke .Retriable {
903+ if errors .As (err , & ke ) && ! ke .Retriable && ke . Code != kerr . UnknownServerError . Code {
863904 cl .failProducerID (id , epoch , err )
864905 }
865906
@@ -1054,8 +1095,14 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
10541095
10551096 // If the returned error is still a Kafka error, this is fatal and we
10561097 // need to fail our producer ID we created just above.
1098+ //
1099+ // We special case UNKNOWN_SERVER_ERROR, because we do not really know
1100+ // if this is fatal. If it is, we will catch it later on a better
1101+ // error. Some brokers send this when things fail internally; we can
1102+ // just abort our commit and see if things are still bad in
1103+ // EndTransaction.
10571104 var ke * kerr.Error
1058- if errors .As (err , & ke ) && ! ke .Retriable {
1105+ if errors .As (err , & ke ) && ! ke .Retriable && ke . Code != kerr . UnknownServerError . Code {
10591106 cl .failProducerID (id , epoch , err )
10601107 }
10611108
0 commit comments