Skip to content

Commit 48b03aa

Browse files
committed
fix(cron): consolidate announce delivery and detach manual runs
1 parent 5b27b0c commit 48b03aa

File tree

7 files changed

+155
-14
lines changed

7 files changed

+155
-14
lines changed

src/agents/subagent-announce.timeout.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,25 @@ describe("subagent announce timeout config", () => {
197197
expect(internalEvents[0]?.announceType).toBe("cron job");
198198
});
199199

200+
it("regression, keeps child announce internal when requester is a cron run session", async () => {
201+
const cronSessionKey = "agent:main:cron:daily-check:run:run-123";
202+
203+
await runAnnounceFlowForTest("run-cron-internal", {
204+
requesterSessionKey: cronSessionKey,
205+
requesterDisplayKey: cronSessionKey,
206+
requesterOrigin: { channel: "discord", to: "channel:cron-results", accountId: "acct-1" },
207+
});
208+
209+
const directAgentCall = findGatewayCall(
210+
(call) => call.method === "agent" && call.expectFinal === true,
211+
);
212+
expect(directAgentCall?.params?.sessionKey).toBe(cronSessionKey);
213+
expect(directAgentCall?.params?.deliver).toBe(false);
214+
expect(directAgentCall?.params?.channel).toBeUndefined();
215+
expect(directAgentCall?.params?.to).toBeUndefined();
216+
expect(directAgentCall?.params?.accountId).toBeUndefined();
217+
});
218+
200219
it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => {
201220
const parentSessionKey = "agent:main:subagent:parent";
202221
requesterDepthResolver = (sessionKey?: string) =>

src/agents/subagent-announce.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type { ConversationRef } from "../infra/outbound/session-binding-service.
1414
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
1515
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
1616
import { defaultRuntime } from "../runtime.js";
17+
import { isCronSessionKey } from "../sessions/session-key-utils.js";
1718
import { extractTextFromChatContent } from "../shared/chat-content.js";
1819
import {
1920
type DeliveryContext,
@@ -78,6 +79,10 @@ function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): n
7879
return Math.min(Math.max(1, Math.floor(configured)), MAX_TIMER_SAFE_TIMEOUT_MS);
7980
}
8081

82+
function isInternalAnnounceRequesterSession(sessionKey: string | undefined): boolean {
83+
return getSubagentDepthFromSessionStore(sessionKey) >= 1 || isCronSessionKey(sessionKey);
84+
}
85+
8186
function summarizeDeliveryError(error: unknown): string {
8287
if (error instanceof Error) {
8388
return error.message || "error";
@@ -580,8 +585,7 @@ async function resolveSubagentCompletionOrigin(params: {
580585
async function sendAnnounce(item: AnnounceQueueItem) {
581586
const cfg = loadConfig();
582587
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
583-
const requesterDepth = getSubagentDepthFromSessionStore(item.sessionKey);
584-
const requesterIsSubagent = requesterDepth >= 1;
588+
const requesterIsSubagent = isInternalAnnounceRequesterSession(item.sessionKey);
585589
const origin = item.origin;
586590
const threadId =
587591
origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined;
@@ -1216,6 +1220,8 @@ export async function runSubagentAnnounceFlow(params: {
12161220
}
12171221

12181222
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
1223+
const requesterIsInternalSession = () =>
1224+
requesterDepth >= 1 || isCronSessionKey(targetRequesterSessionKey);
12191225

12201226
let childCompletionFindings: string | undefined;
12211227
let subagentRegistryRuntime:
@@ -1339,7 +1345,7 @@ export async function runSubagentAnnounceFlow(params: {
13391345
const announceSessionId = childSessionId || "unknown";
13401346
const findings = childCompletionFindings || reply || "(no output)";
13411347

1342-
let requesterIsSubagent = requesterDepth >= 1;
1348+
let requesterIsSubagent = requesterIsInternalSession();
13431349
if (requesterIsSubagent) {
13441350
const {
13451351
isSubagentSessionRunActive,
@@ -1363,7 +1369,7 @@ export async function runSubagentAnnounceFlow(params: {
13631369
targetRequesterOrigin =
13641370
normalizeDeliveryContext(fallback.requesterOrigin) ?? targetRequesterOrigin;
13651371
requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
1366-
requesterIsSubagent = requesterDepth >= 1;
1372+
requesterIsSubagent = requesterIsInternalSession();
13671373
}
13681374
}
13691375
}

src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,29 @@ describe("dispatchCronDelivery — double-announce guard", () => {
208208
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
209209
});
210210

211+
it("consolidates descendant output into the cron announce path", async () => {
212+
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
213+
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true);
214+
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(
215+
"Detailed child result, everything finished successfully.",
216+
);
217+
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
218+
219+
const params = makeBaseParams({ synthesizedText: "on it" });
220+
const state = await dispatchCronDelivery(params);
221+
222+
expect(state.deliveryAttempted).toBe(true);
223+
expect(state.delivered).toBe(true);
224+
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
225+
expect(runSubagentAnnounceFlow).toHaveBeenCalledWith(
226+
expect.objectContaining({
227+
roundOneReply: "Detailed child result, everything finished successfully.",
228+
expectsCompletionMessage: true,
229+
announceType: "cron job",
230+
}),
231+
);
232+
});
233+
211234
it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => {
212235
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
213236
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);

src/cron/service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ export class CronService {
4646
return await ops.run(this.state, id, mode);
4747
}
4848

49+
async enqueueRun(id: string, mode?: "due" | "force") {
50+
return await ops.enqueueRun(this.state, id, mode);
51+
}
52+
4953
getJob(id: string): CronJob | undefined {
5054
return this.state.store?.jobs.find((job) => job.id === id);
5155
}

src/cron/service/ops.ts

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,27 @@ export async function remove(state: CronServiceState, id: string) {
337337
});
338338
}
339339

340-
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
341-
const prepared = await locked(state, async () => {
340+
type PreparedManualRun =
341+
| {
342+
ok: true;
343+
ran: false;
344+
reason: "already-running" | "not-due";
345+
}
346+
| {
347+
ok: true;
348+
ran: true;
349+
jobId: string;
350+
startedAt: number;
351+
executionJob: CronJob;
352+
}
353+
| { ok: false };
354+
355+
async function prepareManualRun(
356+
state: CronServiceState,
357+
id: string,
358+
mode?: "due" | "force",
359+
): Promise<PreparedManualRun> {
360+
return await locked(state, async () => {
342361
warnIfDisabled(state, "run");
343362
await ensureLoaded(state, { skipRecompute: true });
344363
// Normalize job tick state (clears stale runningAtMs markers) before
@@ -363,7 +382,7 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
363382
// force-reload from disk cannot start the same job concurrently.
364383
await persist(state);
365384
emit(state, { jobId: job.id, action: "started", runAtMs: now });
366-
const executionJob = JSON.parse(JSON.stringify(job)) as typeof job;
385+
const executionJob = JSON.parse(JSON.stringify(job)) as CronJob;
367386
return {
368387
ok: true,
369388
ran: true,
@@ -372,13 +391,13 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
372391
executionJob,
373392
} as const;
374393
});
394+
}
375395

376-
if (!prepared.ran) {
377-
return prepared;
378-
}
379-
if (!prepared.executionJob || typeof prepared.startedAt !== "number") {
380-
return { ok: false } as const;
381-
}
396+
async function finishPreparedManualRun(
397+
state: CronServiceState,
398+
prepared: Extract<PreparedManualRun, { ran: true }>,
399+
mode?: "due" | "force",
400+
): Promise<void> {
382401
const executionJob = prepared.executionJob;
383402
const startedAt = prepared.startedAt;
384403
const jobId = prepared.jobId;
@@ -459,7 +478,28 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
459478
await persist(state);
460479
armTimer(state);
461480
});
481+
}
462482

483+
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
484+
const prepared = await prepareManualRun(state, id, mode);
485+
if (!prepared.ran) {
486+
return prepared;
487+
}
488+
await finishPreparedManualRun(state, prepared, mode);
489+
return { ok: true, ran: true } as const;
490+
}
491+
492+
export async function enqueueRun(state: CronServiceState, id: string, mode?: "due" | "force") {
493+
const prepared = await prepareManualRun(state, id, mode);
494+
if (!prepared.ran) {
495+
return prepared;
496+
}
497+
void finishPreparedManualRun(state, prepared, mode).catch((err) => {
498+
state.deps.log.error(
499+
{ jobId: prepared.jobId, err: String(err) },
500+
"cron: queued manual run background execution failed",
501+
);
502+
});
463503
return { ok: true, ran: true } as const;
464504
}
465505

src/gateway/server-methods/cron.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ export const cronHandlers: GatewayRequestHandlers = {
212212
);
213213
return;
214214
}
215-
const result = await context.cron.run(jobId, p.mode ?? "force");
215+
const result = await context.cron.enqueueRun(jobId, p.mode ?? "force");
216216
respond(true, result, undefined);
217217
},
218218
"cron.runs": async ({ params, respond, context }) => {

src/gateway/server.cron.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,55 @@ describe("gateway server cron", () => {
548548
}
549549
}, 45_000);
550550

551+
test("returns from cron.run immediately while isolated work continues in background", async () => {
552+
const { prevSkipCron, dir } = await setupCronTestRun({
553+
tempPrefix: "openclaw-gw-cron-run-detached-",
554+
});
555+
556+
const { server, ws } = await startServerWithClient();
557+
await connectOk(ws);
558+
559+
let resolveRun: ((value: { status: "ok"; summary: string }) => void) | undefined;
560+
cronIsolatedRun.mockImplementationOnce(
561+
() =>
562+
new Promise((resolve) => {
563+
resolveRun = resolve as (value: { status: "ok"; summary: string }) => void;
564+
}),
565+
);
566+
567+
try {
568+
const addRes = await rpcReq(ws, "cron.add", {
569+
name: "detached run test",
570+
enabled: true,
571+
schedule: { kind: "every", everyMs: 60_000 },
572+
sessionTarget: "isolated",
573+
wakeMode: "next-heartbeat",
574+
payload: { kind: "agentTurn", message: "do work" },
575+
delivery: { mode: "none" },
576+
});
577+
expect(addRes.ok).toBe(true);
578+
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
579+
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
580+
expect(jobId.length > 0).toBe(true);
581+
582+
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 1_000);
583+
expect(runRes.ok).toBe(true);
584+
expect(runRes.payload).toEqual({ ok: true, ran: true });
585+
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
586+
587+
resolveRun?.({ status: "ok", summary: "background finished" });
588+
589+
const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`);
590+
let raw = "";
591+
await waitForCondition(async () => {
592+
raw = await fs.readFile(logPath, "utf-8").catch(() => "");
593+
return raw.includes("background finished");
594+
}, CRON_WAIT_TIMEOUT_MS);
595+
} finally {
596+
await cleanupCronTestRun({ ws, server, prevSkipCron });
597+
}
598+
});
599+
551600
test("posts webhooks for delivery mode and legacy notify fallback only when summary exists", async () => {
552601
const legacyNotifyJob = {
553602
id: "legacy-notify-job",

0 commit comments

Comments
 (0)