Skip to content

Commit e554c59

Browse files
authored
fix(cron): eliminate double-announce and replace delivery polling with push-based flow (#39089)
* fix(cron): eliminate double-announce and replace delivery polling with push-based flow - Set deliveryAttempted=true in announce early-return paths (active-subagent suppression and stale-interim suppression) so the heartbeat timer no longer fires a redundant enqueueSystemEvent fallback (double-announce bug). - Refactor waitForDescendantSubagentSummary to use event-based agent.wait RPC calls instead of a 500ms busy-poll loop. Each active descendant run is now awaited concurrently via Promise.allSettled, and only a short bounded grace period (5s) remains to capture the cron agent's post-orchestration synthesis. Eliminates O(n*timeoutMs/500ms) gateway calls and wasted wall-clock time. - Add FAST_TEST_MODE (OPENCLAW_TEST_FAST=1) to subagent-followup.ts to keep the grace-period tests instant in CI. - Add comprehensive tests for the new waitForDescendantSubagentSummary behaviour (push-based wait, error resilience, NO_REPLY handling, multi-descendant waits). * fix: prep cron double-announce followup tests (#39089) (thanks @tyler6204)
1 parent 97f9e25 commit e554c59

File tree

5 files changed

+620
-33
lines changed

5 files changed

+620
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,7 @@ Docs: https://docs.openclaw.ai
628628

629629
### Fixes
630630

631+
- Cron/announce delivery: stop duplicate completion announces when cron early-return paths already handled delivery, and replace descendant followup polling with push-based waits so cron summaries arrive without the old busy-loop fallback. (#39089) Thanks @tyler6204.
631632
- Dashboard/macOS auth handling: switch the macOS “Open Dashboard” flow from query-string token injection to URL fragments, stop persisting Control UI gateway tokens in browser localStorage, and scrub legacy stored tokens on load. Thanks @JNX03 for reporting.
632633
- Models/provider config precedence: prefer exact `models.providers.<name>` matches before normalized provider aliases in embedded model resolution, preventing alias/canonical key collisions from applying the wrong provider `api`, `baseUrl`, or headers. (#35934) thanks @RealKai42.
633634
- Hooks/auth throttling: reject non-`POST` `/hooks/*` requests before auth-failure accounting so unsupported methods can no longer burn the hook auth lockout budget and block legitimate webhook delivery. Thanks @JNX03 for reporting.
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
/**
2+
* Tests for the double-announce bug in cron delivery dispatch.
3+
*
4+
* Bug: early return paths in deliverViaAnnounce (active subagent suppression
5+
* and stale interim message suppression) returned without setting
6+
* deliveryAttempted = true. The timer saw deliveryAttempted = false and
7+
* fired enqueueSystemEvent as a fallback, causing a second announcement.
8+
*
9+
* Fix: both early return paths now set deliveryAttempted = true before
10+
* returning so the timer correctly skips the system-event fallback.
11+
*/
12+
13+
import { beforeEach, describe, expect, it, vi } from "vitest";
14+
15+
// --- Module mocks (must be hoisted before imports) ---
16+
17+
vi.mock("../../agents/subagent-announce.js", () => ({
18+
runSubagentAnnounceFlow: vi.fn().mockResolvedValue(true),
19+
}));
20+
21+
vi.mock("../../agents/subagent-registry.js", () => ({
22+
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
23+
}));
24+
25+
vi.mock("../../config/sessions.js", () => ({
26+
resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:main"),
27+
}));
28+
29+
vi.mock("../../infra/outbound/outbound-session.js", () => ({
30+
resolveOutboundSessionRoute: vi.fn().mockResolvedValue(null),
31+
ensureOutboundSessionEntry: vi.fn().mockResolvedValue(undefined),
32+
}));
33+
34+
vi.mock("../../infra/outbound/deliver.js", () => ({
35+
deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]),
36+
}));
37+
38+
vi.mock("../../infra/outbound/identity.js", () => ({
39+
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({}),
40+
}));
41+
42+
vi.mock("../../infra/outbound/session-context.js", () => ({
43+
buildOutboundSessionContext: vi.fn().mockReturnValue({}),
44+
}));
45+
46+
vi.mock("../../cli/outbound-send-deps.js", () => ({
47+
createOutboundSendDeps: vi.fn().mockReturnValue({}),
48+
}));
49+
50+
vi.mock("../../logger.js", () => ({
51+
logWarn: vi.fn(),
52+
}));
53+
54+
vi.mock("./subagent-followup.js", () => ({
55+
expectsSubagentFollowup: vi.fn().mockReturnValue(false),
56+
isLikelyInterimCronMessage: vi.fn().mockReturnValue(false),
57+
readDescendantSubagentFallbackReply: vi.fn().mockResolvedValue(undefined),
58+
waitForDescendantSubagentSummary: vi.fn().mockResolvedValue(undefined),
59+
}));
60+
61+
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
62+
// Import after mocks
63+
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
64+
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
65+
import { dispatchCronDelivery } from "./delivery-dispatch.js";
66+
import type { DeliveryTargetResolution } from "./delivery-target.js";
67+
import type { RunCronAgentTurnResult } from "./run.js";
68+
import {
69+
expectsSubagentFollowup,
70+
isLikelyInterimCronMessage,
71+
readDescendantSubagentFallbackReply,
72+
waitForDescendantSubagentSummary,
73+
} from "./subagent-followup.js";
74+
75+
// ---------------------------------------------------------------------------
76+
// Helpers
77+
// ---------------------------------------------------------------------------
78+
79+
function makeResolvedDelivery(): Extract<DeliveryTargetResolution, { ok: true }> {
80+
return {
81+
ok: true,
82+
channel: "telegram",
83+
to: "123456",
84+
accountId: undefined,
85+
threadId: undefined,
86+
};
87+
}
88+
89+
function makeWithRunSession() {
90+
return (
91+
result: Omit<RunCronAgentTurnResult, "sessionId" | "sessionKey">,
92+
): RunCronAgentTurnResult => ({
93+
...result,
94+
sessionId: "test-session-id",
95+
sessionKey: "test-session-key",
96+
});
97+
}
98+
99+
function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested?: boolean }) {
100+
const resolvedDelivery = makeResolvedDelivery();
101+
return {
102+
cfg: {} as never,
103+
cfgWithAgentDefaults: {} as never,
104+
deps: {} as never,
105+
job: {
106+
id: "test-job",
107+
name: "Test Job",
108+
deleteAfterRun: false,
109+
payload: { kind: "agentTurn", message: "hello" },
110+
} as never,
111+
agentId: "main",
112+
agentSessionKey: "agent:main",
113+
runSessionId: "run-123",
114+
runStartedAt: Date.now(),
115+
runEndedAt: Date.now(),
116+
timeoutMs: 30_000,
117+
resolvedDelivery,
118+
deliveryRequested: overrides.deliveryRequested ?? true,
119+
skipHeartbeatDelivery: false,
120+
skipMessagingToolDelivery: false,
121+
deliveryBestEffort: false,
122+
deliveryPayloadHasStructuredContent: false,
123+
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
124+
synthesizedText: overrides.synthesizedText ?? "on it",
125+
summary: overrides.synthesizedText ?? "on it",
126+
outputText: overrides.synthesizedText ?? "on it",
127+
telemetry: undefined,
128+
abortSignal: undefined,
129+
isAborted: () => false,
130+
abortReason: () => "aborted",
131+
withRunSession: makeWithRunSession(),
132+
};
133+
}
134+
135+
// ---------------------------------------------------------------------------
136+
// Tests
137+
// ---------------------------------------------------------------------------
138+
139+
describe("dispatchCronDelivery — double-announce guard", () => {
140+
beforeEach(() => {
141+
vi.clearAllMocks();
142+
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
143+
vi.mocked(expectsSubagentFollowup).mockReturnValue(false);
144+
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
145+
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
146+
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
147+
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
148+
});
149+
150+
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
151+
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
152+
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
153+
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
154+
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
155+
156+
const params = makeBaseParams({ synthesizedText: "on it" });
157+
const state = await dispatchCronDelivery(params);
158+
159+
// deliveryAttempted must be true so timer does NOT fire enqueueSystemEvent
160+
expect(state.deliveryAttempted).toBe(true);
161+
162+
// Verify timer guard agrees: shouldEnqueueCronMainSummary returns false
163+
expect(
164+
shouldEnqueueCronMainSummary({
165+
summaryText: "on it",
166+
deliveryRequested: true,
167+
delivered: state.delivered,
168+
deliveryAttempted: state.deliveryAttempted,
169+
suppressMainSummary: false,
170+
isCronSystemEvent: () => true,
171+
}),
172+
).toBe(false);
173+
174+
// No announce should have been attempted (subagents still running)
175+
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
176+
});
177+
178+
it("early return (stale interim suppression) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
179+
// First countActiveDescendantRuns call returns >0 (had descendants), second returns 0
180+
vi.mocked(countActiveDescendantRuns)
181+
.mockReturnValueOnce(2) // initial check → hadDescendants=true, enters wait block
182+
.mockReturnValueOnce(0); // second check after wait → activeSubagentRuns=0
183+
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
184+
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
185+
// synthesizedText matches initialSynthesizedText & isLikelyInterimCronMessage → stale interim
186+
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true);
187+
188+
const params = makeBaseParams({ synthesizedText: "on it, pulling everything together" });
189+
const state = await dispatchCronDelivery(params);
190+
191+
// deliveryAttempted must be true so timer does NOT fire enqueueSystemEvent
192+
expect(state.deliveryAttempted).toBe(true);
193+
194+
// Verify timer guard agrees
195+
expect(
196+
shouldEnqueueCronMainSummary({
197+
summaryText: "on it, pulling everything together",
198+
deliveryRequested: true,
199+
delivered: state.delivered,
200+
deliveryAttempted: state.deliveryAttempted,
201+
suppressMainSummary: false,
202+
isCronSystemEvent: () => true,
203+
}),
204+
).toBe(false);
205+
206+
// No announce or direct delivery should have been sent (stale interim suppressed)
207+
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
208+
});
209+
210+
it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => {
211+
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
212+
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
213+
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
214+
215+
const params = makeBaseParams({ synthesizedText: "Morning briefing complete." });
216+
const state = await dispatchCronDelivery(params);
217+
218+
expect(state.deliveryAttempted).toBe(true);
219+
expect(state.delivered).toBe(true);
220+
// Announce called exactly once
221+
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
222+
223+
// Timer should not fire enqueueSystemEvent (delivered=true)
224+
expect(
225+
shouldEnqueueCronMainSummary({
226+
summaryText: "Morning briefing complete.",
227+
deliveryRequested: true,
228+
delivered: state.delivered,
229+
deliveryAttempted: state.deliveryAttempted,
230+
suppressMainSummary: false,
231+
isCronSystemEvent: () => true,
232+
}),
233+
).toBe(false);
234+
});
235+
236+
it("announce failure falls back to direct delivery exactly once (no double-deliver)", async () => {
237+
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
238+
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
239+
// Announce fails: runSubagentAnnounceFlow returns false
240+
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
241+
242+
const { deliverOutboundPayloads } = await import("../../infra/outbound/deliver.js");
243+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
244+
245+
const params = makeBaseParams({ synthesizedText: "Briefing ready." });
246+
const state = await dispatchCronDelivery(params);
247+
248+
// Delivery was attempted; direct fallback picked up the slack
249+
expect(state.deliveryAttempted).toBe(true);
250+
expect(state.delivered).toBe(true);
251+
252+
// Announce was tried exactly once
253+
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
254+
255+
// Direct fallback fired exactly once (not zero, not twice)
256+
// This ensures one delivery total reaches the user, not two
257+
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
258+
});
259+
260+
it("no delivery requested means deliveryAttempted stays false and runSubagentAnnounceFlow not called", async () => {
261+
const params = makeBaseParams({
262+
synthesizedText: "Task done.",
263+
deliveryRequested: false,
264+
});
265+
const state = await dispatchCronDelivery(params);
266+
267+
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
268+
// deliveryAttempted starts false (skipMessagingToolDelivery=false) and nothing runs
269+
expect(state.deliveryAttempted).toBe(false);
270+
});
271+
});

src/cron/isolated-agent/delivery-dispatch.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,16 @@ export async function dispatchCronDelivery(
318318
}
319319
if (activeSubagentRuns > 0) {
320320
// Parent orchestration is still in progress; avoid announcing a partial
321-
// update to the main requester.
322-
return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry });
321+
// update to the main requester. Mark deliveryAttempted so the timer does
322+
// not fire a redundant enqueueSystemEvent fallback (double-announce bug).
323+
deliveryAttempted = true;
324+
return params.withRunSession({
325+
status: "ok",
326+
summary,
327+
outputText,
328+
deliveryAttempted,
329+
...params.telemetry,
330+
});
323331
}
324332
if (
325333
hadDescendants &&
@@ -329,8 +337,16 @@ export async function dispatchCronDelivery(
329337
) {
330338
// Descendants existed but no post-orchestration synthesis arrived AND
331339
// no descendant fallback reply was available. Suppress stale parent
332-
// text like "on it, pulling everything together".
333-
return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry });
340+
// text like "on it, pulling everything together". Mark deliveryAttempted
341+
// so the timer does not fire a redundant enqueueSystemEvent fallback.
342+
deliveryAttempted = true;
343+
return params.withRunSession({
344+
status: "ok",
345+
summary,
346+
outputText,
347+
deliveryAttempted,
348+
...params.telemetry,
349+
});
334350
}
335351
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
336352
return params.withRunSession({

0 commit comments

Comments
 (0)