Skip to content

Commit e1c8094

Browse files
fix: schedule nextWakeAtMs for isolated sessionTarget cron jobs (#19541)
* fix(cron): repair isolated next wake scheduling
* cron: harden isolated next-wake timestamp guards
---------
Co-authored-by: Tak Hoffman <[email protected]>
1 parent 139271a commit e1c8094

File tree

4 files changed

+88
-16
lines changed

4 files changed

+88
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,7 @@ Docs: https://docs.openclaw.ai
10351035
- Feishu: detect bot mentions in post messages with embedded docs when `message.mentions` is empty. (#18074) Thanks @popomore.
10361036
- Agents/Sessions: align session lock watchdog hold windows with run and compaction timeout budgets (plus grace), preventing valid long-running turns from being force-unlocked mid-run while still recovering hung lock owners. (#18060)
10371037
- Cron: preserve default model fallbacks for cron agent runs when only `model.primary` is overridden, so failover still follows configured fallbacks unless explicitly cleared with `fallbacks: []`. (#18210) Thanks @mahsumaktas.
1038+
- Cron/Isolation: treat non-finite `nextRunAtMs` as missing and repair isolated `every` anchor fallback so legacy jobs without valid timestamps self-heal and scheduler wake timing remains valid. (#19469) Thanks @guirguispierre.
10381039
- Cron: route text-only announce output through the main session announce flow via runSubagentAnnounceFlow so cron text-only output remains visible to the initiating session. Thanks @tyler6204.
10391040
- Cron: treat `timeoutSeconds: 0` as no-timeout (not clamped to 1), ensuring long-running cron runs are not prematurely terminated. Thanks @tyler6204.
10401041
- Cron announce injection now targets the session determined by delivery config (`to` + channel) instead of defaulting to the current session. Thanks @tyler6204.

src/cron/service.issue-regressions.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,55 @@ describe("Cron issue regressions", () => {
241241
cron.stop();
242242
});
243243

244+
it("repairs isolated every jobs missing createdAtMs and sets nextWakeAtMs", async () => {
245+
const store = await makeStorePath();
246+
await fs.writeFile(
247+
store.storePath,
248+
JSON.stringify({
249+
version: 1,
250+
jobs: [
251+
{
252+
id: "legacy-isolated",
253+
agentId: "feature-dev_planner",
254+
sessionKey: "agent:main:main",
255+
name: "legacy isolated",
256+
enabled: true,
257+
schedule: { kind: "every", everyMs: 300_000 },
258+
sessionTarget: "isolated",
259+
wakeMode: "now",
260+
payload: { kind: "agentTurn", message: "poll workflow queue" },
261+
state: {},
262+
},
263+
],
264+
}),
265+
);
266+
267+
const cron = new CronService({
268+
cronEnabled: true,
269+
storePath: store.storePath,
270+
log: noopLogger,
271+
enqueueSystemEvent: vi.fn(),
272+
requestHeartbeatNow: vi.fn(),
273+
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }),
274+
});
275+
await cron.start();
276+
277+
const status = await cron.status();
278+
const jobs = await cron.list({ includeDisabled: true });
279+
const isolated = jobs.find((job) => job.id === "legacy-isolated");
280+
expect(Number.isFinite(isolated?.state.nextRunAtMs)).toBe(true);
281+
expect(Number.isFinite(status.nextWakeAtMs)).toBe(true);
282+
283+
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
284+
jobs: Array<{ id: string; state?: { nextRunAtMs?: number | null } }>;
285+
};
286+
const persistedIsolated = persisted.jobs.find((job) => job.id === "legacy-isolated");
287+
expect(typeof persistedIsolated?.state?.nextRunAtMs).toBe("number");
288+
expect(Number.isFinite(persistedIsolated?.state?.nextRunAtMs)).toBe(true);
289+
290+
cron.stop();
291+
});
292+
244293
it("repairs missing nextRunAtMs on non-schedule updates without touching other jobs", async () => {
245294
const store = await makeStorePath();
246295
const cron = await startCronForStore({ storePath: store.storePath });

src/cron/service/jobs.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,22 @@ function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) {
6363
return undefined;
6464
}
6565

66+
function isFiniteTimestamp(value: unknown): value is number {
67+
return typeof value === "number" && Number.isFinite(value);
68+
}
69+
6670
function resolveEveryAnchorMs(params: {
6771
schedule: { everyMs: number; anchorMs?: number };
6872
fallbackAnchorMs: number;
6973
}) {
7074
const raw = params.schedule.anchorMs;
71-
if (typeof raw === "number" && Number.isFinite(raw)) {
75+
if (isFiniteTimestamp(raw)) {
7276
return Math.max(0, Math.floor(raw));
7377
}
74-
return Math.max(0, Math.floor(params.fallbackAnchorMs));
78+
if (isFiniteTimestamp(params.fallbackAnchorMs)) {
79+
return Math.max(0, Math.floor(params.fallbackAnchorMs));
80+
}
81+
return 0;
7582
}
7683

7784
export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "payload">) {
@@ -144,11 +151,13 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
144151
return nextFromLastRun;
145152
}
146153
}
154+
const fallbackAnchorMs = isFiniteTimestamp(job.createdAtMs) ? job.createdAtMs : nowMs;
147155
const anchorMs = resolveEveryAnchorMs({
148156
schedule: job.schedule,
149-
fallbackAnchorMs: job.createdAtMs,
157+
fallbackAnchorMs,
150158
});
151-
return computeNextRunAtMs({ ...job.schedule, everyMs, anchorMs }, nowMs);
159+
const next = computeNextRunAtMs({ ...job.schedule, everyMs, anchorMs }, nowMs);
160+
return isFiniteTimestamp(next) ? next : undefined;
152161
}
153162
if (job.schedule.kind === "at") {
154163
// One-shot jobs stay due until they successfully finish.
@@ -167,14 +176,14 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
167176
: typeof schedule.at === "string"
168177
? parseAbsoluteTimeMs(schedule.at)
169178
: null;
170-
return atMs !== null ? atMs : undefined;
179+
return atMs !== null && Number.isFinite(atMs) ? atMs : undefined;
171180
}
172181
const next = computeStaggeredCronNextRunAtMs(job, nowMs);
173182
if (next === undefined && job.schedule.kind === "cron") {
174183
const nextSecondMs = Math.floor(nowMs / 1000) * 1000 + 1000;
175184
return computeStaggeredCronNextRunAtMs(job, nextSecondMs);
176185
}
177-
return next;
186+
return isFiniteTimestamp(next) ? next : undefined;
178187
}
179188

180189
/** Maximum consecutive schedule errors before auto-disabling a job. */
@@ -233,6 +242,11 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob;
233242
return { changed, skip: true };
234243
}
235244

245+
if (!isFiniteTimestamp(job.state.nextRunAtMs) && job.state.nextRunAtMs !== undefined) {
246+
job.state.nextRunAtMs = undefined;
247+
changed = true;
248+
}
249+
236250
const runningAt = job.state.runningAtMs;
237251
if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) {
238252
state.deps.log.warn(
@@ -298,7 +312,7 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
298312
// Preserving a still-future nextRunAtMs avoids accidentally advancing
299313
// a job that hasn't fired yet (e.g. during restart recovery).
300314
const nextRun = job.state.nextRunAtMs;
301-
const isDueOrMissing = nextRun === undefined || now >= nextRun;
315+
const isDueOrMissing = !isFiniteTimestamp(nextRun) || now >= nextRun;
302316
if (isDueOrMissing) {
303317
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
304318
changed = true;
@@ -321,7 +335,7 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea
321335
// Only compute missing nextRunAtMs, do NOT recompute existing ones.
322336
// If a job was past-due but not found by findDueJobs, recomputing would
323337
// cause it to be silently skipped.
324-
if (job.state.nextRunAtMs === undefined) {
338+
if (!isFiniteTimestamp(job.state.nextRunAtMs)) {
325339
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
326340
changed = true;
327341
}
@@ -332,14 +346,18 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea
332346

333347
export function nextWakeAtMs(state: CronServiceState) {
334348
const jobs = state.store?.jobs ?? [];
335-
const enabled = jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number");
349+
const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs));
336350
if (enabled.length === 0) {
337351
return undefined;
338352
}
339-
return enabled.reduce(
340-
(min, j) => Math.min(min, j.state.nextRunAtMs as number),
341-
enabled[0].state.nextRunAtMs as number,
342-
);
353+
const first = enabled[0]?.state.nextRunAtMs;
354+
if (!isFiniteTimestamp(first)) {
355+
return undefined;
356+
}
357+
return enabled.reduce((min, j) => {
358+
const next = j.state.nextRunAtMs;
359+
return isFiniteTimestamp(next) ? Math.min(min, next) : min;
360+
}, first);
343361
}
344362

345363
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {

src/cron/service/timer.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,12 @@ export function armTimer(state: CronServiceState) {
250250
const jobCount = state.store?.jobs.length ?? 0;
251251
const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0;
252252
const withNextRun =
253-
state.store?.jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number")
254-
.length ?? 0;
253+
state.store?.jobs.filter(
254+
(j) =>
255+
j.enabled &&
256+
typeof j.state.nextRunAtMs === "number" &&
257+
Number.isFinite(j.state.nextRunAtMs),
258+
).length ?? 0;
255259
state.deps.log.debug(
256260
{ jobCount, enabledCount, withNextRun },
257261
"cron: armTimer skipped - no jobs with nextRunAtMs",
@@ -476,7 +480,7 @@ function isRunnableJob(params: {
476480
return false;
477481
}
478482
const next = job.state.nextRunAtMs;
479-
return typeof next === "number" && nowMs >= next;
483+
return typeof next === "number" && Number.isFinite(next) && nowMs >= next;
480484
}
481485

482486
function collectRunnableJobs(

0 commit comments

Comments
 (0)