race: _clear_stale_stream_state mutates outside per-session lock (Opus follow-up from v0.50.279) #1533

@nesquena-hermes

Summary

_clear_stale_stream_state() (api/routes.py:237) holds STREAMS_LOCK only across the registry lookup; the write to session.active_stream_id = None happens after release. A concurrent _handle_chat_start on the same session can race:

  1. Reader thread: acquire STREAMS_LOCK → stream_id not in STREAMS → decide to clear → release lock
  2. Writer thread (chat_start): clear stale → register new Y in STREAMS → set s.active_stream_id = Y → save
  3. Reader thread (resumed): write s.active_stream_id = None, clobbering Y
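
The interleaving above can be forced deterministically. The following is a minimal sketch, not the project's code: `FakeSession`, `clear_stale_unlocked`, and the `after_lookup` hook are hypothetical stand-ins that mirror the lookup-then-write shape described in this issue, and `STREAMS` / `STREAMS_LOCK` are local substitutes for the real registry.

```python
import threading

STREAMS = {}                       # local stand-in for the stream registry
STREAMS_LOCK = threading.Lock()


class FakeSession:
    """Hypothetical stand-in for the real session object."""

    def __init__(self, active_stream_id):
        self.active_stream_id = active_stream_id


def clear_stale_unlocked(session, after_lookup):
    """Lookup under STREAMS_LOCK, write after release (the racy shape)."""
    stream_id = session.active_stream_id
    if not stream_id:
        return False
    with STREAMS_LOCK:
        alive = stream_id in STREAMS
    if alive:
        return False
    after_lookup()                 # hypothetical hook: pause inside the race window
    session.active_stream_id = None
    return True


def reproduce_clobber():
    STREAMS.clear()
    session = FakeSession("X")     # "X" is stale: it is not in STREAMS
    looked_up = threading.Event()
    writer_done = threading.Event()

    def pause():
        looked_up.set()            # step 1 finished, lock released
        writer_done.wait(timeout=5)

    reader = threading.Thread(target=clear_stale_unlocked, args=(session, pause))
    reader.start()
    looked_up.wait(timeout=5)

    # Step 2: the writer (chat_start) registers a new stream Y.
    with STREAMS_LOCK:
        STREAMS["Y"] = object()
    session.active_stream_id = "Y"

    writer_done.set()              # step 3: reader resumes and writes None
    reader.join(timeout=5)
    return session.active_stream_id, "Y" in STREAMS
```

Calling `reproduce_clobber()` leaves `Y` registered in `STREAMS` while the session's `active_stream_id` has been reset to `None`, which is the orphaned state described under Effect.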

Effect

The user sees the new stream as orphaned and has to retry. No data is corrupted: the stream stays registered in STREAMS, but the saved session JSON no longer records its active_stream_id, so the frontend can't reconnect to it. Worst case is one retry per user.

Why it shipped this way

PR #1525 (@ai-ag2026, v0.50.279) introduced the helper as the proactive cleanup half of the #1471 stale-stream class. Opus advisor pass on stage-279 flagged this race during the v0.50.279 release brief and noted: "Effect is bounded ... Probably fine to defer to a follow-up given the narrow window — flag it as known." Maintainer agreed to defer to keep the release surface small.

Suggested fix (~10 LOC)

Wrap the mutate-and-save block in a per-session lock and re-read active_stream_id inside it:

def _clear_stale_stream_state(session) -> bool:
    stream_id = getattr(session, "active_stream_id", None)
    if not stream_id:
        return False
    with STREAMS_LOCK:
        stream_alive = stream_id in STREAMS
    if stream_alive:
        return False
    # Re-read inside the per-session lock so we don't clobber a concurrent
    # chat_start that registered a new stream id between our lookup and our
    # write.
    with _get_session_agent_lock(session.session_id):
        if getattr(session, "active_stream_id", None) != stream_id:
            return False  # changed under us; the new stream owns it now
        session.active_stream_id = None
        if hasattr(session, "pending_user_message"):
            session.pending_user_message = None
        if hasattr(session, "pending_attachments"):
            session.pending_attachments = []
        if hasattr(session, "pending_started_at"):
            session.pending_started_at = None
        try:
            session.save()
        except Exception:
            pass
    return True

Plus a regression test that simulates the race: concurrent reader and writer threads on the same session, asserting that active_stream_id is not reset to None after chat_start has registered a new stream.
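
One possible shape for that regression test is sketched below. It is self-contained and does not import the real module: `FakeSession`, `get_session_lock`, `clear_stale_fixed`, and the `after_lookup` hook are hypothetical stand-ins for the session object, `_get_session_agent_lock`, and the fixed helper. It also assumes that chat_start sets active_stream_id while holding the same per-session lock; if the writer does not take that lock, the re-read narrows the window but does not close it. A test against the real helper would likely inject the pause by patching rather than through a hook parameter.

```python
import threading
from collections import defaultdict

STREAMS = {}                              # local stand-in for the registry
STREAMS_LOCK = threading.Lock()
_SESSION_LOCKS = defaultdict(threading.Lock)
_SESSION_LOCKS_GUARD = threading.Lock()


def get_session_lock(session_id):
    """Hypothetical equivalent of _get_session_agent_lock."""
    with _SESSION_LOCKS_GUARD:
        return _SESSION_LOCKS[session_id]


class FakeSession:
    def __init__(self, session_id, active_stream_id):
        self.session_id = session_id
        self.active_stream_id = active_stream_id


def clear_stale_fixed(session, after_lookup=lambda: None):
    """Same shape as the suggested fix, minus pending_* fields and save()."""
    stream_id = session.active_stream_id
    if not stream_id:
        return False
    with STREAMS_LOCK:
        alive = stream_id in STREAMS
    if alive:
        return False
    after_lookup()                        # hypothetical hook: pause in the window
    with get_session_lock(session.session_id):
        if session.active_stream_id != stream_id:
            return False                  # changed under us; leave it alone
        session.active_stream_id = None
    return True


def test_no_clobber_after_chat_start():
    STREAMS.clear()
    session = FakeSession("s1", "X")      # "X" is stale
    looked_up, writer_done = threading.Event(), threading.Event()
    results = []

    def pause():
        looked_up.set()
        writer_done.wait(timeout=5)

    reader = threading.Thread(
        target=lambda: results.append(clear_stale_fixed(session, pause))
    )
    reader.start()
    assert looked_up.wait(timeout=5)

    # Writer: simulate chat_start registering Y under the per-session lock.
    with get_session_lock(session.session_id):
        with STREAMS_LOCK:
            STREAMS["Y"] = object()
        session.active_stream_id = "Y"

    writer_done.set()
    reader.join(timeout=5)
    assert results == [False]             # reader backed off
    assert session.active_stream_id == "Y"
```

The events pin the writer between the reader's registry lookup and its per-session re-read, so the test does not depend on thread scheduling to hit the window.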

Acceptance

  • _clear_stale_stream_state re-reads under per-session lock before mutating
  • Regression test exercises the race and asserts no clobber
  • No regression in the existing 5 test_stale_stream_cleanup.py tests

Priority

Low. Bounded effect, recoverable via retry, narrow window. Surfaced by Opus during stage-279 review and explicitly deferred to keep that release scoped.
