Skip to content

Commit e66c418

Browse files
committed
refactor(cron): normalize legacy delivery at ingress
1 parent 9b99787 commit e66c418

6 files changed

Lines changed: 132 additions & 122 deletions

File tree

src/cron/legacy-delivery.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,102 @@ export function buildDeliveryFromLegacyPayload(
4242
return next;
4343
}
4444

45+
export function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unknown>) {
46+
const deliver = payload.deliver;
47+
const channelRaw =
48+
typeof payload.channel === "string" && payload.channel.trim()
49+
? payload.channel.trim().toLowerCase()
50+
: typeof payload.provider === "string" && payload.provider.trim()
51+
? payload.provider.trim().toLowerCase()
52+
: "";
53+
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
54+
const next: Record<string, unknown> = {};
55+
let hasPatch = false;
56+
57+
if (deliver === false) {
58+
next.mode = "none";
59+
hasPatch = true;
60+
} else if (
61+
deliver === true ||
62+
channelRaw ||
63+
toRaw ||
64+
typeof payload.bestEffortDeliver === "boolean"
65+
) {
66+
next.mode = "announce";
67+
hasPatch = true;
68+
}
69+
if (channelRaw) {
70+
next.channel = channelRaw;
71+
hasPatch = true;
72+
}
73+
if (toRaw) {
74+
next.to = toRaw;
75+
hasPatch = true;
76+
}
77+
if (typeof payload.bestEffortDeliver === "boolean") {
78+
next.bestEffort = payload.bestEffortDeliver;
79+
hasPatch = true;
80+
}
81+
82+
return hasPatch ? next : null;
83+
}
84+
85+
export function mergeLegacyDeliveryInto(
86+
delivery: Record<string, unknown>,
87+
payload: Record<string, unknown>,
88+
) {
89+
const patch = buildDeliveryPatchFromLegacyPayload(payload);
90+
if (!patch) {
91+
return { delivery, mutated: false };
92+
}
93+
94+
const next = { ...delivery };
95+
let mutated = false;
96+
97+
if ("mode" in patch && patch.mode !== next.mode) {
98+
next.mode = patch.mode;
99+
mutated = true;
100+
}
101+
if ("channel" in patch && patch.channel !== next.channel) {
102+
next.channel = patch.channel;
103+
mutated = true;
104+
}
105+
if ("to" in patch && patch.to !== next.to) {
106+
next.to = patch.to;
107+
mutated = true;
108+
}
109+
if ("bestEffort" in patch && patch.bestEffort !== next.bestEffort) {
110+
next.bestEffort = patch.bestEffort;
111+
mutated = true;
112+
}
113+
114+
return { delivery: next, mutated };
115+
}
116+
117+
export function normalizeLegacyDeliveryInput(params: {
118+
delivery?: Record<string, unknown> | null;
119+
payload?: Record<string, unknown> | null;
120+
}) {
121+
if (!params.payload || !hasLegacyDeliveryHints(params.payload)) {
122+
return {
123+
delivery: params.delivery ?? undefined,
124+
mutated: false,
125+
};
126+
}
127+
128+
const nextDelivery = params.delivery
129+
? mergeLegacyDeliveryInto(params.delivery, params.payload)
130+
: {
131+
delivery: buildDeliveryFromLegacyPayload(params.payload),
132+
mutated: true,
133+
};
134+
stripLegacyDeliveryFields(params.payload);
135+
return {
136+
delivery: nextDelivery.delivery,
137+
mutated: true,
138+
};
139+
}
140+
45141
export function stripLegacyDeliveryFields(payload: Record<string, unknown>) {
46142
if ("deliver" in payload) {
47143
delete payload.deliver;

src/cron/normalize.ts

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import { sanitizeAgentId } from "../routing/session-key.js";
22
import { isRecord } from "../utils.js";
3-
import {
4-
buildDeliveryFromLegacyPayload,
5-
hasLegacyDeliveryHints,
6-
stripLegacyDeliveryFields,
7-
} from "./legacy-delivery.js";
3+
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
84
import { parseAbsoluteTimeMs } from "./parse.js";
95
import { migrateLegacyCronPayload } from "./payload-migration.js";
106
import { inferLegacyName } from "./service/normalize.js";
@@ -469,14 +465,20 @@ export function normalizeCronJobInput(
469465
const isIsolatedAgentTurn =
470466
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
471467
const hasDelivery = "delivery" in next && next.delivery !== undefined;
472-
const hasLegacyDelivery = payload ? hasLegacyDeliveryHints(payload) : false;
473-
if (!hasDelivery && isIsolatedAgentTurn && payloadKind === "agentTurn") {
474-
if (payload && hasLegacyDelivery) {
475-
next.delivery = buildDeliveryFromLegacyPayload(payload);
476-
stripLegacyDeliveryFields(payload);
477-
} else {
478-
next.delivery = { mode: "announce" };
479-
}
468+
const normalizedLegacy = normalizeLegacyDeliveryInput({
469+
delivery: isRecord(next.delivery) ? next.delivery : null,
470+
payload,
471+
});
472+
if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
473+
next.delivery = normalizedLegacy.delivery;
474+
}
475+
if (
476+
!hasDelivery &&
477+
!normalizedLegacy.delivery &&
478+
isIsolatedAgentTurn &&
479+
payloadKind === "agentTurn"
480+
) {
481+
next.delivery = { mode: "announce" };
480482
}
481483
}
482484

src/cron/service.jobs.test.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -589,19 +589,6 @@ describe("createJob delivery defaults", () => {
589589
expect(job.delivery).toEqual({ mode: "none" });
590590
});
591591

592-
it("preserves legacy payload deliver=false when explicit delivery is omitted", () => {
593-
const state = createMockState(now);
594-
const job = createJob(state, {
595-
name: "isolated-legacy-no-deliver",
596-
enabled: true,
597-
schedule: { kind: "every", everyMs: 60_000 },
598-
sessionTarget: "isolated",
599-
wakeMode: "now",
600-
payload: { kind: "agentTurn", message: "hello", deliver: false } as never,
601-
});
602-
expect(job.delivery).toEqual({ mode: "none" });
603-
});
604-
605592
it("does not set delivery for main systemEvent jobs without explicit delivery", () => {
606593
const state = createMockState(now, { defaultAgentId: "main" });
607594
const job = createJob(state, {

src/cron/service/initial-delivery.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,9 @@
1-
import { buildDeliveryFromLegacyPayload, hasLegacyDeliveryHints } from "../legacy-delivery.js";
21
import type { CronDelivery, CronJobCreate } from "../types.js";
32

43
export function resolveInitialCronDelivery(input: CronJobCreate): CronDelivery | undefined {
54
if (input.delivery) {
65
return input.delivery;
76
}
8-
const payloadRecord =
9-
input.payload && typeof input.payload === "object"
10-
? (input.payload as Record<string, unknown>)
11-
: undefined;
12-
if (payloadRecord && hasLegacyDeliveryHints(payloadRecord)) {
13-
return buildDeliveryFromLegacyPayload(payloadRecord) as CronDelivery;
14-
}
157
if (input.sessionTarget === "isolated" && input.payload.kind === "agentTurn") {
168
return { mode: "announce" };
179
}

src/cron/service/ops.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { normalizeCronJobCreate } from "../normalize.js";
12
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
23
import {
34
applyJobPatch,
@@ -234,7 +235,11 @@ export async function add(state: CronServiceState, input: CronJobCreate) {
234235
return await locked(state, async () => {
235236
warnIfDisabled(state, "add");
236237
await ensureLoaded(state);
237-
const job = createJob(state, input);
238+
const normalizedInput = normalizeCronJobCreate(input);
239+
if (!normalizedInput) {
240+
throw new Error("invalid cron job input");
241+
}
242+
const job = createJob(state, normalizedInput);
238243
state.store?.jobs.push(job);
239244

240245
// Defensive: recompute all next-run times to ensure consistency

src/cron/service/store.ts

Lines changed: 15 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
import fs from "node:fs";
2-
import {
3-
buildDeliveryFromLegacyPayload,
4-
hasLegacyDeliveryHints,
5-
stripLegacyDeliveryFields,
6-
} from "../legacy-delivery.js";
2+
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
73
import { parseAbsoluteTimeMs } from "../parse.js";
84
import { migrateLegacyCronPayload } from "../payload-migration.js";
95
import { coerceFiniteScheduleNumber } from "../schedule.js";
@@ -14,69 +10,6 @@ import { recomputeNextRuns } from "./jobs.js";
1410
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
1511
import type { CronServiceState } from "./state.js";
1612

17-
function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unknown>) {
18-
const deliver = payload.deliver;
19-
const channelRaw =
20-
typeof payload.channel === "string" ? payload.channel.trim().toLowerCase() : "";
21-
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
22-
const next: Record<string, unknown> = {};
23-
let hasPatch = false;
24-
25-
if (deliver === false) {
26-
next.mode = "none";
27-
hasPatch = true;
28-
} else if (deliver === true || toRaw) {
29-
next.mode = "announce";
30-
hasPatch = true;
31-
}
32-
if (channelRaw) {
33-
next.channel = channelRaw;
34-
hasPatch = true;
35-
}
36-
if (toRaw) {
37-
next.to = toRaw;
38-
hasPatch = true;
39-
}
40-
if (typeof payload.bestEffortDeliver === "boolean") {
41-
next.bestEffort = payload.bestEffortDeliver;
42-
hasPatch = true;
43-
}
44-
45-
return hasPatch ? next : null;
46-
}
47-
48-
function mergeLegacyDeliveryInto(
49-
delivery: Record<string, unknown>,
50-
payload: Record<string, unknown>,
51-
) {
52-
const patch = buildDeliveryPatchFromLegacyPayload(payload);
53-
if (!patch) {
54-
return { delivery, mutated: false };
55-
}
56-
57-
const next = { ...delivery };
58-
let mutated = false;
59-
60-
if ("mode" in patch && patch.mode !== next.mode) {
61-
next.mode = patch.mode;
62-
mutated = true;
63-
}
64-
if ("channel" in patch && patch.channel !== next.channel) {
65-
next.channel = patch.channel;
66-
mutated = true;
67-
}
68-
if ("to" in patch && patch.to !== next.to) {
69-
next.to = patch.to;
70-
mutated = true;
71-
}
72-
if ("bestEffort" in patch && patch.bestEffort !== next.bestEffort) {
73-
next.bestEffort = patch.bestEffort;
74-
mutated = true;
75-
}
76-
77-
return { delivery: next, mutated };
78-
}
79-
8013
function normalizePayloadKind(payload: Record<string, unknown>) {
8114
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
8215
if (raw === "agentturn") {
@@ -512,30 +445,25 @@ export async function ensureLoaded(
512445
const isIsolatedAgentTurn =
513446
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
514447
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
515-
const hasLegacyDelivery = payloadRecord ? hasLegacyDeliveryHints(payloadRecord) : false;
448+
const normalizedLegacy = normalizeLegacyDeliveryInput({
449+
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
450+
payload: payloadRecord,
451+
});
516452

517453
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
518-
if (!hasDelivery) {
519-
raw.delivery =
520-
payloadRecord && hasLegacyDelivery
521-
? buildDeliveryFromLegacyPayload(payloadRecord)
522-
: { mode: "announce" };
454+
if (!hasDelivery && normalizedLegacy.delivery) {
455+
raw.delivery = normalizedLegacy.delivery;
523456
mutated = true;
524-
}
525-
if (payloadRecord && hasLegacyDelivery) {
526-
if (hasDelivery) {
527-
const merged = mergeLegacyDeliveryInto(
528-
delivery as Record<string, unknown>,
529-
payloadRecord,
530-
);
531-
if (merged.mutated) {
532-
raw.delivery = merged.delivery;
533-
mutated = true;
534-
}
535-
}
536-
stripLegacyDeliveryFields(payloadRecord);
457+
} else if (!hasDelivery) {
458+
raw.delivery = { mode: "announce" };
459+
mutated = true;
460+
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
461+
raw.delivery = normalizedLegacy.delivery;
537462
mutated = true;
538463
}
464+
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
465+
raw.delivery = normalizedLegacy.delivery;
466+
mutated = true;
539467
}
540468
}
541469
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };

0 commit comments

Comments
 (0)