Skip to content

Commit 492a2f5

Browse files
author
zynx
committed
fix(cron): address review issues from PR #1447
- Fix setProcessing(true) busy-state leak: move into executeJob after task acquisition via onAcquired callback, preserving onceIdle ordering guarantee - Remove FallbackConversationRepository dead-code class (no instantiation point, no fallback logic, no tests); remove from vitest coverage include - Add missing searchMessages mock to WorkerTaskManager.test.ts makeRepo() Review follow-up for #1447
1 parent 82f8924 commit 492a2f5

File tree

7 files changed

+22
-67
lines changed

7 files changed

+22
-67
lines changed

src/process/database/FallbackConversationRepository.ts

Lines changed: 0 additions & 55 deletions
This file was deleted.

src/process/services/cron/CronService.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -380,13 +380,12 @@ export class CronService {
380380
let lastError: string | undefined;
381381

382382
try {
383-
// Mark conversation as busy BEFORE registering the idle callback,
384-
// so onceIdle registers a deferred callback instead of firing immediately.
385-
this.executor.setProcessing(conversationId, true);
386-
this.registerCompletionNotification(job);
387-
388-
// Delegate task-get + sendMessage to the executor.
389-
await this.executor.executeJob(job);
383+
// executeJob marks the conversation busy only after task acquisition succeeds.
384+
// The onAcquired callback registers the completion notification while the
385+
// conversation is already busy, preventing premature onceIdle fires.
386+
await this.executor.executeJob(job, () => {
387+
this.registerCompletionNotification(job);
388+
});
390389

391390
// Success
392391
this.retryCounts.delete(job.id);

src/process/services/cron/ICronJobExecutor.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ import type { CronJob } from './CronStore';
99
export interface ICronJobExecutor {
1010
/** Returns true if the conversation already has an active run in progress. */
1111
isConversationBusy(conversationId: string): boolean;
12-
/** Execute the job's payload against the target conversation. */
13-
executeJob(job: CronJob): Promise<void>;
12+
/** Execute the job's payload against the target conversation.
13+
* @param onAcquired - Called after task acquisition succeeds, before sendMessage.
14+
* Use this hook to register completion notifications while guaranteeing that
15+
* busy-state has already been set (avoiding premature onceIdle fires). */
16+
executeJob(job: CronJob, onAcquired?: () => void): Promise<void>;
1417
/** Register a callback to fire once the conversation becomes idle. */
1518
onceIdle(conversationId: string, callback: () => Promise<void>): void;
1619
/** Mark the conversation as busy/not-busy. */

src/process/services/cron/WorkerTaskManagerJobExecutor.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export class WorkerTaskManagerJobExecutor implements ICronJobExecutor {
2424
return this.busyGuard.isProcessing(conversationId);
2525
}
2626

27-
async executeJob(job: CronJob): Promise<void> {
27+
async executeJob(job: CronJob, onAcquired?: () => void): Promise<void> {
2828
const { conversationId } = job.metadata;
2929
const messageText = job.target.payload.text;
3030
const msgId = uuid();
@@ -45,6 +45,14 @@ export class WorkerTaskManagerJobExecutor implements ICronJobExecutor {
4545
task = await this.taskManager.getOrBuildTask(conversationId, { yoloMode: true });
4646
}
4747

48+
// Mark busy only after task acquisition succeeds. This ensures that if
49+
// getOrBuildTask throws (conversation deleted), setProcessing(true) is never
50+
// called and no "busy" state leaks into subsequent runs.
51+
this.busyGuard.setProcessing(conversationId, true);
52+
// Notify caller so it can register onceIdle callbacks while the conversation
53+
// is already marked busy (prevents premature idle fires).
54+
onAcquired?.();
55+
4856
const workspace = (task as { workspace?: string }).workspace;
4957
const workspaceFiles = workspace ? await copyFilesToDirectory(workspace, [], false) : [];
5058

tests/unit/WorkerTaskManager.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ function makeRepo(overrides?: Partial<IConversationRepository>): IConversationRe
1616
insertMessage: vi.fn(),
1717
getUserConversations: vi.fn(),
1818
listAllConversations: vi.fn(() => []),
19+
searchMessages: vi.fn(() => ({ items: [], total: 0, page: 0, pageSize: 20, hasMore: false })),
1920
...overrides,
2021
};
2122
}

tests/unit/cronService.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ describe('CronService', () => {
232232
// Advance exactly one interval period to fire the timer once.
233233
await vi.advanceTimersByTimeAsync(60000);
234234

235-
expect(executor.executeJob).toHaveBeenCalledWith(job);
235+
expect(executor.executeJob).toHaveBeenCalledWith(job, expect.any(Function));
236236
expect(repo.update).toHaveBeenCalledWith(
237237
'j1',
238238
expect.objectContaining({ state: expect.objectContaining({ lastStatus: 'ok' }) })

vitest.config.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ export default defineConfig({
6767
'src/process/bridge/conversationBridge.ts',
6868
// Decoupling layer (interfaces + implementations)
6969
'src/process/database/SqliteConversationRepository.ts',
70-
'src/process/database/FallbackConversationRepository.ts',
7170
'src/process/database/SqliteChannelRepository.ts',
7271
'src/process/services/ConversationServiceImpl.ts',
7372
'src/process/services/cron/SqliteCronRepository.ts',

0 commit comments

Comments
 (0)