Skip to content

Commit a1628d8

Browse files
committed
refactor: unify outbound session context wiring
1 parent 8483e01 commit a1628d8

File tree

14 files changed

+344
-36
lines changed

14 files changed

+344
-36
lines changed

src/auto-reply/reply/route-reply.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js";
1111
import { resolveEffectiveMessagesConfig } from "../../agents/identity.js";
1212
import { normalizeChannelId } from "../../channels/plugins/index.js";
1313
import type { OpenClawConfig } from "../../config/config.js";
14+
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
1415
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
1516
import type { OriginatingChannelType } from "../templating.js";
1617
import type { ReplyPayload } from "../types.js";
@@ -122,6 +123,11 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
122123
// Provider docking: this is an execution boundary (we're about to send).
123124
// Keep the module cheap to import by loading outbound plumbing lazily.
124125
const { deliverOutboundPayloads } = await import("../../infra/outbound/deliver.js");
126+
const outboundSession = buildOutboundSessionContext({
127+
cfg,
128+
agentId: resolvedAgentId,
129+
sessionKey: params.sessionKey,
130+
});
125131
const results = await deliverOutboundPayloads({
126132
cfg,
127133
channel: channelId,
@@ -130,7 +136,7 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
130136
payloads: [normalized],
131137
replyToId: resolvedReplyToId ?? null,
132138
threadId: resolvedThreadId,
133-
agentId: resolvedAgentId,
139+
session: outboundSession,
134140
abortSignal,
135141
mirror:
136142
params.mirror !== false && params.sessionKey

src/commands/agent/delivery.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
21
import { AGENT_LANE_NESTED } from "../../agents/lanes.js";
32
import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js";
43
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
@@ -17,6 +16,7 @@ import {
1716
normalizeOutboundPayloads,
1817
normalizeOutboundPayloadsForJson,
1918
} from "../../infra/outbound/payloads.js";
19+
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
2020
import type { RuntimeEnv } from "../../runtime.js";
2121
import { isInternalMessageChannel } from "../../utils/message-channel.js";
2222
import type { AgentCommandOpts } from "./types.js";
@@ -212,25 +212,24 @@ export async function deliverAgentCommandResult(params: {
212212
}
213213
if (deliver && deliveryChannel && !isInternalMessageChannel(deliveryChannel)) {
214214
if (deliveryTarget) {
215-
const deliveryAgentId =
216-
opts.agentId ??
217-
(opts.sessionKey
218-
? resolveSessionAgentId({ sessionKey: opts.sessionKey, config: cfg })
219-
: undefined);
215+
const deliverySession = buildOutboundSessionContext({
216+
cfg,
217+
agentId: opts.agentId,
218+
sessionKey: opts.sessionKey,
219+
});
220220
await deliverOutboundPayloads({
221221
cfg,
222222
channel: deliveryChannel,
223223
to: deliveryTarget,
224224
accountId: resolvedAccountId,
225225
payloads: deliveryPayloads,
226-
agentId: deliveryAgentId,
226+
session: deliverySession,
227227
replyToId: resolvedReplyToId ?? null,
228228
threadId: resolvedThreadTarget ?? null,
229229
bestEffort: bestEffortDeliver,
230230
onError: (err) => logDeliveryError(err),
231231
onPayload: logPayload,
232232
deps: createOutboundSendDeps(deps),
233-
sessionKey: opts.sessionKey,
234233
});
235234
}
236235
}

src/cron/isolated-agent/delivery-dispatch.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { resolveAgentMainSessionKey } from "../../config/sessions.js";
88
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
99
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
1010
import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js";
11+
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
1112
import { logWarn } from "../../logger.js";
1213
import type { CronJob, CronRunTelemetry } from "../types.js";
1314
import type { DeliveryTargetResolution } from "./delivery-target.js";
@@ -170,19 +171,23 @@ export async function dispatchCronDelivery(
170171
});
171172
}
172173
deliveryAttempted = true;
174+
const deliverySession = buildOutboundSessionContext({
175+
cfg: params.cfgWithAgentDefaults,
176+
agentId: params.agentId,
177+
sessionKey: params.agentSessionKey,
178+
});
173179
const deliveryResults = await deliverOutboundPayloads({
174180
cfg: params.cfgWithAgentDefaults,
175181
channel: delivery.channel,
176182
to: delivery.to,
177183
accountId: delivery.accountId,
178184
threadId: delivery.threadId,
179185
payloads: payloadsForDelivery,
180-
agentId: params.agentId,
186+
session: deliverySession,
181187
identity,
182188
bestEffort: params.deliveryBestEffort,
183189
deps: createOutboundSendDeps(params.deps),
184190
abortSignal: params.abortSignal,
185-
sessionKey: params.agentSessionKey,
186191
});
187192
delivered = deliveryResults.length > 0;
188193
return null;

src/gateway/server-methods/send.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
resolveOutboundSessionRoute,
1111
} from "../../infra/outbound/outbound-session.js";
1212
import { normalizeReplyPayloadsForDelivery } from "../../infra/outbound/payloads.js";
13+
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
1314
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
1415
import { normalizePollInput } from "../../polls.js";
1516
import {
@@ -237,13 +238,18 @@ export const sendHandlers: GatewayRequestHandlers = {
237238
route: derivedRoute,
238239
});
239240
}
241+
const outboundSession = buildOutboundSessionContext({
242+
cfg,
243+
agentId: effectiveAgentId,
244+
sessionKey: providedSessionKey ?? derivedRoute?.sessionKey,
245+
});
240246
const results = await deliverOutboundPayloads({
241247
cfg,
242248
channel: outboundChannel,
243249
to: resolved.to,
244250
accountId,
245251
payloads: [{ text: message, mediaUrl, mediaUrls }],
246-
agentId: effectiveAgentId,
252+
session: outboundSession,
247253
gifPlayback: request.gifPlayback,
248254
threadId: threadId ?? null,
249255
deps: outboundDeps,

src/gateway/server-node-events.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { randomUUID } from "node:crypto";
2-
import { resolveSessionAgentId } from "../agents/agent-scope.js";
32
import { normalizeChannelId } from "../channels/plugins/index.js";
43
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
54
import { agentCommand } from "../commands/agent.js";
65
import { loadConfig } from "../config/config.js";
76
import { updateSessionStore } from "../config/sessions.js";
87
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
98
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
9+
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
1010
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
1111
import { registerApnsToken } from "../infra/push-apns.js";
1212
import { enqueueSystemEvent } from "../infra/system-events.js";
@@ -232,16 +232,18 @@ async function sendReceiptAck(params: {
232232
if (!resolved.ok) {
233233
throw new Error(String(resolved.error));
234234
}
235-
const agentId = resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg });
235+
const session = buildOutboundSessionContext({
236+
cfg: params.cfg,
237+
sessionKey: params.sessionKey,
238+
});
236239
await deliverOutboundPayloads({
237240
cfg: params.cfg,
238241
channel: params.channel,
239242
to: resolved.to,
240243
payloads: [{ text: params.text }],
241-
agentId,
244+
session,
242245
bestEffort: true,
243246
deps: createOutboundSendDeps(params.deps),
244-
sessionKey: params.sessionKey,
245247
});
246248
}
247249

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
const mocks = vi.hoisted(() => ({
4+
resolveSessionAgentId: vi.fn(() => "agent-from-key"),
5+
consumeRestartSentinel: vi.fn(async () => ({
6+
payload: {
7+
sessionKey: "agent:main:main",
8+
deliveryContext: {
9+
channel: "whatsapp",
10+
to: "+15550002",
11+
accountId: "acct-2",
12+
},
13+
},
14+
})),
15+
formatRestartSentinelMessage: vi.fn(() => "restart message"),
16+
summarizeRestartSentinel: vi.fn(() => "restart summary"),
17+
resolveMainSessionKeyFromConfig: vi.fn(() => "agent:main:main"),
18+
parseSessionThreadInfo: vi.fn(() => ({ baseSessionKey: null, threadId: undefined })),
19+
loadSessionEntry: vi.fn(() => ({ cfg: {}, entry: {} })),
20+
resolveAnnounceTargetFromKey: vi.fn(() => null),
21+
deliveryContextFromSession: vi.fn(() => undefined),
22+
mergeDeliveryContext: vi.fn((a?: Record<string, unknown>, b?: Record<string, unknown>) => ({
23+
...b,
24+
...a,
25+
})),
26+
normalizeChannelId: vi.fn((channel: string) => channel),
27+
resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15550002" })),
28+
deliverOutboundPayloads: vi.fn(async () => []),
29+
enqueueSystemEvent: vi.fn(),
30+
}));
31+
32+
vi.mock("../agents/agent-scope.js", () => ({
33+
resolveSessionAgentId: mocks.resolveSessionAgentId,
34+
}));
35+
36+
vi.mock("../infra/restart-sentinel.js", () => ({
37+
consumeRestartSentinel: mocks.consumeRestartSentinel,
38+
formatRestartSentinelMessage: mocks.formatRestartSentinelMessage,
39+
summarizeRestartSentinel: mocks.summarizeRestartSentinel,
40+
}));
41+
42+
vi.mock("../config/sessions.js", () => ({
43+
resolveMainSessionKeyFromConfig: mocks.resolveMainSessionKeyFromConfig,
44+
}));
45+
46+
vi.mock("../config/sessions/delivery-info.js", () => ({
47+
parseSessionThreadInfo: mocks.parseSessionThreadInfo,
48+
}));
49+
50+
vi.mock("./session-utils.js", () => ({
51+
loadSessionEntry: mocks.loadSessionEntry,
52+
}));
53+
54+
vi.mock("../agents/tools/sessions-send-helpers.js", () => ({
55+
resolveAnnounceTargetFromKey: mocks.resolveAnnounceTargetFromKey,
56+
}));
57+
58+
vi.mock("../utils/delivery-context.js", () => ({
59+
deliveryContextFromSession: mocks.deliveryContextFromSession,
60+
mergeDeliveryContext: mocks.mergeDeliveryContext,
61+
}));
62+
63+
vi.mock("../channels/plugins/index.js", () => ({
64+
normalizeChannelId: mocks.normalizeChannelId,
65+
}));
66+
67+
vi.mock("../infra/outbound/targets.js", () => ({
68+
resolveOutboundTarget: mocks.resolveOutboundTarget,
69+
}));
70+
71+
vi.mock("../infra/outbound/deliver.js", () => ({
72+
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
73+
}));
74+
75+
vi.mock("../infra/system-events.js", () => ({
76+
enqueueSystemEvent: mocks.enqueueSystemEvent,
77+
}));
78+
79+
const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js");
80+
81+
describe("scheduleRestartSentinelWake", () => {
82+
it("forwards session context to outbound delivery", async () => {
83+
await scheduleRestartSentinelWake({ deps: {} as never });
84+
85+
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
86+
expect.objectContaining({
87+
channel: "whatsapp",
88+
to: "+15550002",
89+
session: { key: "agent:main:main", agentId: "agent-from-key" },
90+
}),
91+
);
92+
expect(mocks.enqueueSystemEvent).not.toHaveBeenCalled();
93+
});
94+
});

src/gateway/server-restart-sentinel.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { resolveSessionAgentId } from "../agents/agent-scope.js";
21
import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
32
import { normalizeChannelId } from "../channels/plugins/index.js";
43
import type { CliDeps } from "../cli/deps.js";
54
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
65
import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js";
76
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
7+
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
88
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
99
import {
1010
consumeRestartSentinel,
@@ -83,6 +83,10 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) {
8383
const isSlack = channel === "slack";
8484
const replyToId = isSlack && threadId != null && threadId !== "" ? String(threadId) : undefined;
8585
const resolvedThreadId = isSlack ? undefined : threadId;
86+
const outboundSession = buildOutboundSessionContext({
87+
cfg,
88+
sessionKey,
89+
});
8690

8791
try {
8892
await deliverOutboundPayloads({
@@ -93,9 +97,8 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) {
9397
replyToId,
9498
threadId: resolvedThreadId,
9599
payloads: [{ text: message }],
96-
agentId: resolveSessionAgentId({ sessionKey, config: cfg }),
100+
session: outboundSession,
97101
bestEffort: true,
98-
sessionKey,
99102
});
100103
} catch (err) {
101104
enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey });

src/infra/heartbeat-runner.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import {
6060
} from "./heartbeat-wake.js";
6161
import type { OutboundSendDeps } from "./outbound/deliver.js";
6262
import { deliverOutboundPayloads } from "./outbound/deliver.js";
63+
import { buildOutboundSessionContext } from "./outbound/session-context.js";
6364
import {
6465
resolveHeartbeatDeliveryTarget,
6566
resolveHeartbeatSenderContext,
@@ -696,6 +697,11 @@ export async function runHeartbeatOnce(opts: {
696697
}
697698

698699
const heartbeatOkText = responsePrefix ? `${responsePrefix} ${HEARTBEAT_TOKEN}` : HEARTBEAT_TOKEN;
700+
const outboundSession = buildOutboundSessionContext({
701+
cfg,
702+
agentId,
703+
sessionKey,
704+
});
699705
const canAttemptHeartbeatOk = Boolean(
700706
visibility.showOk && delivery.channel !== "none" && delivery.to,
701707
);
@@ -721,9 +727,8 @@ export async function runHeartbeatOnce(opts: {
721727
accountId: delivery.accountId,
722728
threadId: delivery.threadId,
723729
payloads: [{ text: heartbeatOkText }],
724-
agentId,
730+
session: outboundSession,
725731
deps: opts.deps,
726-
sessionKey,
727732
});
728733
return true;
729734
};
@@ -915,7 +920,7 @@ export async function runHeartbeatOnce(opts: {
915920
channel: delivery.channel,
916921
to: delivery.to,
917922
accountId: deliveryAccountId,
918-
agentId,
923+
session: outboundSession,
919924
threadId: delivery.threadId,
920925
payloads: [
921926
...reasoningPayloads,
@@ -929,7 +934,6 @@ export async function runHeartbeatOnce(opts: {
929934
]),
930935
],
931936
deps: opts.deps,
932-
sessionKey,
933937
});
934938

935939
// Record last delivered heartbeat payload for dedupe.

0 commit comments

Comments
 (0)