Skip to content

Commit fe5a660

Browse files
committed
kgo: add sharding for AddPartitionsToTxn for KIP-890
This is more of a forward-looking commit, in that kadm will eventually introduce support for this. We now basically handle v4 properly: even though KIP-890 dictates that v4 isn't meant to be sent by clients, it is indeed still necessary, and not sending it results in INVALID_TXN_STATE errors. This also properly adds the WriteTxnMarkers sharder to the switch, though nothing really should send that request, so that doesn't really fix any bugs. Kafka 3.6 frequently hits an NPE when handling produce requests (see KAFKA-15653), so tests may fail against 3.6 occasionally.
1 parent 3273585 commit fe5a660

File tree

1 file changed

+210
-9
lines changed

1 file changed

+210
-9
lines changed

pkg/kgo/client.go

Lines changed: 210 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
13251325
*kmsg.ListGroupsRequest, // key 16
13261326
*kmsg.DeleteRecordsRequest, // key 21
13271327
*kmsg.OffsetForLeaderEpochRequest, // key 23
1328+
*kmsg.AddPartitionsToTxnRequest, // key 24
1329+
*kmsg.WriteTxnMarkersRequest, // key 27
13281330
*kmsg.DescribeConfigsRequest, // key 32
13291331
*kmsg.AlterConfigsRequest, // key 33
13301332
*kmsg.AlterReplicaLogDirsRequest, // key 34
@@ -1775,8 +1777,6 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
17751777
// names, we delete no coordinator.
17761778
coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { return cl.broker(), nil }, coordinatorTypeTxn, "", req)
17771779
return shard(coordinator, req, resp, err)
1778-
case *kmsg.AddPartitionsToTxnRequest:
1779-
return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req)
17801780
case *kmsg.AddOffsetsToTxnRequest:
17811781
return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req)
17821782
case *kmsg.EndTxnRequest:
@@ -1840,10 +1840,6 @@ func (cl *Client) handleReqWithCoordinator(
18401840
// TXN
18411841
case *kmsg.InitProducerIDResponse:
18421842
code = t.ErrorCode
1843-
case *kmsg.AddPartitionsToTxnResponse:
1844-
if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 {
1845-
code = t.Topics[0].Partitions[0].ErrorCode
1846-
}
18471843
case *kmsg.AddOffsetsToTxnResponse:
18481844
code = t.ErrorCode
18491845
case *kmsg.EndTxnResponse:
@@ -2080,6 +2076,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
20802076
sharder = &deleteRecordsSharder{cl}
20812077
case *kmsg.OffsetForLeaderEpochRequest:
20822078
sharder = &offsetForLeaderEpochSharder{cl}
2079+
case *kmsg.AddPartitionsToTxnRequest:
2080+
sharder = &addPartitionsToTxnSharder{cl}
20832081
case *kmsg.WriteTxnMarkersRequest:
20842082
sharder = &writeTxnMarkersSharder{cl}
20852083
case *kmsg.DescribeConfigsRequest:
@@ -2767,9 +2765,16 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
27672765
broker: id,
27682766
})
27692767
}
2768+
} else if len(req.Groups) == 1 {
2769+
single := offsetFetchGroupToReq(req.RequireStable, req.Groups[0])
2770+
single.Groups = req.Groups
2771+
issues = append(issues, issueShard{
2772+
req: single,
2773+
broker: id,
2774+
})
27702775
} else {
27712776
issues = append(issues, issueShard{
2772-
req: &pinReq{Request: req, pinMin: true, min: 8},
2777+
req: &pinReq{Request: req, pinMin: len(req.Groups) > 1, min: 8},
27732778
broker: id,
27742779
})
27752780
}
@@ -2791,7 +2796,7 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
27912796
}
27922797

27932798
func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
2794-
req := kreq.(*kmsg.OffsetFetchRequest) // we always issue pinned requests
2799+
req := kreq.(*kmsg.OffsetFetchRequest)
27952800
resp := kresp.(*kmsg.OffsetFetchResponse)
27962801

27972802
switch len(resp.Groups) {
@@ -2876,9 +2881,16 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
28762881
for key := range uniq {
28772882
req.CoordinatorKeys = append(req.CoordinatorKeys, key)
28782883
}
2884+
if len(req.CoordinatorKeys) == 1 {
2885+
req.CoordinatorKey = req.CoordinatorKeys[0]
2886+
}
28792887

28802888
splitReq := errors.Is(lastErr, errBrokerTooOld)
28812889
if !splitReq {
2890+
// With only one key, we do not need to split nor pin this.
2891+
if len(req.CoordinatorKeys) <= 1 {
2892+
return []issueShard{{req: req, any: true}}, false, nil
2893+
}
28822894
return []issueShard{{
28832895
req: &pinReq{Request: req, pinMin: true, min: 4},
28842896
any: true,
@@ -2899,7 +2911,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
28992911
}
29002912

29012913
func (*findCoordinatorSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
2902-
req := kreq.(*kmsg.FindCoordinatorRequest) // we always issue pinned requests
2914+
req := kreq.(*kmsg.FindCoordinatorRequest)
29032915
resp := kresp.(*kmsg.FindCoordinatorResponse)
29042916

29052917
switch len(resp.Coordinators) {
@@ -3293,6 +3305,195 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
32933305
return merged, firstErr
32943306
}
32953307

3308+
// handle sharding AddPartitionsToTxn, where v4+ switched to batch requests
3309+
type addPartitionsToTxnSharder struct{ *Client }
3310+
3311+
func addPartitionsReqToTxn(req *kmsg.AddPartitionsToTxnRequest) {
3312+
t := kmsg.NewAddPartitionsToTxnRequestTransaction()
3313+
t.TransactionalID = req.TransactionalID
3314+
t.ProducerID = req.ProducerID
3315+
t.ProducerEpoch = req.ProducerEpoch
3316+
for i := range req.Topics {
3317+
rt := &req.Topics[i]
3318+
tt := kmsg.NewAddPartitionsToTxnRequestTransactionTopic()
3319+
tt.Topic = rt.Topic
3320+
tt.Partitions = rt.Partitions
3321+
t.Topics = append(t.Topics, tt)
3322+
}
3323+
req.Transactions = append(req.Transactions, t)
3324+
}
3325+
3326+
func addPartitionsTxnToReq(req *kmsg.AddPartitionsToTxnRequest) {
3327+
if len(req.Transactions) != 1 {
3328+
return
3329+
}
3330+
t0 := &req.Transactions[0]
3331+
req.TransactionalID = t0.TransactionalID
3332+
req.ProducerID = t0.ProducerID
3333+
req.ProducerEpoch = t0.ProducerEpoch
3334+
for _, tt := range t0.Topics {
3335+
rt := kmsg.NewAddPartitionsToTxnRequestTopic()
3336+
rt.Topic = tt.Topic
3337+
rt.Partitions = tt.Partitions
3338+
req.Topics = append(req.Topics, rt)
3339+
}
3340+
}
3341+
3342+
func addPartitionsTxnToResp(resp *kmsg.AddPartitionsToTxnResponse) {
3343+
if len(resp.Transactions) == 0 {
3344+
return
3345+
}
3346+
t0 := &resp.Transactions[0]
3347+
for _, tt := range t0.Topics {
3348+
rt := kmsg.NewAddPartitionsToTxnResponseTopic()
3349+
rt.Topic = tt.Topic
3350+
for _, tp := range tt.Partitions {
3351+
rp := kmsg.NewAddPartitionsToTxnResponseTopicPartition()
3352+
rp.Partition = tp.Partition
3353+
rp.ErrorCode = tp.ErrorCode
3354+
rt.Partitions = append(rt.Partitions, rp)
3355+
}
3356+
resp.Topics = append(resp.Topics, rt)
3357+
}
3358+
}
3359+
3360+
func (cl *addPartitionsToTxnSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
3361+
req := kreq.(*kmsg.AddPartitionsToTxnRequest)
3362+
3363+
if len(req.Transactions) == 0 {
3364+
addPartitionsReqToTxn(req)
3365+
}
3366+
txnIDs := make([]string, 0, len(req.Transactions))
3367+
for i := range req.Transactions {
3368+
txnIDs = append(txnIDs, req.Transactions[i].TransactionalID)
3369+
}
3370+
coordinators := cl.loadCoordinators(ctx, coordinatorTypeTxn, txnIDs...)
3371+
3372+
type unkerr struct {
3373+
err error
3374+
txn kmsg.AddPartitionsToTxnRequestTransaction
3375+
}
3376+
var (
3377+
brokerReqs = make(map[int32]*kmsg.AddPartitionsToTxnRequest)
3378+
kerrs = make(map[*kerr.Error][]kmsg.AddPartitionsToTxnRequestTransaction)
3379+
unkerrs []unkerr
3380+
)
3381+
3382+
newReq := func(txns ...kmsg.AddPartitionsToTxnRequestTransaction) *kmsg.AddPartitionsToTxnRequest {
3383+
req := kmsg.NewPtrAddPartitionsToTxnRequest()
3384+
req.Transactions = txns
3385+
addPartitionsTxnToReq(req)
3386+
return req
3387+
}
3388+
3389+
for _, txn := range req.Transactions {
3390+
berr := coordinators[txn.TransactionalID]
3391+
var ke *kerr.Error
3392+
switch {
3393+
case berr.err == nil:
3394+
brokerReq := brokerReqs[berr.b.meta.NodeID]
3395+
if brokerReq == nil {
3396+
brokerReq = newReq(txn)
3397+
brokerReqs[berr.b.meta.NodeID] = brokerReq
3398+
} else {
3399+
brokerReq.Transactions = append(brokerReq.Transactions, txn)
3400+
}
3401+
case errors.As(berr.err, &ke):
3402+
kerrs[ke] = append(kerrs[ke], txn)
3403+
default:
3404+
unkerrs = append(unkerrs, unkerr{berr.err, txn})
3405+
}
3406+
}
3407+
3408+
var issues []issueShard
3409+
for id, req := range brokerReqs {
3410+
issues = append(issues, issueShard{
3411+
req: req,
3412+
broker: id,
3413+
})
3414+
}
3415+
for _, unkerr := range unkerrs {
3416+
issues = append(issues, issueShard{
3417+
req: newReq(unkerr.txn),
3418+
err: unkerr.err,
3419+
})
3420+
}
3421+
for kerr, txns := range kerrs {
3422+
issues = append(issues, issueShard{
3423+
req: newReq(txns...),
3424+
err: kerr,
3425+
})
3426+
}
3427+
3428+
return issues, true, nil // reshardable to load correct coordinators
3429+
}
3430+
3431+
func (cl *addPartitionsToTxnSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
3432+
req := kreq.(*kmsg.AddPartitionsToTxnRequest)
3433+
resp := kresp.(*kmsg.AddPartitionsToTxnResponse)
3434+
3435+
// We default to the top level error, which is used in v4+. For v3
3436+
// (case 0), we use the per-partition error, which is the same for
3437+
// every partition on not_coordinator errors.
3438+
code := resp.ErrorCode
3439+
if code == 0 && len(resp.Transactions) == 0 {
3440+
// Convert v3 and prior to v4+
3441+
resptxn := kmsg.NewAddPartitionsToTxnResponseTransaction()
3442+
resptxn.TransactionalID = req.TransactionalID
3443+
for _, rt := range resp.Topics {
3444+
respt := kmsg.NewAddPartitionsToTxnResponseTransactionTopic()
3445+
respt.Topic = rt.Topic
3446+
for _, rp := range rt.Partitions {
3447+
respp := kmsg.NewAddPartitionsToTxnResponseTransactionTopicPartition()
3448+
respp.Partition = rp.Partition
3449+
respp.ErrorCode = rp.ErrorCode
3450+
code = rp.ErrorCode // v3 and prior has per-partition errors, not top level
3451+
respt.Partitions = append(respt.Partitions, respp)
3452+
}
3453+
resptxn.Topics = append(resptxn.Topics, respt)
3454+
}
3455+
resp.Transactions = append(resp.Transactions, resptxn)
3456+
} else {
3457+
// Convert v4 to v3 and prior: either we have a top level error
3458+
// code or we have at least one transaction.
3459+
//
3460+
// If the code is non-zero, we convert it to per-partition error
3461+
// codes; v3 does not have a top level err.
3462+
addPartitionsTxnToResp(resp)
3463+
if code != 0 {
3464+
for _, reqt := range req.Topics {
3465+
respt := kmsg.NewAddPartitionsToTxnResponseTopic()
3466+
respt.Topic = reqt.Topic
3467+
for _, reqp := range reqt.Partitions {
3468+
respp := kmsg.NewAddPartitionsToTxnResponseTopicPartition()
3469+
respp.Partition = reqp
3470+
respp.ErrorCode = resp.ErrorCode
3471+
respt.Partitions = append(respt.Partitions, respp)
3472+
}
3473+
resp.Topics = append(resp.Topics, respt)
3474+
}
3475+
}
3476+
}
3477+
if err := kerr.ErrorForCode(code); cl.maybeDeleteStaleCoordinator(req.TransactionalID, coordinatorTypeTxn, err) {
3478+
return err
3479+
}
3480+
return nil
3481+
}
3482+
3483+
func (*addPartitionsToTxnSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
3484+
merged := kmsg.NewPtrAddPartitionsToTxnResponse()
3485+
3486+
firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) {
3487+
resp := kresp.(*kmsg.AddPartitionsToTxnResponse)
3488+
merged.Version = resp.Version
3489+
merged.ThrottleMillis = resp.ThrottleMillis
3490+
merged.ErrorCode = resp.ErrorCode
3491+
merged.Transactions = append(merged.Transactions, resp.Transactions...)
3492+
})
3493+
addPartitionsTxnToResp(merged)
3494+
return merged, firstErr
3495+
}
3496+
32963497
// handle sharding WriteTxnMarkersRequest
32973498
type writeTxnMarkersSharder struct{ *Client }
32983499

0 commit comments

Comments
 (0)