Skip to content

Commit 60b601a

Browse files
committed
kgo / changelog: last minute rename from CommitTxn to TxnCommit
This better mirrors TxnOffsetCommit, also in the same api call, and better mirrors how it's pretty universally Txn then Commit wherever you see it. Also fixes the context in GroupTransactSession.End to actually be passed all the way to the commit function, and documents where it can be used.
1 parent 036f599 commit 60b601a

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

pkg/kgo/consumer_group.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2289,18 +2289,19 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest)
22892289
return context.WithValue(ctx, commitContextFn, fn)
22902290
}
22912291

2292-
type commitTxnContextFnT struct{}
2292+
type txnCommitContextFnT struct{}
22932293

2294-
var commitTxnContextFn commitTxnContextFnT
2294+
var txnCommitContextFn txnCommitContextFnT
22952295

2296-
// PreCommitTxnFnContext attaches fn to the context through WithValue. Using
2296+
// PreTxnCommitFnContext attaches fn to the context through WithValue. Using
22972297
// the context while committing a transaction allows fn to be called just
22982298
// before the commit is issued. This can be used to modify the actual commit,
22992299
// such as by associating metadata with partitions (for transactions, the
23002300
// default internal metadata is the client's current member ID). If fn returns
2301-
// an error, the commit is not attempted.
2302-
func PreCommitTxnFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
2303-
return context.WithValue(ctx, commitTxnContextFn, fn)
2301+
// an error, the commit is not attempted. This context can be used in either
2302+
// GroupTransactSession.End or in Client.EndTransaction.
2303+
func PreTxnCommitFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
2304+
return context.WithValue(ctx, txnCommitContextFn, fn)
23042305
}
23052306

23062307
// CommitRecords issues a synchronous offset commit for the offsets contained

pkg/kgo/txn.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
287287
var commitErrs []string
288288

289289
committed := make(chan struct{})
290-
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
290+
g = s.cl.commitTransactionOffsets(ctx, postcommit,
291291
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
292292
defer close(committed)
293293
if err != nil {
@@ -1222,7 +1222,7 @@ func (g *groupConsumer) commitTxn(
12221222
req.Topics = append(req.Topics, reqTopic)
12231223
}
12241224

1225-
if fn, ok := ctx.Value(commitTxnContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
1225+
if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
12261226
if err := fn(req); err != nil {
12271227
onDone(req, nil, err)
12281228
return

0 commit comments

Comments
 (0)