Skip to content

Commit 93d7919

Browse files
committed
fix(sse): cancel active streams on user logout
Fixes #6029. SSE streaming endpoints (streamClusters in sse.go) ran inside SetBodyStreamWriter callbacks that blocked for up to sseOverallDeadline (~30s), so when a user logged out any in-flight SSE streams continued to emit cluster_data events until the deadline fired or the client disconnected. Mirrors the exec session cancellation pattern added in #6031 for /ws/exec sessions (#6024): - Add a per-user SSE session registry in sse.go with registerSSESession / unregisterSSESession / CancelUserSSEStreams, guarded by a plain sync.Mutex. - In streamClusters, capture the authenticated user ID before the deferred SetBodyStreamWriter callback runs (the fiber.Ctx may be reused by then) and register the stream context's cancel func in the registry, with a deferred unregister for normal stream end. - In auth.Logout, after CancelUserExecSessions, call CancelUserSSEStreams(claims.UserID) so logout tears down any live SSE streams the user had open. - Add sse_test.go with the four lifecycle tests mirroring exec_test.go: CancelsRegisteredContexts, OtherUserUnaffected, UnregisterSSESession_RemovesEntry, and NoSessions (no panic on empty map). streamDemoSSE is left unchanged — demo streams are instant, they do not block, so they need no cancellation path. Signed-off-by: Andrew Anderson <[email protected]>
1 parent b8524c0 commit 93d7919

3 files changed

Lines changed: 238 additions & 0 deletions

File tree

pkg/api/handlers/auth.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,13 @@ func (h *AuthHandler) Logout(c *fiber.Ctx) error {
587587
// unblocks the stream and tears the connection down promptly.
588588
if claims.UserID != uuid.Nil {
589589
CancelUserExecSessions(claims.UserID)
590+
// Cancel any active SSE streams for this user (#6029). SSE streams
591+
// run inside SetBodyStreamWriter callbacks that block for up to
592+
// sseOverallDeadline (~30s); without this, a logged-out user would
593+
// continue to receive cluster_data events until the deadline fires.
594+
// streamClusters registers each stream's cancel func in a per-user
595+
// registry on start; cancelling those funcs here ends the stream.
596+
CancelUserSSEStreams(claims.UserID)
590597
}
591598

592599
slog.Info("[Auth] token revoked, WS sessions closed", "user", claims.GitHubLogin, "jti", claims.ID)

pkg/api/handlers/sse.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"time"
1212

1313
"github.com/gofiber/fiber/v2"
14+
"github.com/google/uuid"
15+
"github.com/kubestellar/console/pkg/api/middleware"
1416
"github.com/kubestellar/console/pkg/k8s"
1517
)
1618

@@ -91,6 +93,91 @@ const sseCacheTTL = 15 * time.Second
9193
// to remove expired entries and prevent unbounded memory growth.
9294
const sseCacheEvictInterval = 30 * time.Second
9395

96+
// sseSessionRegistry tracks active SSE streams per user so that
97+
// CancelUserSSEStreams can tear them down on logout (#6029).
98+
//
99+
// SSE streams run inside c.Context().SetBodyStreamWriter callbacks that block
100+
// until either the client disconnects or sseOverallDeadline fires. Without a
101+
// per-user registry, a logged-out user's in-flight streams continue emitting
102+
// "cluster_data" events for up to ~30s because nothing actively cancels the
103+
// stream context. This registry mirrors the exec session registry in exec.go:
104+
// when a stream's context is created, its cancel func is recorded keyed by
105+
// userID; on logout, CancelUserSSEStreams runs every recorded cancel for that
106+
// user, which causes the SetBodyStreamWriter callback to exit promptly.
107+
//
108+
// A regular sync.Mutex is used (not RWMutex) because writes (add/remove on
109+
// stream start/end) and reads (CancelUserSSEStreams on logout) are both
110+
// infrequent and always short; an RWMutex would add complexity for no gain.
111+
var (
112+
sseSessionsMu sync.Mutex
113+
sseSessions = make(map[uuid.UUID]map[int64]context.CancelFunc)
114+
sseSessionSeq int64 // monotonic id generator, guarded by sseSessionsMu
115+
)
116+
117+
// registerSSESession records cancel under userID and returns the assigned
118+
// session id. The session id is used by unregisterSSESession to remove the
119+
// specific entry when the stream ends normally, so the map does not grow
120+
// unbounded across many streams by the same user.
121+
func registerSSESession(userID uuid.UUID, cancel context.CancelFunc) int64 {
122+
sseSessionsMu.Lock()
123+
defer sseSessionsMu.Unlock()
124+
sseSessionSeq++
125+
id := sseSessionSeq
126+
sessions, ok := sseSessions[userID]
127+
if !ok {
128+
sessions = make(map[int64]context.CancelFunc)
129+
sseSessions[userID] = sessions
130+
}
131+
sessions[id] = cancel
132+
return id
133+
}
134+
135+
// unregisterSSESession removes a single stream entry. Called from the SSE
136+
// handler's deferred cleanup on normal stream end so the registry stays
137+
// bounded by the number of concurrently live streams, not the total lifetime
138+
// count.
139+
func unregisterSSESession(userID uuid.UUID, id int64) {
140+
sseSessionsMu.Lock()
141+
defer sseSessionsMu.Unlock()
142+
sessions, ok := sseSessions[userID]
143+
if !ok {
144+
return
145+
}
146+
delete(sessions, id)
147+
if len(sessions) == 0 {
148+
delete(sseSessions, userID)
149+
}
150+
}
151+
152+
// CancelUserSSEStreams cancels every active SSE stream belonging to the given
153+
// user and clears the entries from the registry. Called from the auth Logout
154+
// handler after revoking the JWT so that any streaming endpoint the user had
155+
// open stops emitting events promptly (#6029). Safe to call with a userID
156+
// that has no live streams.
157+
func CancelUserSSEStreams(userID uuid.UUID) {
158+
sseSessionsMu.Lock()
159+
sessions, ok := sseSessions[userID]
160+
if !ok {
161+
sseSessionsMu.Unlock()
162+
return
163+
}
164+
// Take ownership of the cancel funcs under the lock, then release the
165+
// lock before invoking them. Calling cancel() itself is cheap but the
166+
// goroutines it unblocks may contend for other locks; holding
167+
// sseSessionsMu across those is unnecessary and risks deadlock.
168+
cancels := make([]context.CancelFunc, 0, len(sessions))
169+
for _, c := range sessions {
170+
cancels = append(cancels, c)
171+
}
172+
delete(sseSessions, userID)
173+
sseSessionsMu.Unlock()
174+
175+
for _, cancel := range cancels {
176+
cancel()
177+
}
178+
slog.Info("[SSE] cancelled SSE streams for user", "user", userID, "count", len(cancels))
179+
}
180+
94181
// SSE response cache — avoids re-fetching when the user navigates away and back.
95182
var (
96183
sseCache = map[string]*sseCacheEntry{}
@@ -207,6 +294,11 @@ func streamClusters(
207294
offline = filteredOffline
208295
}
209296

297+
// Capture the authenticated user ID before entering the deferred
298+
// SetBodyStreamWriter callback. The fiber.Ctx may be reused by the time
299+
// the callback runs, so c.Locals is not safe to read inside it (#6029).
300+
userID := middleware.GetUserID(c)
301+
210302
c.Set("Content-Type", "text/event-stream")
211303
c.Set("Cache-Control", "no-cache")
212304
c.Set("Connection", "keep-alive")
@@ -220,6 +312,16 @@ func streamClusters(
220312
streamCtx, streamCancel := context.WithTimeout(context.Background(), sseOverallDeadline)
221313
defer streamCancel()
222314

315+
// Register this stream's cancel with the per-user SSE session
316+
// registry so a later Logout call can tear the stream down promptly
317+
// instead of waiting for sseOverallDeadline (#6029). Only register
318+
// when we have a real userID — in dev/demo without a valid UserID
319+
// claim there is nothing to key on.
320+
if userID != uuid.Nil {
321+
sessionID := registerSSESession(userID, streamCancel)
322+
defer unregisterSSESession(userID, sessionID)
323+
}
324+
223325
var mu sync.Mutex
224326
totalClusters := len(healthy) + len(offline)
225327
completedClusters := 0

pkg/api/handlers/sse_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package handlers
22

33
import (
4+
"context"
45
"errors"
56
"io"
67
"net/http"
78
"strings"
9+
"sync/atomic"
810
"testing"
11+
"time"
912

13+
"github.com/google/uuid"
1014
"github.com/stretchr/testify/assert"
1115
"github.com/stretchr/testify/require"
1216
corev1 "k8s.io/api/core/v1"
@@ -174,3 +178,128 @@ func TestStreamClusters_EmitsClusterErrorOnFailure(t *testing.T) {
174178
assert.True(t, strings.Contains(body, "cluster_data"))
175179
assert.True(t, strings.Contains(body, "done"))
176180
}
181+
182+
// testSSECancelWait is how long tests wait for CancelUserSSEStreams to
183+
// propagate through the registered cancel funcs before asserting the context
184+
// is Done. The registry just calls cancel() synchronously, so in practice a
185+
// few milliseconds is enough; we use a generous budget to keep CI flake-free.
186+
const testSSECancelWait = 250 * time.Millisecond
187+
188+
// TestCancelUserSSEStreams_CancelsRegisteredContexts verifies the core
189+
// lifecycle invariant for #6029: a stream context registered for a user is
190+
// Done() after CancelUserSSEStreams runs for that user, and the registry no
191+
// longer holds a reference to it.
192+
func TestCancelUserSSEStreams_CancelsRegisteredContexts(t *testing.T) {
193+
userID := uuid.New()
194+
ctxA, cancelA := context.WithCancel(context.Background())
195+
t.Cleanup(cancelA)
196+
ctxB, cancelB := context.WithCancel(context.Background())
197+
t.Cleanup(cancelB)
198+
199+
idA := registerSSESession(userID, cancelA)
200+
idB := registerSSESession(userID, cancelB)
201+
require.NotEqual(t, idA, idB, "session ids must be unique within a user")
202+
203+
CancelUserSSEStreams(userID)
204+
205+
// Both contexts should see cancellation essentially immediately.
206+
select {
207+
case <-ctxA.Done():
208+
case <-time.After(testSSECancelWait):
209+
t.Fatalf("stream A context was not cancelled within %s", testSSECancelWait)
210+
}
211+
select {
212+
case <-ctxB.Done():
213+
case <-time.After(testSSECancelWait):
214+
t.Fatalf("stream B context was not cancelled within %s", testSSECancelWait)
215+
}
216+
217+
// The registry entry should be gone so it can't leak across users/logouts.
218+
sseSessionsMu.Lock()
219+
_, stillThere := sseSessions[userID]
220+
sseSessionsMu.Unlock()
221+
assert.False(t, stillThere, "registry entry for user should be cleared after cancellation")
222+
}
223+
224+
// TestCancelUserSSEStreams_OtherUserUnaffected confirms that cancelling one
225+
// user's streams does not touch another user's streams. This is important
226+
// because logout of user A must not drop user B's live SSE subscriptions.
227+
func TestCancelUserSSEStreams_OtherUserUnaffected(t *testing.T) {
228+
userA := uuid.New()
229+
userB := uuid.New()
230+
231+
ctxA, cancelA := context.WithCancel(context.Background())
232+
t.Cleanup(cancelA)
233+
ctxB, cancelB := context.WithCancel(context.Background())
234+
t.Cleanup(cancelB)
235+
236+
registerSSESession(userA, cancelA)
237+
idB := registerSSESession(userB, cancelB)
238+
t.Cleanup(func() { unregisterSSESession(userB, idB) })
239+
240+
CancelUserSSEStreams(userA)
241+
242+
// userA's context should be cancelled.
243+
select {
244+
case <-ctxA.Done():
245+
case <-time.After(testSSECancelWait):
246+
t.Fatalf("userA context was not cancelled")
247+
}
248+
249+
// userB's context must still be alive — a different user logging out
250+
// must not tear down unrelated SSE streams.
251+
select {
252+
case <-ctxB.Done():
253+
t.Fatal("userB context was cancelled despite only userA logging out")
254+
case <-time.After(testSSECancelWait / 2):
255+
// expected: context still alive
256+
}
257+
}
258+
259+
// TestUnregisterSSESession_RemovesEntry verifies the deferred cleanup path
260+
// for normal stream end: unregisterSSESession should drop the specific
261+
// session id without touching sibling sessions, and drop the per-user map
262+
// entry when the user's last stream ends.
263+
func TestUnregisterSSESession_RemovesEntry(t *testing.T) {
264+
userID := uuid.New()
265+
266+
var cancelCalledA int32
267+
cancelA := func() { atomic.StoreInt32(&cancelCalledA, 1) }
268+
var cancelCalledB int32
269+
cancelB := func() { atomic.StoreInt32(&cancelCalledB, 1) }
270+
271+
idA := registerSSESession(userID, cancelA)
272+
idB := registerSSESession(userID, cancelB)
273+
274+
unregisterSSESession(userID, idA)
275+
276+
// Removing one entry must not invoke any cancel funcs — cancellation is
277+
// a separate concern from registry cleanup.
278+
assert.Equal(t, int32(0), atomic.LoadInt32(&cancelCalledA))
279+
assert.Equal(t, int32(0), atomic.LoadInt32(&cancelCalledB))
280+
281+
// Entry for B must still be present.
282+
sseSessionsMu.Lock()
283+
sessions, ok := sseSessions[userID]
284+
remaining := len(sessions)
285+
sseSessionsMu.Unlock()
286+
require.True(t, ok)
287+
assert.Equal(t, 1, remaining, "session B should still be registered")
288+
289+
// Removing the last entry should drop the whole per-user map slot.
290+
unregisterSSESession(userID, idB)
291+
sseSessionsMu.Lock()
292+
_, stillThere := sseSessions[userID]
293+
sseSessionsMu.Unlock()
294+
assert.False(t, stillThere, "per-user map entry should be removed when empty")
295+
}
296+
297+
// TestCancelUserSSEStreams_NoSessions verifies that calling the cancel
298+
// function for a user with no registered streams is a no-op and does not
299+
// panic — logout must always be safe to call whether or not the user had an
300+
// open SSE stream.
301+
func TestCancelUserSSEStreams_NoSessions(t *testing.T) {
302+
userID := uuid.New()
303+
// Should not panic, should not block.
304+
CancelUserSSEStreams(userID)
305+
}

0 commit comments

Comments
 (0)