Skip to content

Commit 910e91d

Browse files
committed
kgo: add PreCommitTxnFnContext
Allows modifying TxnOffsetCommitRequest before it is issued -- particularly, to modify the metadata field. Closes #559.
1 parent 5f5c721 commit 910e91d

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

pkg/kgo/consumer_group.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2238,6 +2238,20 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest)
22382238
return context.WithValue(ctx, commitContextFn, fn)
22392239
}
22402240

2241+
type commitTxnContextFnT struct{}
2242+
2243+
var commitTxnContextFn commitTxnContextFnT
2244+
2245+
// PreCommitTxnFnContext attaches fn to the context through WithValue. Using
2246+
// the context while committing a transaction allows fn to be called just
2247+
// before the commit is issued. This can be used to modify the actual commit,
2248+
// such as by associating metadata with partitions (for transactions, the
2249+
// default internal metadata is the client's current member ID). If fn returns
2250+
// an error, the commit is not attempted.
2251+
func PreCommitTxnFnContext(ctx context.Context, fn func(*kmsg.TxnOffsetCommitRequest) error) context.Context {
2252+
return context.WithValue(ctx, commitTxnContextFn, fn)
2253+
}
2254+
22412255
// CommitRecords issues a synchronous offset commit for the offsets contained
22422256
// within rs. Retryable errors are retried up to the configured retry limit,
22432257
// and any unretryable error is returned.

pkg/kgo/txn.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,6 +1222,13 @@ func (g *groupConsumer) commitTxn(
12221222
req.Topics = append(req.Topics, reqTopic)
12231223
}
12241224

1225+
if fn, ok := ctx.Value(commitTxnContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
1226+
if err := fn(req); err != nil {
1227+
onDone(req, nil, err)
1228+
return
1229+
}
1230+
}
1231+
12251232
var resp *kmsg.TxnOffsetCommitResponse
12261233
var err error
12271234
if len(req.Topics) > 0 {

0 commit comments

Comments
 (0)