88 "sync"
99 "time"
1010
11- "github.com/twmb/franz-go/pkg/kerr"
1211 "github.com/twmb/franz-go/pkg/kmsg"
12+
13+ "github.com/twmb/franz-go/pkg/kerr"
1314)
1415
1516// TransactionEndTry is simply a named bool.
@@ -1060,7 +1061,13 @@ func (cl *Client) commitTransactionOffsets(
10601061 onDone (kmsg .NewPtrTxnOffsetCommitRequest (), kmsg .NewPtrTxnOffsetCommitResponse (), errNotGroup )
10611062 return nil
10621063 }
1063- if len (uncommitted ) == 0 {
1064+
1065+ req , err := g .prepareTxnOffsetCommit (ctx , uncommitted )
1066+ if err != nil {
1067+ onDone (req , kmsg .NewPtrTxnOffsetCommitResponse (), err )
1068+ return g
1069+ }
1070+ if len (req .Topics ) == 0 {
10641071 onDone (kmsg .NewPtrTxnOffsetCommitRequest (), kmsg .NewPtrTxnOffsetCommitResponse (), nil )
10651072 return g
10661073 }
@@ -1088,7 +1095,7 @@ func (cl *Client) commitTransactionOffsets(
10881095 g .mu .Lock ()
10891096 defer g .mu .Unlock ()
10901097
1091- g .commitTxn (ctx , uncommitted , unblockJoinSync )
1098+ g .commitTxn (ctx , req , unblockJoinSync )
10921099 return g
10931100}
10941101
@@ -1139,18 +1146,10 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
11391146// commitTxn is ALMOST EXACTLY THE SAME as commit, but changed for txn types
11401147// and we avoid updateCommitted. We avoid updating because we manually
11411148// SetOffsets when ending the transaction.
1142- func (g * groupConsumer ) commitTxn (
1143- ctx context.Context ,
1144- uncommitted map [string ]map [int32 ]EpochOffset ,
1145- onDone func (* kmsg.TxnOffsetCommitRequest , * kmsg.TxnOffsetCommitResponse , error ),
1146- ) {
1149+ func (g * groupConsumer ) commitTxn (ctx context.Context , req * kmsg.TxnOffsetCommitRequest , onDone func (* kmsg.TxnOffsetCommitRequest , * kmsg.TxnOffsetCommitResponse , error )) {
11471150 if onDone == nil { // note we must always call onDone
11481151 onDone = func (_ * kmsg.TxnOffsetCommitRequest , _ * kmsg.TxnOffsetCommitResponse , _ error ) {}
11491152 }
1150- if len (uncommitted ) == 0 { // only empty if called thru autocommit / default revoke
1151- onDone (kmsg .NewPtrTxnOffsetCommitRequest (), kmsg .NewPtrTxnOffsetCommitResponse (), nil )
1152- return
1153- }
11541153
11551154 if g .commitCancel != nil {
11561155 g .commitCancel () // cancel any prior commit
@@ -1169,22 +1168,6 @@ func (g *groupConsumer) commitTxn(
11691168 g .commitCancel = commitCancel
11701169 g .commitDone = commitDone
11711170
1172- // We issue this request even if the producer ID is failed; the request
1173- // will fail if it is.
1174- //
1175- // The id must have been set at least once by this point because of
1176- // addOffsetsToTxn.
1177- id , epoch , _ := g .cl .producerID ()
1178- req := kmsg .NewPtrTxnOffsetCommitRequest ()
1179- req .TransactionalID = * g .cl .cfg .txnID
1180- req .Group = g .cfg .group
1181- req .ProducerID = id
1182- req .ProducerEpoch = epoch
1183- memberID , generation := g .memberGen .load ()
1184- req .Generation = generation
1185- req .MemberID = memberID
1186- req .InstanceID = g .cfg .instanceID
1187-
11881171 if ctx .Done () != nil {
11891172 go func () {
11901173 select {
@@ -1207,28 +1190,7 @@ func (g *groupConsumer) commitTxn(
12071190 <- priorDone // wait for any prior request to finish
12081191 }
12091192 }
1210- g .cl .cfg .logger .Log (LogLevelDebug , "issuing txn offset commit" , "uncommitted" , uncommitted )
1211-
1212- for topic , partitions := range uncommitted {
1213- reqTopic := kmsg .NewTxnOffsetCommitRequestTopic ()
1214- reqTopic .Topic = topic
1215- for partition , eo := range partitions {
1216- reqPartition := kmsg .NewTxnOffsetCommitRequestTopicPartition ()
1217- reqPartition .Partition = partition
1218- reqPartition .Offset = eo .Offset
1219- reqPartition .LeaderEpoch = eo .Epoch
1220- reqPartition .Metadata = & req .MemberID
1221- reqTopic .Partitions = append (reqTopic .Partitions , reqPartition )
1222- }
1223- req .Topics = append (req .Topics , reqTopic )
1224- }
1225-
1226- if fn , ok := ctx .Value (txnCommitContextFn ).(func (* kmsg.TxnOffsetCommitRequest ) error ); ok {
1227- if err := fn (req ); err != nil {
1228- onDone (req , nil , err )
1229- return
1230- }
1231- }
1193+ g .cl .cfg .logger .Log (LogLevelDebug , "issuing txn offset commit" , "uncommitted" , req )
12321194
12331195 var resp * kmsg.TxnOffsetCommitResponse
12341196 var err error
@@ -1242,3 +1204,44 @@ func (g *groupConsumer) commitTxn(
12421204 onDone (req , resp , nil )
12431205 }()
12441206}
1207+
1208+ func (g * groupConsumer ) prepareTxnOffsetCommit (ctx context.Context , uncommitted map [string ]map [int32 ]EpochOffset ) (* kmsg.TxnOffsetCommitRequest , error ) {
1209+ req := kmsg .NewPtrTxnOffsetCommitRequest ()
1210+
1211+ // We're now generating the producerID before addOffsetsToTxn.
1212+ // We will not make this request until after addOffsetsToTxn, but it's possible to fail here due to a failed producerID.
1213+ id , epoch , err := g .cl .producerID ()
1214+ if err != nil {
1215+ return req , err
1216+ }
1217+
1218+ req .TransactionalID = * g .cl .cfg .txnID
1219+ req .Group = g .cfg .group
1220+ req .ProducerID = id
1221+ req .ProducerEpoch = epoch
1222+ memberID , generation := g .memberGen .load ()
1223+ req .Generation = generation
1224+ req .MemberID = memberID
1225+ req .InstanceID = g .cfg .instanceID
1226+
1227+ for topic , partitions := range uncommitted {
1228+ reqTopic := kmsg .NewTxnOffsetCommitRequestTopic ()
1229+ reqTopic .Topic = topic
1230+ for partition , eo := range partitions {
1231+ reqPartition := kmsg .NewTxnOffsetCommitRequestTopicPartition ()
1232+ reqPartition .Partition = partition
1233+ reqPartition .Offset = eo .Offset
1234+ reqPartition .LeaderEpoch = eo .Epoch
1235+ reqPartition .Metadata = & req .MemberID
1236+ reqTopic .Partitions = append (reqTopic .Partitions , reqPartition )
1237+ }
1238+ req .Topics = append (req .Topics , reqTopic )
1239+ }
1240+
1241+ if fn , ok := ctx .Value (txnCommitContextFn ).(func (* kmsg.TxnOffsetCommitRequest ) error ); ok {
1242+ if err := fn (req ); err != nil {
1243+ return req , err
1244+ }
1245+ }
1246+ return req , nil
1247+ }
0 commit comments