Skip to content

Commit eb6e3b5

Browse files
committed
kgo sasl reauth: be more pessimistic
AWS is saying that we have 12hr of auth lifetime, and then throwing authorization errors at ~11hr50min. We will be more pessimistic and use only 95 to 98% of the lifetime. This is similar to the Java client, which uses 85 to 95%. Closes #205.
1 parent 47eccba commit eb6e3b5

File tree

2 files changed

+56
-21
lines changed

2 files changed

+56
-21
lines changed

pkg/kgo/broker.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -877,33 +877,59 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
877877
}
878878

879879
if lifetimeMillis > 0 {
880-
// Lifetime: we could have written our request instantaenously,
881-
// the broker calculating our session lifetime, and then the
882-
// broker / network hung for a bit when writing. We
883-
// pessimistically assume this worst case and take off the
884-
// final request e2e latency x1.1 from the lifetime.
880+
// Lifetime is problematic. We need to be a bit pessimistic.
885881
//
886-
// If the latency is <2.5s, we also pessimistically assume that
887-
// things may take 2.5s in the future.
882+
// We want a lowerbound: we use 2s (arbitrary), but if 1.1x our
883+
// e2e sasl latency is more than 2s, we use the latency.
888884
//
889-
// We may make our lifetime <0; brokers should use longer
890-
// lifetimes, but some do not in all cases. If our lifetime is
891-
// <100ms, we sleep for 100ms just to ensure we do not
892-
// spin-loop reauthenticating *too* much.
893-
latency := int64(float64(time.Since(prereq).Milliseconds()) * 1.1)
894-
if latency < 2500 {
895-
latency = 2500
885+
// We do not want to reauthenticate too close to the lifetime
886+
// especially for larger lifetimes due to clock issues (#205).
887+
// We take 95% to 98% of the lifetime.
888+
minPessimismMillis := float64(2 * time.Second.Milliseconds())
889+
latencyMillis := 1.1 * float64(time.Since(prereq).Milliseconds())
890+
if latencyMillis > minPessimismMillis {
891+
minPessimismMillis = latencyMillis
896892
}
893+
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
897894

898-
useLifetime := lifetimeMillis - latency
895+
// Our minimum lifetime is always 2s (or latency, if larger).
896+
//
897+
// If rng is 0, we begin using max lifetime at 40s:
898+
//
899+
// maxLifetime = 40s - (40s * 0.05) = 38s
900+
// minLifetime = 40s - 2s = 38s
901+
//
902+
// If rng is 1, we begin using max lifetime at 25s:
903+
//
904+
// maxLifetime = 25s - (25s * 0.08) = 23s
905+
// minLifetime = 25s - 2s = 23s
906+
//
907+
// Every second after, we add between 0.05s or 0.08s to our
908+
// backoff:
909+
//
910+
// rng@0: maxLifetime = 41s - (41s * 0.05) = 38.95
911+
// rng@1: maxLifetime = 26s - (26s * 0.08) = 23.92
912+
//
913+
// At 12hr, we reauth ~24 to 28min before the lifetime.
914+
usePessimismMillis := maxPessimismMillis
915+
if minPessimismMillis > maxPessimismMillis {
916+
usePessimismMillis = minPessimismMillis
917+
}
918+
useLifetimeMillis := lifetimeMillis - int64(usePessimismMillis)
919+
920+
// If our lifetime is <0 (broker said our lifetime is less than
921+
// our client picked min), we sleep for 100ms and retry.
922+
// Brokers should give us longer lifetimes, but that may not
923+
// always happen (see #136). We sleep to avoid spin loop
924+
// reauthenticating.
899925
now := time.Now()
900-
cxn.expiry = now.Add(time.Duration(useLifetime) * time.Millisecond)
926+
cxn.expiry = now.Add(time.Duration(useLifetimeMillis) * time.Millisecond)
901927
cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
902-
if useLifetime < 0 {
903-
cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop",
928+
if useLifetimeMillis < 0 {
929+
cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop",
904930
"broker", logID(cxn.b.meta.NodeID),
905931
"session_lifetime", time.Duration(lifetimeMillis)*time.Millisecond,
906-
"latency_lower_bound", time.Duration(latency)*time.Millisecond,
932+
"latency_lower_bound", time.Duration(latencyMillis)*time.Millisecond,
907933
)
908934
time.Sleep(100 * time.Millisecond)
909935
}

pkg/kgo/client.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Client struct {
3636
ctx context.Context
3737
ctxCancel func()
3838

39-
rng *rand.Rand
39+
rng func() float64
4040

4141
brokersMu sync.RWMutex
4242
brokers []*broker // ordered by broker ID
@@ -201,7 +201,16 @@ func NewClient(opts ...Opt) (*Client, error) {
201201
cfg: cfg,
202202
ctx: ctx,
203203
ctxCancel: cancel,
204-
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
204+
205+
rng: func() func() float64 {
206+
var mu sync.Mutex
207+
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
208+
return func() float64 {
209+
mu.Lock()
210+
defer mu.Unlock()
211+
return rng.Float64()
212+
}
213+
}(),
205214

206215
controllerID: unknownControllerID,
207216

0 commit comments

Comments
 (0)