fix(backend): SSE/WS safety, exec goroutine join, nightly E2E hardening (#7041-#7057) #7069
Conversation
…singleflight, evictor shutdown

- #7041 — DisconnectUser now sends a nil sentinel through client.send so the writer goroutine sends the close frame, maintaining single-writer semantics instead of racing conn.WriteMessage.
- #7042 — Hub.Close() iterates all clients and closes their send channels so writer goroutines unblock immediately on shutdown.
- #7043 — Server.Shutdown() now calls handlers.StopSSECacheEvictor() to stop the background evictor goroutine.
- #7044 — SSE cache key includes the user ID to prevent cross-user data leakage between admin and viewer roles.
- #7045 — SSE cold-cache fetches use singleflight to coalesce concurrent Kubernetes API calls for the same cache key.
- #7046 — GetEventsStream clamps the limit query parameter to maxWarningEventsLimit (500).
- #7049 — Hub broadcast copies the client slice under RLock before iterating, preventing concurrent modification races.
- #7050 — writeSSEEvent strips \n and \r from eventName to prevent SSE frame injection.

Signed-off-by: Andy Anderson <[email protected]>
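For the #7041 item above, a minimal sketch of the single-writer pattern: one goroutine owns all conn.WriteMessage calls, and a nil sentinel on client.send tells it to emit the close frame. The writePump name, the Client fields, and the use of gorilla/websocket are assumptions for illustration, not the project's actual code.

```go
package handlers

import "github.com/gorilla/websocket"

// Client fields assumed for this sketch.
type Client struct {
	conn *websocket.Conn
	send chan []byte
}

// writePump is the single writer: every frame, including the close frame
// triggered by DisconnectUser's nil sentinel, goes through this goroutine.
func (c *Client) writePump() {
	defer c.conn.Close()
	for msg := range c.send {
		if msg == nil { // nil sentinel from DisconnectUser (#7041)
			_ = c.conn.WriteMessage(websocket.CloseMessage,
				websocket.FormatCloseMessage(websocket.CloseNormalClosure, "disconnected"))
			return
		}
		if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
			return
		}
	}
	// send channel closed by the hub (#7042): exit and let defer close the conn.
}
```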
- #7047 — Close the terminalSizeQueue channel after StreamWithContext returns so the SPDY executor's internal goroutine calling Next() terminates.
- #7048 — Wait for the reader goroutine (<-done) before writing the exit message to ensure cleanup ordering.

Signed-off-by: Andy Anderson <[email protected]>
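For the #7047 item, a minimal sketch of a channel-backed terminal size queue: closing the channel makes Next return nil, which is the signal that stops the executor's resize goroutine. The sizeQueue type is illustrative; only the remotecommand.TerminalSize/TerminalSizeQueue names are the real client-go API.

```go
package handlers

import "k8s.io/client-go/tools/remotecommand"

// sizeQueue is a hypothetical channel-backed remotecommand.TerminalSizeQueue.
// Closing ch after StreamWithContext returns makes Next return nil, which is
// what the executor's internal resize goroutine needs in order to exit.
type sizeQueue struct {
	ch chan remotecommand.TerminalSize
}

func (q *sizeQueue) Next() *remotecommand.TerminalSize {
	size, ok := <-q.ch
	if !ok {
		return nil // queue closed (#7047): executor goroutine terminates
	}
	return &size
}
```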
…eflight, semaphore

- #7052 — prewarm() uses context cancellation so inner goroutines are cancelled on timeout instead of abandoned.
- #7053 — GetRuns() uses singleflight to coalesce concurrent cold-cache fetches into a single fetchAll call.
- #7054 — All nightly E2E GitHub API calls use resolveGitHubAPIBase() instead of hardcoded api.github.com, supporting GitHub Enterprise.
- #7055 — Error response bodies are read with io.LimitReader (10 KB cap) to prevent unbounded memory use on large HTML error pages.
- #7056 — classifyFailures uses a semaphore (maxConcurrentClassify=5) to cap concurrent detectGPUFailure goroutines.
- #7057 — getCachedOrFetchPRs uses singleflight to coalesce concurrent cold-cache paginated GitHub PR fetches.

Signed-off-by: Andy Anderson <[email protected]>
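For the #7056 item, a minimal sketch of the semaphore pattern: a buffered channel caps how many detectGPUFailure calls run at once. Only classifyFailures, detectGPUFailure, and maxConcurrentClassify are names taken from the commit message; the WorkflowRun type and the failedRuns argument are placeholders so the sketch stands on its own.

```go
package handlers

import "sync"

// Placeholder type and helper so the sketch compiles on its own.
type WorkflowRun struct{ ID int64 }

func detectGPUFailure(run WorkflowRun) { /* inspects job logs in the real code */ }

const maxConcurrentClassify = 5

// classifyFailures sketch (#7056): a buffered channel used as a semaphore
// allows at most maxConcurrentClassify goroutines to run detectGPUFailure
// concurrently, while a WaitGroup joins them before returning.
func classifyFailures(failedRuns []WorkflowRun) {
	sem := make(chan struct{}, maxConcurrentClassify)
	var wg sync.WaitGroup
	for _, run := range failedRuns {
		wg.Add(1)
		go func(run WorkflowRun) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release the slot
			detectGPUFailure(run)
		}(run)
	}
	wg.Wait()
}
```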
Pull request overview
This PR hardens backend streaming and background-work logic to prevent goroutine leaks, reduce cache stampedes, and improve safety around WebSocket/SSE framing and shutdown cleanup.
Changes:
- Improves WebSocket hub shutdown/disconnect behavior and broadcast slice safety.
- Strengthens SSE caching (user-scoped cache keys, singleflight coalescing) and clamps events stream limits.
- Hardens nightly E2E GitHub fetching (singleflight, GHE base resolution, bounded error-body reads, bounded concurrency) and exec session teardown ordering.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| pkg/api/server.go | Stops SSE cache evictor during server shutdown to avoid leaked background goroutines. |
| pkg/api/handlers/websocket.go | Adjusts hub broadcast iteration/copying, adds hub-wide close cleanup, and uses a nil sentinel to preserve single-writer WebSocket semantics on disconnect. |
| pkg/api/handlers/sse.go | Sanitizes SSE event names, scopes SSE cache by user, coalesces cold-cache fetches with singleflight, and clamps events stream limit. |
| pkg/api/handlers/nightly_e2e.go | Adds context-based prewarm timeout handling, singleflight for cold-cache fetch coalescing, GHE-aware GitHub API base, bounded error body reads, and concurrency limiting for failure classification. |
| pkg/api/handlers/feedback.go | Adds singleflight coalescing for PR cache cold fetches and defines the default GitHub API base constant used by resolver. |
| pkg/api/handlers/exec.go | Closes terminal resize queue and attempts to join the reader goroutine before writing the exit message. |
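For the sse.go sanitization noted in the table above (#7050 in the commit list), here is a minimal sketch of what stripping CR/LF from the event name looks like; the exact writeSSEEvent signature is an assumption.

```go
package handlers

import (
	"fmt"
	"io"
	"strings"
)

// Sketch of #7050: remove "\n" and "\r" from the event name before it is
// interpolated into the SSE frame, so a caller-influenced name cannot inject
// extra "event:" or "data:" lines into the stream.
func writeSSEEvent(w io.Writer, eventName string, data []byte) error {
	eventName = strings.NewReplacer("\n", "", "\r", "").Replace(eventName)
	_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventName, data)
	return err
}
```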
```go
// #7042 — Close every client.send channel so writer goroutines unblock
// immediately instead of waiting for TCP connections to be forcibly closed.
// Previously only Run's unregister case closed send channels, but Run exits
// as soon as h.done is closed — leaving every writer goroutine stranded.
func (h *Hub) Close() {
	h.closeOnce.Do(func() {
		close(h.done)

		h.mu.Lock()
		for client := range h.clients {
			close(client.send)
			delete(h.clients, client)
		}
		h.userIndex = make(map[uuid.UUID][]*Client)
		h.mu.Unlock()
```
Hub.Close closes all client.send channels from outside the Hub.Run goroutine. Hub.Run’s broadcast case can still be sending on client.send when Close runs, and sending on a concurrently closed channel will panic. Consider moving the per-client send-channel closures into Hub.Run (e.g., handle <-h.done by locking and closing all client.send before returning), or otherwise synchronize so Close cannot close channels while Run is still broadcasting.
Suggested change (drop the per-client close loop from Close; only signal h.done here):

```go
// Per-client send channels must not be closed here because Hub.Run may still
// be broadcasting to them until it observes h.done and exits. That cleanup
// must happen from Hub.Run's shutdown path so sends and closes are serialized.
func (h *Hub) Close() {
	h.closeOnce.Do(func() {
		close(h.done)
```
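For completeness, a minimal sketch of the Run-side shutdown the suggestion points to; the hub's field shapes (clients map, register/unregister/broadcast channels, RWMutex) are assumed from the surrounding snippets rather than taken from the actual file.

```go
// Sketch: Run is the only goroutine that sends on client.send, so it is also
// the safe place to close those channels. Close (above) only signals h.done;
// Run drains the client map under the lock and returns.
func (h *Hub) Run() {
	for {
		select {
		case <-h.done:
			h.mu.Lock()
			for client := range h.clients {
				close(client.send)
				delete(h.clients, client)
			}
			h.userIndex = make(map[uuid.UUID][]*Client)
			h.mu.Unlock()
			return
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.mu.Unlock()
		case client := <-h.unregister:
			h.mu.Lock()
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
			h.mu.Unlock()
		case msg := <-h.broadcast:
			// #7049: copy clients under RLock, then send outside the lock.
			h.mu.RLock()
			targets := make([]*Client, 0, len(h.clients))
			for client := range h.clients {
				targets = append(targets, client)
			}
			h.mu.RUnlock()
			for _, client := range targets {
				select {
				case client.send <- msg:
				default: // slow client: drop rather than block the hub
				}
			}
		}
	}
}
```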
```go
select {
case client.send <- nil: // nil sentinel triggers close in writer
default:
	// Channel full — force-close the connection so the reader/writer
	// goroutines exit on their next I/O call.
```
DisconnectUser does a non-blocking send on client.send. If the client is concurrently unregistered (unregister case closes client.send), this send can panic with “send on closed channel”. To make this robust, synchronize sends with the hub lock (e.g., hold h.mu.RLock during the send attempt / check client still registered) or wrap the send in a helper that safely recovers and falls back to client.closeConn().
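A minimal sketch of the lock-guarded option described above; trySendDisconnect is a hypothetical helper name, and it assumes the unregister path closes client.send only while holding the same h.mu.

```go
// Sketch: only attempt the nil-sentinel send while the client is verifiably
// still registered, so the send cannot race the unregister path that closes
// client.send under the same lock.
func (h *Hub) trySendDisconnect(client *Client) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	if _, ok := h.clients[client]; !ok {
		return // already unregistered; its send channel may be closed
	}
	select {
	case client.send <- nil: // nil sentinel triggers close in the writer
	default:
		client.closeConn() // channel full: force-close the connection instead
	}
}
```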
```go
// #7048 — Wait for the reader goroutine to finish before writing the
// exit message, ensuring cleanup ordering (close(stdinCh), execCancel)
// happens before the handler returns.
<-done
```
Waiting on <-done here can deadlock. If the remote exec finishes normally, executor.StreamWithContext can return while the reader goroutine is still blocked in c.ReadMessage(), so done is never closed and the handler never progresses to write the exit message (or return). You likely need to actively terminate the reader before waiting (e.g., call execCancel() and close the WebSocket / set a short read deadline, or wait with a timeout and then force-close).
Suggested change:

```go
// Unblock any reader goroutine that may still be stuck in ReadMessage()
// now that the remote exec session has completed.
if err := c.SetReadDeadline(time.Now()); err != nil {
	slog.Debug("[Exec] failed to set read deadline while shutting down exec reader", "error", err)
}
// #7048 — Wait for the reader goroutine to finish before writing the
// exit message, ensuring cleanup ordering (close(stdinCh), execCancel)
// happens before the handler returns. Do not wait forever: if ReadMessage()
// does not unblock promptly, continue so the handler cannot deadlock.
select {
case <-done:
case <-time.After(250 * time.Millisecond):
	slog.Warn("[Exec] timed out waiting for reader goroutine to exit after exec completion")
}
```
```go
		default:
		}
		runs, err := h.fetchWorkflowRuns(wf)
		ch <- result{idx: idx, runs: runs, err: err}
	}(i, wf)
```
fetchAllWithContext is intended to cancel in-flight work on ctx.Done(), but the goroutine calls h.fetchWorkflowRuns(wf) which creates requests without ctx. If ctx is cancelled after this point, the HTTP call won’t be interrupted and fetchAllWithContext can still block until the client timeout. Consider threading ctx through (fetchWorkflowRuns(ctx, wf) + http.NewRequestWithContext) and checking ctx in any loops.
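A minimal sketch of the ctx plumbing the comment asks for; the return type, the h.httpClient and h.repo fields, and the workflow-runs URL shape are illustrative, while http.NewRequestWithContext, io.LimitReader, and resolveGitHubAPIBase (the helper added in this PR) are real names. It assumes the usual context, fmt, io, and net/http imports.

```go
// Sketch: build the request with the caller's context so cancelling ctx
// (for example, the prewarm timeout in #7052) aborts the in-flight HTTP call.
func (h *NightlyE2EHandler) fetchWorkflowRuns(ctx context.Context, workflowFile string) ([]byte, error) {
	url := fmt.Sprintf("%s/repos/%s/actions/workflows/%s/runs",
		resolveGitHubAPIBase(), h.repo, workflowFile)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := h.httpClient.Do(req)
	if err != nil {
		return nil, err // includes context.Canceled / DeadlineExceeded
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// #7055: cap error-body reads so a large HTML error page stays bounded.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 10*1024))
		return nil, fmt.Errorf("GitHub API returned %d: %s", resp.StatusCode, body)
	}
	return io.ReadAll(resp.Body)
}
```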
```go
// fetchAllWithContext is the context-aware version of fetchAll (#7052).
// When ctx is cancelled, HTTP requests made by sub-goroutines will be
// interrupted instead of running to completion.
func (h *NightlyE2EHandler) fetchAllWithContext(ctx context.Context) (*NightlyE2EResponse, error) {
```
The comment on fetchAllWithContext says ctx cancellation will interrupt HTTP requests, but fetchWorkflowRuns/detectGPUFailure (and other helpers called by fetchAll) currently build requests without ctx, so cancellation won’t actually stop in-flight GitHub calls. Either plumb ctx through to all HTTP calls (NewRequestWithContext / req.WithContext) or adjust the comment and timeout strategy accordingly.
```go
// #7045 — Use singleflight to coalesce concurrent cold-cache
// fetches for the same cache key into one Kubernetes API call.
v, fetchErr, _ := sseFetchGroup.Do(cKey, func() (interface{}, error) {
	return fetchFn(ctx, clusterName)
})
```
sseFetchGroup.Do uses the per-request ctx (derived from streamCtx) inside the singleflight function. If the first caller’s ctx is cancelled (e.g., client disconnect) it will cancel the shared Kubernetes API call and propagate that error to other concurrent callers that are still active. To avoid cross-request cancellation, consider running the shared fetch with a context independent of any one client (e.g., context.WithTimeout(context.Background(), timeout)) and have each caller select on its own ctx while waiting (Group.DoChan + select).
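A minimal sketch of the DoChan variant the comment recommends, assuming it sits inside a helper that returns (interface{}, error); the 30-second timeout is an assumption, while sseFetchGroup, cKey, fetchFn, and clusterName come from the snippet above.

```go
// The winning caller runs the fetch under its own context, so no single
// client's disconnect can cancel it for the other waiters.
resCh := sseFetchGroup.DoChan(cKey, func() (interface{}, error) {
	fetchCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return fetchFn(fetchCtx, clusterName)
})

// Each caller waits on its own ctx; giving up here does not stop the shared fetch.
select {
case res := <-resCh:
	if res.Err != nil {
		return nil, res.Err
	}
	return res.Val, nil
case <-ctx.Done():
	return nil, ctx.Err()
}
```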
Closes #7041
Closes #7042
Closes #7043
Closes #7044
Closes #7045
Closes #7046
Closes #7047
Closes #7048
Closes #7049
Closes #7050
Closes #7052
Closes #7053
Closes #7054
Closes #7055
Closes #7056
Closes #7057
Summary
SSE/WebSocket (8 issues): #7041, #7042, #7043, #7044, #7045, #7046, #7049, #7050
Exec (2 issues): #7047, #7048
Nightly E2E + Caching (6 issues): #7052, #7053, #7054, #7055, #7056, #7057
Test plan
- `go build ./...` passes
- `go vet ./...` passes