Skip to content

Commit 954546f

Browse files
Fix effect runtime usage and sqlite error classification
- Bind fork/promise helpers to the current services in Claude adapter and tests - Switch drainable worker to tx primitives and simplify enqueue - Classify sqlite errors with the built-in helper - Update tests to use the new Effect mocking and runtime helpers
1 parent 010e603 commit 954546f

File tree

6 files changed

+45
-33
lines changed

6 files changed

+45
-33
lines changed

apps/server/src/git/Layers/GitCore.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,9 @@ it.layer(TestLayer)("git integration", (it) => {
415415

416416
it.effect("refreshes upstream behind count after checkout when remote branch advanced", () =>
417417
Effect.gen(function* () {
418+
const services = yield* Effect.services();
419+
const runPromise = Effect.runPromiseWith(services);
420+
418421
const remote = yield* makeTmpDir();
419422
const source = yield* makeTmpDir();
420423
const clone = yield* makeTmpDir();
@@ -449,7 +452,7 @@ it.layer(TestLayer)("git integration", (it) => {
449452
const core = yield* GitCore;
450453
yield* Effect.promise(() =>
451454
vi.waitFor(async () => {
452-
const details = await Effect.runPromise(core.statusDetails(source));
455+
const details = await runPromise(core.statusDetails(source));
453456
expect(details.branch).toBe(featureBranch);
454457
expect(details.aheadCount).toBe(0);
455458
expect(details.behindCount).toBe(1);

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,6 @@ describe("ProviderCommandReactor", () => {
194194
);
195195

196196
const unsupported = () => Effect.die(new Error("Unsupported provider call in test")) as never;
197-
const textGeneration: TextGenerationShape = {
198-
generateCommitMessage: () => unsupported(),
199-
generatePrContent: () => unsupported(),
200-
generateBranchName,
201-
generateThreadTitle,
202-
};
203197
const service: ProviderServiceShape = {
204198
startSession: startSession as ProviderServiceShape["startSession"],
205199
sendTurn: sendTurn as ProviderServiceShape["sendTurn"],
@@ -226,7 +220,12 @@ describe("ProviderCommandReactor", () => {
226220
Layer.provideMerge(orchestrationLayer),
227221
Layer.provideMerge(Layer.succeed(ProviderService, service)),
228222
Layer.provideMerge(Layer.succeed(GitCore, { renameBranch } as unknown as GitCoreShape)),
229-
Layer.provideMerge(Layer.succeed(TextGeneration, textGeneration)),
223+
Layer.provideMerge(
224+
Layer.mock(TextGeneration, {
225+
generateBranchName,
226+
generateThreadTitle,
227+
}),
228+
),
230229
Layer.provideMerge(ServerSettingsService.layerTest()),
231230
Layer.provideMerge(ServerConfig.layerTest(process.cwd(), baseDir)),
232231
Layer.provideMerge(NodeServices.layer),

apps/server/src/persistence/NodeSqliteClient.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import * as Stream from "effect/Stream";
2020
import * as Reactivity from "effect/unstable/reactivity/Reactivity";
2121
import * as Client from "effect/unstable/sql/SqlClient";
2222
import type { Connection } from "effect/unstable/sql/SqlConnection";
23-
import { SqlError } from "effect/unstable/sql/SqlError";
23+
import { SqlError, classifySqliteError } from "effect/unstable/sql/SqlError";
2424
import * as Statement from "effect/unstable/sql/Statement";
2525

2626
const ATTR_DB_SYSTEM_NAME = "db.system.name";
@@ -29,14 +29,8 @@ export const TypeId: TypeId = "~local/sqlite-node/SqliteClient";
2929

3030
export type TypeId = "~local/sqlite-node/SqliteClient";
3131

32-
const toSqlError = (cause: unknown, message: string, operation: string) =>
33-
new SqlError({
34-
cause,
35-
message:
36-
cause instanceof Error && cause.message.length > 0
37-
? `${message} (${operation}): ${cause.message}`
38-
: `${message} (${operation})`,
39-
});
32+
const classifyError = (cause: unknown, message: string, operation: string) =>
33+
classifySqliteError(cause, { message, operation });
4034

4135
/**
4236
* SqliteClient - Effect service tag for the sqlite SQL client.
@@ -118,7 +112,10 @@ const makeWithDatabase = (
118112
lookup: (sql: string) =>
119113
Effect.try({
120114
try: () => db.prepare(sql),
121-
catch: (cause) => toSqlError(cause, "Failed to prepare statement", "prepare"),
115+
catch: (cause) =>
116+
new SqlError({
117+
reason: classifyError(cause, "Failed to prepare statement", "prepare"),
118+
}),
122119
}),
123120
});
124121

@@ -136,7 +133,11 @@ const makeWithDatabase = (
136133
const result = statement.run(...(params as any));
137134
return Effect.succeed(raw ? (result as unknown as ReadonlyArray<any>) : []);
138135
} catch (cause) {
139-
return Effect.fail(toSqlError(cause, "Failed to execute statement", "execute"));
136+
return Effect.fail(
137+
new SqlError({
138+
reason: classifyError(cause, "Failed to execute statement", "execute"),
139+
}),
140+
);
140141
}
141142
});
142143

@@ -159,7 +160,10 @@ const makeWithDatabase = (
159160
statement.run(...(params as any));
160161
return [];
161162
},
162-
catch: (cause) => toSqlError(cause, "Failed to execute statement", "execute"),
163+
catch: (cause) =>
164+
new SqlError({
165+
reason: classifyError(cause, "Failed to execute statement", "execute"),
166+
}),
163167
}),
164168
(statement) =>
165169
Effect.sync(() => {

apps/server/src/provider/Layers/ClaudeAdapter.test.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,10 +1100,13 @@ describe("ClaudeAdapterLive", () => {
11001100
it.effect("closes the session when the Claude stream aborts after a turn starts", () => {
11011101
const harness = makeHarness();
11021102
return Effect.gen(function* () {
1103+
const services = yield* Effect.services();
1104+
const runFork = Effect.runForkWith(services);
1105+
11031106
const adapter = yield* ClaudeAdapter;
11041107
const runtimeEvents: Array<ProviderRuntimeEvent> = [];
11051108

1106-
const runtimeEventsFiber = yield* Effect.forkChild(
1109+
const runtimeEventsFiber = runFork(
11071110
Stream.runForEach(adapter.streamEvents, (event) =>
11081111
Effect.sync(() => {
11091112
runtimeEvents.push(event);
@@ -1197,9 +1200,12 @@ describe("ClaudeAdapterLive", () => {
11971200
);
11981201

11991202
return Effect.gen(function* () {
1203+
const services = yield* Effect.services();
1204+
const runFork = Effect.runForkWith(services);
1205+
12001206
const adapter = yield* ClaudeAdapter;
12011207

1202-
const runtimeEventsFiber = yield* Effect.forkChild(
1208+
const runtimeEventsFiber = runFork(
12031209
Stream.runForEach(adapter.streamEvents, () => Effect.void),
12041210
);
12051211

apps/server/src/provider/Layers/ClaudeAdapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2380,8 +2380,9 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
23802380
existingResumeSessionId === undefined ? yield* Random.nextUUIDv4 : undefined;
23812381
const sessionId = existingResumeSessionId ?? newSessionId;
23822382

2383-
const runFork = Effect.runFork;
2384-
const runPromise = Effect.runPromise;
2383+
const services = yield* Effect.services();
2384+
const runFork = Effect.runForkWith(services);
2385+
const runPromise = Effect.runPromiseWith(services);
23852386

23862387
const promptQueue = yield* Queue.unbounded<PromptQueueItem>();
23872388
const prompt = Stream.fromQueue(promptQueue).pipe(

packages/shared/src/DrainableWorker.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,13 @@ export const makeDrainableWorker = <A, E, R>(
3737
process: (item: A) => Effect.Effect<void, E, R>,
3838
): Effect.Effect<DrainableWorker<A>, never, Scope.Scope | R> =>
3939
Effect.gen(function* () {
40-
const ref = yield* Effect.transaction(TxRef.make(0));
40+
const ref = yield* TxRef.make(0);
4141

42-
const queue = yield* Effect.acquireRelease(
43-
Effect.transaction(TxQueue.unbounded<A>()),
44-
(queue) => Effect.transaction(TxQueue.shutdown(queue)),
42+
const queue = yield* Effect.acquireRelease(TxQueue.unbounded<A>(), (queue) =>
43+
TxQueue.shutdown(queue),
4544
);
4645

47-
const takeItem = Effect.transaction(
46+
const takeItem = Effect.tx(
4847
Effect.gen(function* () {
4948
const item = yield* TxQueue.take(queue);
5049
yield* TxRef.update(ref, (n) => n + 1);
@@ -54,24 +53,24 @@ export const makeDrainableWorker = <A, E, R>(
5453

5554
yield* takeItem.pipe(
5655
Effect.flatMap((item) =>
57-
process(item).pipe(Effect.ensuring(Effect.transaction(TxRef.update(ref, (n) => n - 1)))),
56+
process(item).pipe(Effect.ensuring(TxRef.update(ref, (n) => n - 1))),
5857
),
5958
Effect.forever,
6059
Effect.forkScoped,
6160
);
6261

63-
const drain: DrainableWorker<A>["drain"] = Effect.transaction(
62+
const drain: DrainableWorker<A>["drain"] = Effect.tx(
6463
Effect.gen(function* () {
6564
const inFlight = yield* TxRef.get(ref);
6665
const isEmpty = yield* TxQueue.isEmpty(queue);
6766
if (inFlight > 0 || !isEmpty) {
68-
return yield* Effect.retryTransaction;
67+
return yield* Effect.txRetry;
6968
}
7069
}),
7170
);
7271

7372
return {
74-
enqueue: (item) => Effect.transaction(TxQueue.offer(queue, item)).pipe(Effect.asVoid),
73+
enqueue: (item) => TxQueue.offer(queue, item),
7574
drain,
7675
} satisfies DrainableWorker<A>;
7776
});

0 commit comments

Comments
 (0)