@@ -1325,6 +1325,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
13251325 * kmsg.ListGroupsRequest , // key 16
13261326 * kmsg.DeleteRecordsRequest , // key 21
13271327 * kmsg.OffsetForLeaderEpochRequest , // key 23
1328+ * kmsg.AddPartitionsToTxnRequest , // key 24
1329+ * kmsg.WriteTxnMarkersRequest , // key 27
13281330 * kmsg.DescribeConfigsRequest , // key 32
13291331 * kmsg.AlterConfigsRequest , // key 33
13301332 * kmsg.AlterReplicaLogDirsRequest , // key 34
@@ -1775,8 +1777,6 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
17751777 // names, we delete no coordinator.
17761778 coordinator , resp , err := cl .handleReqWithCoordinator (ctx , func () (* broker , error ) { return cl .broker (), nil }, coordinatorTypeTxn , "" , req )
17771779 return shard (coordinator , req , resp , err )
1778- case * kmsg.AddPartitionsToTxnRequest :
1779- return cl .handleCoordinatorReqSimple (ctx , coordinatorTypeTxn , t .TransactionalID , req )
17801780 case * kmsg.AddOffsetsToTxnRequest :
17811781 return cl .handleCoordinatorReqSimple (ctx , coordinatorTypeTxn , t .TransactionalID , req )
17821782 case * kmsg.EndTxnRequest :
@@ -1840,10 +1840,6 @@ func (cl *Client) handleReqWithCoordinator(
18401840 // TXN
18411841 case * kmsg.InitProducerIDResponse :
18421842 code = t .ErrorCode
1843- case * kmsg.AddPartitionsToTxnResponse :
1844- if len (t .Topics ) > 0 && len (t .Topics [0 ].Partitions ) > 0 {
1845- code = t .Topics [0 ].Partitions [0 ].ErrorCode
1846- }
18471843 case * kmsg.AddOffsetsToTxnResponse :
18481844 code = t .ErrorCode
18491845 case * kmsg.EndTxnResponse :
@@ -2080,6 +2076,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
20802076 sharder = & deleteRecordsSharder {cl }
20812077 case * kmsg.OffsetForLeaderEpochRequest :
20822078 sharder = & offsetForLeaderEpochSharder {cl }
2079+ case * kmsg.AddPartitionsToTxnRequest :
2080+ sharder = & addPartitionsToTxnSharder {cl }
20832081 case * kmsg.WriteTxnMarkersRequest :
20842082 sharder = & writeTxnMarkersSharder {cl }
20852083 case * kmsg.DescribeConfigsRequest :
@@ -2767,9 +2765,16 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
27672765 broker : id ,
27682766 })
27692767 }
2768+ } else if len (req .Groups ) == 1 {
2769+ single := offsetFetchGroupToReq (req .RequireStable , req .Groups [0 ])
2770+ single .Groups = req .Groups
2771+ issues = append (issues , issueShard {
2772+ req : single ,
2773+ broker : id ,
2774+ })
27702775 } else {
27712776 issues = append (issues , issueShard {
2772- req : & pinReq {Request : req , pinMin : true , min : 8 },
2777+ req : & pinReq {Request : req , pinMin : len ( req . Groups ) > 1 , min : 8 },
27732778 broker : id ,
27742779 })
27752780 }
@@ -2791,7 +2796,7 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
27912796}
27922797
27932798func (cl * offsetFetchSharder ) onResp (kreq kmsg.Request , kresp kmsg.Response ) error {
2794- req := kreq .(* kmsg.OffsetFetchRequest ) // we always issue pinned requests
2799+ req := kreq .(* kmsg.OffsetFetchRequest )
27952800 resp := kresp .(* kmsg.OffsetFetchResponse )
27962801
27972802 switch len (resp .Groups ) {
@@ -2876,9 +2881,16 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
28762881 for key := range uniq {
28772882 req .CoordinatorKeys = append (req .CoordinatorKeys , key )
28782883 }
2884+ if len (req .CoordinatorKeys ) == 1 {
2885+ req .CoordinatorKey = req .CoordinatorKeys [0 ]
2886+ }
28792887
28802888 splitReq := errors .Is (lastErr , errBrokerTooOld )
28812889 if ! splitReq {
2890+ // With only one key, we do not need to split nor pin this.
2891+ if len (req .CoordinatorKeys ) <= 1 {
2892+ return []issueShard {{req : req , any : true }}, false , nil
2893+ }
28822894 return []issueShard {{
28832895 req : & pinReq {Request : req , pinMin : true , min : 4 },
28842896 any : true ,
@@ -2899,7 +2911,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
28992911}
29002912
29012913func (* findCoordinatorSharder ) onResp (kreq kmsg.Request , kresp kmsg.Response ) error {
2902- req := kreq .(* kmsg.FindCoordinatorRequest ) // we always issue pinned requests
2914+ req := kreq .(* kmsg.FindCoordinatorRequest )
29032915 resp := kresp .(* kmsg.FindCoordinatorResponse )
29042916
29052917 switch len (resp .Coordinators ) {
@@ -3293,6 +3305,195 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
32933305 return merged , firstErr
32943306}
32953307
3308+ // handle sharding AddPartitionsToTxn, where v4+ switched to batch requests
3309+ type addPartitionsToTxnSharder struct { * Client } // embeds *Client for coordinator lookups
3310+
// addPartitionsReqToTxn converts a v3-and-prior style request (top-level
// TransactionalID/ProducerID/ProducerEpoch/Topics fields) into the v4+
// batched form by appending a single Transactions element built from the
// top-level fields. The top-level fields are left in place.
3311+ func addPartitionsReqToTxn (req * kmsg.AddPartitionsToTxnRequest ) {
3312+ t := kmsg .NewAddPartitionsToTxnRequestTransaction ()
3313+ t .TransactionalID = req .TransactionalID
3314+ t .ProducerID = req .ProducerID
3315+ t .ProducerEpoch = req .ProducerEpoch
3316+ for i := range req .Topics { // copy each top-level topic into the transaction
3317+ rt := & req .Topics [i ]
3318+ tt := kmsg .NewAddPartitionsToTxnRequestTransactionTopic ()
3319+ tt .Topic = rt .Topic
3320+ tt .Partitions = rt .Partitions // NOTE: shares the partitions slice, no copy
3321+ t .Topics = append (t .Topics , tt )
3322+ }
3323+ req .Transactions = append (req .Transactions , t )
3324+ }
3325+
// addPartitionsTxnToReq is the inverse of addPartitionsReqToTxn: it copies a
// single batched Transactions element back into the v3-and-prior top-level
// request fields so the request can be issued to older brokers. It is a
// no-op unless there is exactly one transaction in the batch.
3326+ func addPartitionsTxnToReq (req * kmsg.AddPartitionsToTxnRequest ) {
3327+ if len (req .Transactions ) != 1 { // only a single txn maps onto the v3 shape
3328+ return
3329+ }
3330+ t0 := & req .Transactions [0 ]
3331+ req .TransactionalID = t0 .TransactionalID
3332+ req .ProducerID = t0 .ProducerID
3333+ req .ProducerEpoch = t0 .ProducerEpoch
3334+ for _ , tt := range t0 .Topics { // mirror the txn's topics at the top level
3335+ rt := kmsg .NewAddPartitionsToTxnRequestTopic ()
3336+ rt .Topic = tt .Topic
3337+ rt .Partitions = tt .Partitions // NOTE: shares the partitions slice, no copy
3338+ req .Topics = append (req .Topics , rt )
3339+ }
3340+ }
3341+
// addPartitionsTxnToResp mirrors the first batched transaction of a v4+
// response into the v3-and-prior top-level Topics field, so callers that
// issued an old-style request can read the old-style response shape.
// Only Transactions[0] is mirrored; no-op if there are no transactions.
3342+ func addPartitionsTxnToResp (resp * kmsg.AddPartitionsToTxnResponse ) {
3343+ if len (resp .Transactions ) == 0 {
3344+ return
3345+ }
3346+ t0 := & resp .Transactions [0 ] // only the first transaction is surfaced
3347+ for _ , tt := range t0 .Topics {
3348+ rt := kmsg .NewAddPartitionsToTxnResponseTopic ()
3349+ rt .Topic = tt .Topic
3350+ for _ , tp := range tt .Partitions { // copy per-partition error codes
3351+ rp := kmsg .NewAddPartitionsToTxnResponseTopicPartition ()
3352+ rp .Partition = tp .Partition
3353+ rp .ErrorCode = tp .ErrorCode
3354+ rt .Partitions = append (rt .Partitions , rp )
3355+ }
3356+ resp .Topics = append (resp .Topics , rt )
3357+ }
3358+ }
3359+
// shard splits an AddPartitionsToTxnRequest by transactional ID, grouping
// transactions that share a coordinator broker into one request per broker.
// Old-style (v3-) requests are first normalized into the batched form.
// Transactions whose coordinator lookup failed are issued as error shards.
3360+ func (cl * addPartitionsToTxnSharder ) shard (ctx context.Context , kreq kmsg.Request , _ error ) ([]issueShard , bool , error ) {
3361+ req := kreq .(* kmsg.AddPartitionsToTxnRequest )
3362+ 
3363+ if len (req .Transactions ) == 0 { // v3-and-prior shape: lift into a batch
3364+ addPartitionsReqToTxn (req )
3365+ }
3366+ txnIDs := make ([]string , 0 , len (req .Transactions ))
3367+ for i := range req .Transactions {
3368+ txnIDs = append (txnIDs , req .Transactions [i ].TransactionalID )
3369+ }
// Resolve the txn coordinator for every transactional ID at once.
3370+ coordinators := cl .loadCoordinators (ctx , coordinatorTypeTxn , txnIDs ... )
3371+ 
// unkerr pairs a non-kerr (unknown) lookup error with its transaction.
3372+ type unkerr struct {
3373+ err error
3374+ txn kmsg.AddPartitionsToTxnRequestTransaction
3375+ }
3376+ var (
3377+ brokerReqs = make (map [int32 ]* kmsg.AddPartitionsToTxnRequest ) // broker ID -> grouped request
3378+ kerrs = make (map [* kerr.Error ][]kmsg.AddPartitionsToTxnRequestTransaction ) // kerr -> txns sharing it
3379+ unkerrs []unkerr
3380+ )
3381+ 
// newReq builds a request for the given transactions; addPartitionsTxnToReq
// also fills the old top-level fields when there is exactly one txn, so the
// request remains usable against brokers that only speak v3 and prior.
3382+ newReq := func (txns ... kmsg.AddPartitionsToTxnRequestTransaction ) * kmsg.AddPartitionsToTxnRequest {
3383+ req := kmsg .NewPtrAddPartitionsToTxnRequest ()
3384+ req .Transactions = txns
3385+ addPartitionsTxnToReq (req )
3386+ return req
3387+ }
3388+ 
// Partition every transaction by its coordinator lookup result.
3389+ for _ , txn := range req .Transactions {
3390+ berr := coordinators [txn .TransactionalID ]
3391+ var ke * kerr.Error
3392+ switch {
3393+ case berr .err == nil : // coordinator known: group by broker node ID
3394+ brokerReq := brokerReqs [berr .b .meta .NodeID ]
3395+ if brokerReq == nil {
3396+ brokerReq = newReq (txn )
3397+ brokerReqs [berr .b .meta .NodeID ] = brokerReq
3398+ } else {
3399+ brokerReq .Transactions = append (brokerReq .Transactions , txn )
3400+ }
3401+ case errors .As (berr .err , & ke ): // Kafka-typed error: batch txns per error
3402+ kerrs [ke ] = append (kerrs [ke ], txn )
3403+ default : // unknown error: one shard per txn
3404+ unkerrs = append (unkerrs , unkerr {berr .err , txn })
3405+ }
3406+ }
3407+ 
3408+ var issues []issueShard
3409+ for id , req := range brokerReqs { // issue one grouped request per coordinator
3410+ issues = append (issues , issueShard {
3411+ req : req ,
3412+ broker : id ,
3413+ })
3414+ }
3415+ for _ , unkerr := range unkerrs { // error shards: carry the lookup failure
3416+ issues = append (issues , issueShard {
3417+ req : newReq (unkerr .txn ),
3418+ err : unkerr .err ,
3419+ })
3420+ }
// NOTE(review): the loop variable shadows the kerr package here; harmless
// in this scope but worth renaming for clarity.
3421+ for kerr , txns := range kerrs {
3422+ issues = append (issues , issueShard {
3423+ req : newReq (txns ... ),
3424+ err : kerr ,
3425+ })
3426+ }
3427+ 
3428+ return issues , true , nil // reshardable to load correct coordinators
3429+ }
3430+
// onResp normalizes a response so that both the v3-and-prior shape (top-level
// Topics) and the v4+ shape (Transactions) are populated, whichever version
// the broker actually answered with. It also extracts an error code and
// deletes a stale cached txn coordinator when that code warrants it.
3431+ func (cl * addPartitionsToTxnSharder ) onResp (kreq kmsg.Request , kresp kmsg.Response ) error {
3432+ req := kreq .(* kmsg.AddPartitionsToTxnRequest )
3433+ resp := kresp .(* kmsg.AddPartitionsToTxnResponse )
3434+ 
3435+ // We default to the top level error, which is used in v4+. For v3
3436+ // (case 0), we use the per-partition error, which is the same for
3437+ // every partition on not_coordinator errors.
3438+ code := resp .ErrorCode
3439+ if code == 0 && len (resp .Transactions ) == 0 {
3440+ // Convert v3 and prior to v4+
3441+ resptxn := kmsg .NewAddPartitionsToTxnResponseTransaction ()
3442+ resptxn .TransactionalID = req .TransactionalID
3443+ for _ , rt := range resp .Topics {
3444+ respt := kmsg .NewAddPartitionsToTxnResponseTransactionTopic ()
3445+ respt .Topic = rt .Topic
3446+ for _ , rp := range rt .Partitions {
3447+ respp := kmsg .NewAddPartitionsToTxnResponseTransactionTopicPartition ()
3448+ respp .Partition = rp .Partition
3449+ respp .ErrorCode = rp .ErrorCode
3450+ code = rp .ErrorCode // v3 and prior has per-partition errors, not top level
3451+ respt .Partitions = append (respt .Partitions , respp )
3452+ }
3453+ resptxn .Topics = append (resptxn .Topics , respt )
3454+ }
3455+ resp .Transactions = append (resp .Transactions , resptxn )
3456+ } else {
3457+ // Convert v4 to v3 and prior: either we have a top level error
3458+ // code or we have at least one transaction.
3459+ //
3460+ // If the code is non-zero, we convert it to per-partition error
3461+ // codes; v3 does not have a top level err.
3462+ addPartitionsTxnToResp (resp )
3463+ if code != 0 {
// Fabricate top-level per-partition errors from the request's
// topics, stamping the top-level error onto every partition.
3464+ for _ , reqt := range req .Topics {
3465+ respt := kmsg .NewAddPartitionsToTxnResponseTopic ()
3466+ respt .Topic = reqt .Topic
3467+ for _ , reqp := range reqt .Partitions {
3468+ respp := kmsg .NewAddPartitionsToTxnResponseTopicPartition ()
3469+ respp .Partition = reqp
3470+ respp .ErrorCode = resp .ErrorCode
3471+ respt .Partitions = append (respt .Partitions , respp )
3472+ }
3473+ resp .Topics = append (resp .Topics , respt )
3474+ }
3475+ }
3476+ }
// NOTE(review): req.TransactionalID is the v3 top-level field; for a v4
// batch with multiple transactions it may be empty — confirm the stale-
// coordinator deletion still targets the intended coordinator in that case.
3477+ if err := kerr .ErrorForCode (code ); cl .maybeDeleteStaleCoordinator (req .TransactionalID , coordinatorTypeTxn , err ) {
3478+ return err
3479+ }
3480+ return nil
3481+ }
3482+
// merge combines the per-broker response shards into one response by
// concatenating their Transactions, keeping the last shard's version,
// throttle, and top-level error code, then mirroring the (first)
// transaction back into the v3-style top-level Topics field.
3483+ func (* addPartitionsToTxnSharder ) merge (sresps []ResponseShard ) (kmsg.Response , error ) {
3484+ merged := kmsg .NewPtrAddPartitionsToTxnResponse ()
3485+ 
3486+ firstErr := firstErrMerger (sresps , func (kresp kmsg.Response ) {
3487+ resp := kresp .(* kmsg.AddPartitionsToTxnResponse )
3488+ merged .Version = resp .Version
3489+ merged .ThrottleMillis = resp .ThrottleMillis
3490+ merged .ErrorCode = resp .ErrorCode // last shard's top-level code wins
3491+ merged .Transactions = append (merged .Transactions , resp .Transactions ... )
3492+ })
3493+ addPartitionsTxnToResp (merged ) // repopulate the v3-style Topics view
3494+ return merged , firstErr
3495+ }
3496+
32963497// handle sharding WriteTxnMarkersRequest
32973498type writeTxnMarkersSharder struct { * Client } // embeds *Client for broker lookups
32983499
0 commit comments