Skip to content

Commit 4e4055c

Browse files
committed
Fix detached fiber in ClaudeCodeAdapter to use Effect.forkDetach with explicit interruption
Replace Effect.runFork(runSdkStream(context)) with yield* Effect.forkDetach() to properly inherit provided services (e.g. deterministic Random in tests) and enable deterministic cleanup. Store the fiber reference in ClaudeSessionContext and interrupt it in stopSessionInternal before other cleanup, eliminating the race window where the stream fiber could offer events to runtimeEventQueue after it has been shut down by the scope finalizer.
1 parent dc91dc5 commit 4e4055c

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

apps/server/src/provider/Layers/ClaudeCodeAdapter.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,18 @@ import {
3131
ProviderThreadId,
3232
ProviderTurnId,
3333
} from "@t3tools/contracts";
34-
import { Cause, DateTime, Deferred, Effect, Layer, Queue, Random, Ref, Stream } from "effect";
34+
import {
35+
Cause,
36+
DateTime,
37+
Deferred,
38+
Effect,
39+
Fiber,
40+
Layer,
41+
Queue,
42+
Random,
43+
Ref,
44+
Stream,
45+
} from "effect";
3546

3647
import {
3748
ProviderAdapterProcessError,
@@ -99,6 +110,7 @@ interface ClaudeSessionContext {
99110
lastAssistantUuid: string | undefined;
100111
lastThreadStartedId: ProviderThreadId | undefined;
101112
stopped: boolean;
113+
streamFiber: Fiber.RuntimeFiber<void> | undefined;
102114
}
103115

104116
interface ClaudeQueryRuntime extends AsyncIterable<SDKMessage> {
@@ -117,9 +129,7 @@ export interface ClaudeCodeAdapterLiveOptions {
117129
}
118130

119131
function isUuid(value: string): boolean {
120-
return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(
121-
value,
122-
);
132+
return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(value);
123133
}
124134

125135
function toMessage(cause: unknown, fallback: string): string {
@@ -695,6 +705,11 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
695705

696706
context.stopped = true;
697707

708+
if (context.streamFiber) {
709+
yield* Fiber.interrupt(context.streamFiber);
710+
context.streamFiber = undefined;
711+
}
712+
698713
for (const [requestId, pending] of context.pendingApprovals) {
699714
yield* Deferred.succeed(pending.decision, "cancel");
700715
const stamp = yield* makeEventStamp();
@@ -967,6 +982,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
967982
lastAssistantUuid: resumeState?.resumeSessionAt,
968983
lastThreadStartedId: undefined,
969984
stopped: false,
985+
streamFiber: undefined,
970986
};
971987
yield* Ref.set(contextRef, context);
972988
sessions.set(sessionId, context);
@@ -981,7 +997,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
981997
threadId,
982998
});
983999

984-
Effect.runFork(runSdkStream(context));
1000+
context.streamFiber = yield* Effect.forkDetach(runSdkStream(context));
9851001

9861002
return {
9871003
...session,

0 commit comments

Comments
 (0)