Skip to content

Commit 3c34743

Browse files
authored
acp: harden follow-up reliability and attachments (#41464)
Merged via squash. Prepared head SHA: 7d167df Co-authored-by: mbelinky <[email protected]> Co-authored-by: mbelinky <[email protected]> Reviewed-by: @mbelinky
1 parent 0669b0d commit 3c34743

File tree

7 files changed

+230
-44
lines changed

7 files changed

+230
-44
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
3333
- ACP/runtime attachments: forward normalized inbound image attachments into ACP runtime turns so ACPX sessions can preserve image prompt content on the runtime path. (#41427) Thanks @mbelinky.
3434
- ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky.
3535
- Agents/billing recovery: probe single-provider billing cooldowns on the existing throttle so topping up credits can recover without a manual gateway restart. (#41422) thanks @altaywtf.
36+
- ACP/follow-up hardening: make session restore and prompt completion degrade gracefully on transcript/update failures, enforce bounded tool-location traversal, and skip non-image ACPX turns the runtime cannot serialize. (#41464) Thanks @mbelinky.
3637

3738
## 2026.3.8
3839

src/acp/event-mapper.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { describe, expect, it } from "vitest";
2+
import { extractToolCallLocations } from "./event-mapper.js";
3+
4+
describe("extractToolCallLocations", () => {
5+
it("enforces the global node visit cap across nested structures", () => {
6+
const nested = Array.from({ length: 20 }, (_, outer) =>
7+
Array.from({ length: 20 }, (_, inner) =>
8+
inner === 19 ? { path: `/tmp/file-${outer}.txt` } : { note: `${outer}-${inner}` },
9+
),
10+
);
11+
12+
const locations = extractToolCallLocations(nested);
13+
14+
expect(locations).toBeDefined();
15+
expect(locations?.length).toBeLessThan(20);
16+
expect(locations).not.toContainEqual({ path: "/tmp/file-19.txt" });
17+
});
18+
});

src/acp/event-mapper.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@ function collectLocationsFromTextMarkers(
186186
function collectToolLocations(
187187
value: unknown,
188188
locations: Map<string, ToolCallLocation>,
189-
state: { visited: number; depth: number },
189+
state: { visited: number },
190+
depth: number,
190191
): void {
191-
if (state.visited >= TOOL_LOCATION_MAX_NODES || state.depth > TOOL_LOCATION_MAX_DEPTH) {
192+
if (state.visited >= TOOL_LOCATION_MAX_NODES || depth > TOOL_LOCATION_MAX_DEPTH) {
192193
return;
193194
}
194195
state.visited += 1;
@@ -202,8 +203,7 @@ function collectToolLocations(
202203
}
203204
if (Array.isArray(value)) {
204205
for (const item of value) {
205-
collectToolLocations(item, locations, { visited: state.visited, depth: state.depth + 1 });
206-
state.visited += 1;
206+
collectToolLocations(item, locations, state, depth + 1);
207207
if (state.visited >= TOOL_LOCATION_MAX_NODES) {
208208
return;
209209
}
@@ -230,9 +230,11 @@ function collectToolLocations(
230230
}
231231
}
232232

233-
for (const nested of Object.values(record)) {
234-
collectToolLocations(nested, locations, { visited: state.visited, depth: state.depth + 1 });
235-
state.visited += 1;
233+
for (const [key, nested] of Object.entries(record)) {
234+
if (key === "content") {
235+
continue;
236+
}
237+
collectToolLocations(nested, locations, state, depth + 1);
236238
if (state.visited >= TOOL_LOCATION_MAX_NODES) {
237239
return;
238240
}
@@ -402,7 +404,7 @@ export function extractToolCallContent(value: unknown): ToolCallContent[] | unde
402404
export function extractToolCallLocations(...values: unknown[]): ToolCallLocation[] | undefined {
403405
const locations = new Map<string, ToolCallLocation>();
404406
for (const value of values) {
405-
collectToolLocations(value, locations, { visited: 0, depth: 0 });
407+
collectToolLocations(value, locations, { visited: 0 }, 0);
406408
}
407409
return locations.size > 0 ? [...locations.values()] : undefined;
408410
}

src/acp/translator.session-rate-limit.test.ts

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,63 @@ describe("acp session UX bridge behavior", () => {
365365

366366
sessionStore.clearAllSessionsForTest();
367367
});
368+
369+
it("falls back to an empty transcript when sessions.get fails during loadSession", async () => {
370+
const sessionStore = createInMemorySessionStore();
371+
const connection = createAcpConnection();
372+
const sessionUpdate = connection.__sessionUpdateMock;
373+
const request = vi.fn(async (method: string) => {
374+
if (method === "sessions.list") {
375+
return {
376+
ts: Date.now(),
377+
path: "/tmp/sessions.json",
378+
count: 1,
379+
defaults: {
380+
modelProvider: null,
381+
model: null,
382+
contextTokens: null,
383+
},
384+
sessions: [
385+
{
386+
key: "agent:main:recover",
387+
label: "recover",
388+
displayName: "Recover session",
389+
kind: "direct",
390+
updatedAt: 1_710_000_000_000,
391+
thinkingLevel: "adaptive",
392+
modelProvider: "openai",
393+
model: "gpt-5.4",
394+
},
395+
],
396+
};
397+
}
398+
if (method === "sessions.get") {
399+
throw new Error("sessions.get unavailable");
400+
}
401+
return { ok: true };
402+
}) as GatewayClient["request"];
403+
const agent = new AcpGatewayAgent(connection, createAcpGateway(request), {
404+
sessionStore,
405+
});
406+
407+
const result = await agent.loadSession(createLoadSessionRequest("agent:main:recover"));
408+
409+
expect(result.modes?.currentModeId).toBe("adaptive");
410+
expect(sessionUpdate).toHaveBeenCalledWith({
411+
sessionId: "agent:main:recover",
412+
update: expect.objectContaining({
413+
sessionUpdate: "available_commands_update",
414+
}),
415+
});
416+
expect(sessionUpdate).not.toHaveBeenCalledWith({
417+
sessionId: "agent:main:recover",
418+
update: expect.objectContaining({
419+
sessionUpdate: "user_message_chunk",
420+
}),
421+
});
422+
423+
sessionStore.clearAllSessionsForTest();
424+
});
368425
});
369426

370427
describe("acp setSessionMode bridge behavior", () => {
@@ -771,6 +828,61 @@ describe("acp session metadata and usage updates", () => {
771828

772829
sessionStore.clearAllSessionsForTest();
773830
});
831+
832+
it("still resolves prompts when snapshot updates fail after completion", async () => {
833+
const sessionStore = createInMemorySessionStore();
834+
const connection = createAcpConnection();
835+
const sessionUpdate = connection.__sessionUpdateMock;
836+
const request = vi.fn(async (method: string) => {
837+
if (method === "sessions.list") {
838+
return {
839+
ts: Date.now(),
840+
path: "/tmp/sessions.json",
841+
count: 1,
842+
defaults: {
843+
modelProvider: null,
844+
model: null,
845+
contextTokens: null,
846+
},
847+
sessions: [
848+
{
849+
key: "usage-session",
850+
displayName: "Usage session",
851+
kind: "direct",
852+
updatedAt: 1_710_000_123_000,
853+
thinkingLevel: "adaptive",
854+
modelProvider: "openai",
855+
model: "gpt-5.4",
856+
totalTokens: 1200,
857+
totalTokensFresh: true,
858+
contextTokens: 4000,
859+
},
860+
],
861+
};
862+
}
863+
if (method === "chat.send") {
864+
return new Promise(() => {});
865+
}
866+
return { ok: true };
867+
}) as GatewayClient["request"];
868+
const agent = new AcpGatewayAgent(connection, createAcpGateway(request), {
869+
sessionStore,
870+
});
871+
872+
await agent.loadSession(createLoadSessionRequest("usage-session"));
873+
sessionUpdate.mockClear();
874+
sessionUpdate.mockRejectedValueOnce(new Error("session update transport failed"));
875+
876+
const promptPromise = agent.prompt(createPromptRequest("usage-session", "hello"));
877+
await agent.handleGatewayEvent(createChatFinalEvent("usage-session"));
878+
879+
await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" });
880+
const session = sessionStore.getSession("usage-session");
881+
expect(session?.activeRunId).toBeNull();
882+
expect(session?.abortController).toBeNull();
883+
884+
sessionStore.clearAllSessionsForTest();
885+
});
774886
});
775887

776888
describe("acp prompt size hardening", () => {

src/acp/translator.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,10 @@ export class AcpGatewayAgent implements Agent {
458458
this.log(`loadSession: ${session.sessionId} -> ${session.sessionKey}`);
459459
const [sessionSnapshot, transcript] = await Promise.all([
460460
this.getSessionSnapshot(session.sessionKey),
461-
this.getSessionTranscript(session.sessionKey),
461+
this.getSessionTranscript(session.sessionKey).catch((err) => {
462+
this.log(`session transcript fallback for ${session.sessionKey}: ${String(err)}`);
463+
return [];
464+
}),
462465
]);
463466
await this.replaySessionTranscript(session.sessionId, transcript);
464467
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
@@ -630,7 +633,6 @@ export class AcpGatewayAgent implements Agent {
630633
if (!session) {
631634
return;
632635
}
633-
634636
this.sessionStore.cancelActiveRun(params.sessionId);
635637
try {
636638
await this.gateway.request("chat.abort", { sessionKey: session.sessionKey });
@@ -841,9 +843,13 @@ export class AcpGatewayAgent implements Agent {
841843
this.pendingPrompts.delete(sessionId);
842844
this.sessionStore.clearActiveRun(sessionId);
843845
const sessionSnapshot = await this.getSessionSnapshot(pending.sessionKey);
844-
await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, {
845-
includeControls: false,
846-
});
846+
try {
847+
await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, {
848+
includeControls: false,
849+
});
850+
} catch (err) {
851+
this.log(`session snapshot update failed for ${sessionId}: ${String(err)}`);
852+
}
847853
pending.resolve({ stopReason });
848854
}
849855

src/auto-reply/reply/dispatch-acp.test.ts

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -362,28 +362,58 @@ describe("tryDispatchAcpReply", () => {
362362
setReadyAcpResolution();
363363
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
364364
const imagePath = path.join(tempDir, "inbound.png");
365-
await fs.writeFile(imagePath, "image-bytes");
366-
managerMocks.runTurn.mockResolvedValue(undefined);
365+
try {
366+
await fs.writeFile(imagePath, "image-bytes");
367+
managerMocks.runTurn.mockResolvedValue(undefined);
368+
369+
await runDispatch({
370+
bodyForAgent: " ",
371+
ctxOverrides: {
372+
MediaPath: imagePath,
373+
MediaType: "image/png",
374+
},
375+
});
376+
377+
expect(managerMocks.runTurn).toHaveBeenCalledWith(
378+
expect.objectContaining({
379+
text: "",
380+
attachments: [
381+
{
382+
mediaType: "image/png",
383+
data: Buffer.from("image-bytes").toString("base64"),
384+
},
385+
],
386+
}),
387+
);
388+
} finally {
389+
await fs.rm(tempDir, { recursive: true, force: true });
390+
}
391+
});
367392

368-
await runDispatch({
369-
bodyForAgent: " ",
370-
ctxOverrides: {
371-
MediaPath: imagePath,
372-
MediaType: "image/png",
373-
},
374-
});
393+
it("skips ACP turns for non-image attachments when there is no text prompt", async () => {
394+
setReadyAcpResolution();
395+
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
396+
const docPath = path.join(tempDir, "inbound.pdf");
397+
const { dispatcher } = createDispatcher();
398+
const onReplyStart = vi.fn();
399+
try {
400+
await fs.writeFile(docPath, "pdf-bytes");
401+
402+
await runDispatch({
403+
bodyForAgent: " ",
404+
dispatcher,
405+
onReplyStart,
406+
ctxOverrides: {
407+
MediaPath: docPath,
408+
MediaType: "application/pdf",
409+
},
410+
});
375411

376-
expect(managerMocks.runTurn).toHaveBeenCalledWith(
377-
expect.objectContaining({
378-
text: "",
379-
attachments: [
380-
{
381-
mediaType: "image/png",
382-
data: Buffer.from("image-bytes").toString("base64"),
383-
},
384-
],
385-
}),
386-
);
412+
expect(managerMocks.runTurn).not.toHaveBeenCalled();
413+
expect(onReplyStart).not.toHaveBeenCalled();
414+
} finally {
415+
await fs.rm(tempDir, { recursive: true, force: true });
416+
}
387417
});
388418

389419
it("surfaces ACP policy errors as final error replies", async () => {

src/auto-reply/reply/dispatch-acp.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { logVerbose } from "../../globals.js";
1616
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
1717
import { generateSecureUuid } from "../../infra/secure-random.js";
1818
import { prefixSystemMessage } from "../../infra/system-message.js";
19+
import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
1920
import {
2021
normalizeAttachmentPath,
2122
normalizeAttachments,
@@ -69,6 +70,10 @@ async function resolveAcpAttachments(ctx: FinalizedMsgContext): Promise<AcpTurnA
6970
const mediaAttachments = normalizeAttachments(ctx);
7071
const results: AcpTurnAttachment[] = [];
7172
for (const attachment of mediaAttachments) {
73+
const mediaType = attachment.mime ?? "application/octet-stream";
74+
if (!mediaType.startsWith("image/")) {
75+
continue;
76+
}
7277
const filePath = normalizeAttachmentPath(attachment.path);
7378
if (!filePath) {
7479
continue;
@@ -83,7 +88,7 @@ async function resolveAcpAttachments(ctx: FinalizedMsgContext): Promise<AcpTurnA
8388
}
8489
const buf = await fs.readFile(filePath);
8590
results.push({
86-
mediaType: attachment.mime ?? "application/octet-stream",
91+
mediaType,
8792
data: buf.toString("base64"),
8893
});
8994
} catch {
@@ -224,16 +229,6 @@ export async function tryDispatchAcpReply(params: {
224229
onReplyStart: params.onReplyStart,
225230
});
226231

227-
const promptText = resolveAcpPromptText(params.ctx);
228-
const attachments = await resolveAcpAttachments(params.ctx);
229-
if (!promptText && attachments.length === 0) {
230-
const counts = params.dispatcher.getQueuedCounts();
231-
delivery.applyRoutedCounts(counts);
232-
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
233-
params.markIdle("message_completed");
234-
return { queuedFinal: false, counts };
235-
}
236-
237232
const identityPendingBeforeTurn = isSessionIdentityPending(
238233
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
239234
);
@@ -275,6 +270,28 @@ export async function tryDispatchAcpReply(params: {
275270
if (agentPolicyError) {
276271
throw agentPolicyError;
277272
}
273+
if (!params.ctx.MediaUnderstanding?.length) {
274+
try {
275+
await applyMediaUnderstanding({
276+
ctx: params.ctx,
277+
cfg: params.cfg,
278+
});
279+
} catch (err) {
280+
logVerbose(
281+
`dispatch-acp: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`,
282+
);
283+
}
284+
}
285+
286+
const promptText = resolveAcpPromptText(params.ctx);
287+
const attachments = await resolveAcpAttachments(params.ctx);
288+
if (!promptText && attachments.length === 0) {
289+
const counts = params.dispatcher.getQueuedCounts();
290+
delivery.applyRoutedCounts(counts);
291+
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
292+
params.markIdle("message_completed");
293+
return { queuedFinal: false, counts };
294+
}
278295

279296
try {
280297
await delivery.startReplyLifecycle();

0 commit comments

Comments
 (0)