Skip to content

Commit f4fcaa0

Browse files
authored
feat(gateway): add compaction checkpoints (#62146)
Merged via squash. Prepared head SHA: e375425 Co-authored-by: scoootscooob <[email protected]> Co-authored-by: scoootscooob <[email protected]> Reviewed-by: @scoootscooob
1 parent b44c10e commit f4fcaa0

34 files changed

Lines changed: 2172 additions & 52 deletions
85.9 KB
Loading
43.7 KB
Loading

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
1515
- ACP/ACPX plugin: bump the bundled `acpx` pin to `0.5.1` so plugin-local installs and strict version checks pick up the latest published runtime release. (#62148) Thanks @onutc.
1616
- Tools/media generation: auto-fallback across auth-backed image, music, and video providers by default, and remap fallback size, aspect ratio, resolution, and duration hints to the closest supported option instead of dropping intent on provider switches.
1717
- Tools/media generation: report applied fallback geometry and duration settings consistently in tool results, add a shared normalization contract for image/music/video runtimes, and simplify the bundled image-generation-core runtime test to only verify the plugin-sdk re-export seam.
18+
- Gateway/sessions: add persisted compaction checkpoints plus Sessions UI branch/restore actions so operators can inspect and recover pre-compaction session state. (#62146) Thanks @scoootscooob.
1819

1920
### Fixes
2021

src/agents/pi-embedded-runner/compact.ts

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ import {
1515
ensureContextEnginesInitialized,
1616
resolveContextEngine,
1717
} from "../../context-engine/index.js";
18+
import {
19+
captureCompactionCheckpointSnapshot,
20+
cleanupCompactionCheckpointSnapshot,
21+
persistSessionCompactionCheckpoint,
22+
resolveSessionCompactionCheckpointReason,
23+
type CapturedCompactionCheckpointSnapshot,
24+
} from "../../gateway/session-compaction-checkpoints.js";
1825
import { resolveHeartbeatSummaryForAgent } from "../../infra/heartbeat-summary.js";
1926
import { getMachineDisplayName } from "../../infra/machine-name.js";
2027
import { generateSecureToken } from "../../infra/secure-random.js";
@@ -108,6 +115,7 @@ import { applyExtraParamsToAgent } from "./extra-params.js";
108115
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js";
109116
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
110117
import { log } from "./logger.js";
118+
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
111119
import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-discovery-input.js";
112120
import { readPiModelContextTokens } from "./model-context-tokens.js";
113121
import { buildModelAliasLines, resolveModelAsync } from "./model.js";
@@ -415,6 +423,8 @@ export async function compactEmbeddedPiSessionDirect(
415423

416424
let restoreSkillEnv: (() => void) | undefined;
417425
let compactionSessionManager: unknown = null;
426+
let checkpointSnapshot: CapturedCompactionCheckpointSnapshot | null = null;
427+
let checkpointSnapshotRetained = false;
418428
try {
419429
const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({
420430
workspaceDir: effectiveWorkspace,
@@ -730,6 +740,10 @@ export async function compactEmbeddedPiSessionDirect(
730740
allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults,
731741
allowedToolNames,
732742
});
743+
checkpointSnapshot = captureCompactionCheckpointSnapshot({
744+
sessionManager,
745+
sessionFile: params.sessionFile,
746+
});
733747
compactionSessionManager = sessionManager;
734748
trackSessionManagerAccess(params.sessionFile);
735749
const settingsManager = createPreparedEmbeddedPiSettingsManager({
@@ -960,6 +974,28 @@ export async function compactEmbeddedPiSessionDirect(
960974
sessionKey: params.sessionKey,
961975
sessionFile: params.sessionFile,
962976
});
977+
let effectiveFirstKeptEntryId = result.firstKeptEntryId;
978+
let postCompactionLeafId =
979+
typeof sessionManager.getLeafId === "function"
980+
? (sessionManager.getLeafId() ?? undefined)
981+
: undefined;
982+
if (params.trigger === "manual") {
983+
try {
984+
const hardenedBoundary = await hardenManualCompactionBoundary({
985+
sessionFile: params.sessionFile,
986+
});
987+
if (hardenedBoundary.applied) {
988+
effectiveFirstKeptEntryId =
989+
hardenedBoundary.firstKeptEntryId ?? effectiveFirstKeptEntryId;
990+
postCompactionLeafId = hardenedBoundary.leafId ?? postCompactionLeafId;
991+
session.agent.state.messages = hardenedBoundary.messages;
992+
}
993+
} catch (err) {
994+
log.warn("[compaction] failed to harden manual compaction boundary", {
995+
errorMessage: err instanceof Error ? err.message : String(err),
996+
});
997+
}
998+
}
963999
// Estimate tokens after compaction by summing token estimates for remaining messages
9641000
const tokensAfter = estimateTokensAfterCompaction({
9651001
messagesAfter: session.messages,
@@ -969,6 +1005,32 @@ export async function compactEmbeddedPiSessionDirect(
9691005
});
9701006
const messageCountAfter = session.messages.length;
9711007
const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter);
1008+
if (params.config && params.sessionKey && checkpointSnapshot) {
1009+
try {
1010+
const storedCheckpoint = await persistSessionCompactionCheckpoint({
1011+
cfg: params.config,
1012+
sessionKey: params.sessionKey,
1013+
sessionId: params.sessionId,
1014+
reason: resolveSessionCompactionCheckpointReason({
1015+
trigger: params.trigger,
1016+
}),
1017+
snapshot: checkpointSnapshot,
1018+
summary: result.summary,
1019+
firstKeptEntryId: effectiveFirstKeptEntryId,
1020+
tokensBefore: observedTokenCount ?? result.tokensBefore,
1021+
tokensAfter,
1022+
postSessionFile: params.sessionFile,
1023+
postLeafId: postCompactionLeafId,
1024+
postEntryId: postCompactionLeafId,
1025+
createdAt: compactStartedAt,
1026+
});
1027+
checkpointSnapshotRetained = storedCheckpoint !== null;
1028+
} catch (err) {
1029+
log.warn("failed to persist compaction checkpoint", {
1030+
errorMessage: err instanceof Error ? err.message : String(err),
1031+
});
1032+
}
1033+
}
9721034
const postMetrics = diagEnabled
9731035
? summarizeCompactionMessages(session.messages)
9741036
: undefined;
@@ -1000,7 +1062,7 @@ export async function compactEmbeddedPiSessionDirect(
10001062
sessionFile: params.sessionFile,
10011063
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
10021064
tokensBefore: result.tokensBefore,
1003-
firstKeptEntryId: result.firstKeptEntryId,
1065+
firstKeptEntryId: effectiveFirstKeptEntryId,
10041066
});
10051067
// Truncate session file to remove compacted entries (#39953)
10061068
if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) {
@@ -1032,7 +1094,7 @@ export async function compactEmbeddedPiSessionDirect(
10321094
compacted: true,
10331095
result: {
10341096
summary: result.summary,
1035-
firstKeptEntryId: result.firstKeptEntryId,
1097+
firstKeptEntryId: effectiveFirstKeptEntryId,
10361098
tokensBefore: observedTokenCount ?? result.tokensBefore,
10371099
tokensAfter,
10381100
details: result.details,
@@ -1091,6 +1153,9 @@ export async function compactEmbeddedPiSessionDirect(
10911153
});
10921154
return fail(reason);
10931155
} finally {
1156+
if (!checkpointSnapshotRetained) {
1157+
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
1158+
}
10941159
restoreSkillEnv?.();
10951160
}
10961161
}
@@ -1116,6 +1181,8 @@ export async function compactEmbeddedPiSession(
11161181
});
11171182
ensureContextEnginesInitialized();
11181183
const contextEngine = await resolveContextEngine(params.config);
1184+
let checkpointSnapshot: CapturedCompactionCheckpointSnapshot | null = null;
1185+
let checkpointSnapshotRetained = false;
11191186
try {
11201187
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
11211188
const resolvedCompactionTarget = resolveEmbeddedCompactionTarget({
@@ -1150,6 +1217,12 @@ export async function compactEmbeddedPiSession(
11501217
// Fire before_compaction / after_compaction hooks here so plugin subscribers
11511218
// are notified regardless of which engine is active.
11521219
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
1220+
checkpointSnapshot = engineOwnsCompaction
1221+
? captureCompactionCheckpointSnapshot({
1222+
sessionManager: SessionManager.open(params.sessionFile),
1223+
sessionFile: params.sessionFile,
1224+
})
1225+
: null;
11531226
const hookRunner = engineOwnsCompaction
11541227
? asCompactionHookRunner(getGlobalHookRunner())
11551228
: null;
@@ -1222,6 +1295,33 @@ export async function compactEmbeddedPiSession(
12221295
runtimeContext,
12231296
});
12241297
if (result.ok && result.compacted) {
1298+
if (params.config && params.sessionKey && checkpointSnapshot) {
1299+
try {
1300+
const postCompactionSession = SessionManager.open(params.sessionFile);
1301+
const postLeafId = postCompactionSession.getLeafId() ?? undefined;
1302+
const storedCheckpoint = await persistSessionCompactionCheckpoint({
1303+
cfg: params.config,
1304+
sessionKey: params.sessionKey,
1305+
sessionId: params.sessionId,
1306+
reason: resolveSessionCompactionCheckpointReason({
1307+
trigger: params.trigger,
1308+
}),
1309+
snapshot: checkpointSnapshot,
1310+
summary: result.result?.summary,
1311+
firstKeptEntryId: result.result?.firstKeptEntryId,
1312+
tokensBefore: result.result?.tokensBefore,
1313+
tokensAfter: result.result?.tokensAfter,
1314+
postSessionFile: params.sessionFile,
1315+
postLeafId,
1316+
postEntryId: postLeafId,
1317+
});
1318+
checkpointSnapshotRetained = storedCheckpoint !== null;
1319+
} catch (err) {
1320+
log.warn("failed to persist compaction checkpoint", {
1321+
errorMessage: err instanceof Error ? err.message : String(err),
1322+
});
1323+
}
1324+
}
12251325
await runContextEngineMaintenance({
12261326
contextEngine,
12271327
sessionId: params.sessionId,
@@ -1275,6 +1375,9 @@ export async function compactEmbeddedPiSession(
12751375
: undefined,
12761376
};
12771377
} finally {
1378+
if (!checkpointSnapshotRetained) {
1379+
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
1380+
}
12781381
await contextEngine.dispose?.();
12791382
}
12801383
}),
@@ -1287,6 +1390,7 @@ export const __testing = {
12871390
containsRealConversationMessages,
12881391
estimateTokensAfterCompaction,
12891392
buildBeforeCompactionHookMetrics,
1393+
hardenManualCompactionBoundary,
12901394
runBeforeCompactionHooks,
12911395
runAfterCompactionHooks,
12921396
runPostCompactionSideEffects,
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import fs from "node:fs/promises";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import type { AgentMessage } from "@mariozechner/pi-agent-core";
5+
import type { AssistantMessage } from "@mariozechner/pi-ai";
6+
import { SessionManager } from "@mariozechner/pi-coding-agent";
7+
import { afterEach, describe, expect, it } from "vitest";
8+
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
9+
10+
let tmpDir = "";
11+
12+
async function makeTmpDir(): Promise<string> {
13+
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "manual-compaction-boundary-"));
14+
return tmpDir;
15+
}
16+
17+
afterEach(async () => {
18+
if (tmpDir) {
19+
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {});
20+
tmpDir = "";
21+
}
22+
});
23+
24+
function createAssistantTextMessage(text: string, timestamp: number): AssistantMessage {
25+
return {
26+
role: "assistant",
27+
content: [{ type: "text", text }],
28+
api: "responses",
29+
provider: "openai",
30+
model: "gpt-test",
31+
usage: {
32+
input: 1,
33+
output: 1,
34+
cacheRead: 0,
35+
cacheWrite: 0,
36+
totalTokens: 2,
37+
cost: {
38+
input: 0,
39+
output: 0,
40+
cacheRead: 0,
41+
cacheWrite: 0,
42+
total: 0,
43+
},
44+
},
45+
stopReason: "stop",
46+
timestamp,
47+
};
48+
}
49+
50+
function messageText(message: AgentMessage): string {
51+
if (!("content" in message)) {
52+
return "";
53+
}
54+
const content = message.content;
55+
if (typeof content === "string") {
56+
return content;
57+
}
58+
if (!Array.isArray(content)) {
59+
return "";
60+
}
61+
return content
62+
.map((block) =>
63+
block && typeof block === "object" && "text" in block && typeof block.text === "string"
64+
? block.text
65+
: "",
66+
)
67+
.filter(Boolean)
68+
.join(" ");
69+
}
70+
71+
describe("hardenManualCompactionBoundary", () => {
72+
it("turns manual compaction into a true checkpoint for rebuilt context", async () => {
73+
const dir = await makeTmpDir();
74+
const session = SessionManager.create(dir, dir);
75+
76+
session.appendMessage({ role: "user", content: "old question", timestamp: 1 });
77+
session.appendMessage(createAssistantTextMessage("very long old answer", 2));
78+
const firstKeepId = session.getBranch().at(-1)?.id;
79+
expect(firstKeepId).toBeTruthy();
80+
session.appendCompaction("old summary", firstKeepId!, 100);
81+
82+
session.appendMessage({ role: "user", content: "new question", timestamp: 3 });
83+
session.appendMessage(
84+
createAssistantTextMessage("detailed new answer that should be summarized away", 4),
85+
);
86+
const secondKeepId = session.getBranch().at(-1)?.id;
87+
expect(secondKeepId).toBeTruthy();
88+
const latestCompactionId = session.appendCompaction("fresh summary", secondKeepId!, 200);
89+
const sessionFile = session.getSessionFile();
90+
expect(sessionFile).toBeTruthy();
91+
92+
const before = SessionManager.open(sessionFile!);
93+
const beforeTexts = before
94+
.buildSessionContext()
95+
.messages.map((message) => messageText(message));
96+
expect(beforeTexts.join("\n")).toContain("detailed new answer");
97+
98+
const hardened = await hardenManualCompactionBoundary({ sessionFile: sessionFile! });
99+
expect(hardened.applied).toBe(true);
100+
expect(hardened.firstKeptEntryId).toBe(latestCompactionId);
101+
expect(hardened.messages.map((message) => message.role)).toEqual(["compactionSummary"]);
102+
103+
const reopened = SessionManager.open(sessionFile!);
104+
const latest = reopened.getLeafEntry();
105+
expect(latest?.type).toBe("compaction");
106+
if (!latest || latest.type !== "compaction") {
107+
throw new Error("expected latest leaf to be a compaction entry");
108+
}
109+
expect(latest.firstKeptEntryId).toBe(latestCompactionId);
110+
111+
reopened.appendMessage({ role: "user", content: "what was happening?", timestamp: 5 });
112+
const after = SessionManager.open(sessionFile!);
113+
const afterTexts = after.buildSessionContext().messages.map((message) => messageText(message));
114+
expect(after.buildSessionContext().messages.map((message) => message.role)).toEqual([
115+
"compactionSummary",
116+
"user",
117+
]);
118+
expect(afterTexts.join("\n")).not.toContain("detailed new answer");
119+
});
120+
121+
it("is a no-op when the latest leaf is not a compaction entry", async () => {
122+
const dir = await makeTmpDir();
123+
const session = SessionManager.create(dir, dir);
124+
session.appendMessage({ role: "user", content: "hello", timestamp: 1 });
125+
session.appendMessage(createAssistantTextMessage("hi", 2));
126+
const sessionFile = session.getSessionFile();
127+
expect(sessionFile).toBeTruthy();
128+
129+
const result = await hardenManualCompactionBoundary({ sessionFile: sessionFile! });
130+
expect(result.applied).toBe(false);
131+
expect(result.messages.map((message) => message.role)).toEqual(["user", "assistant"]);
132+
});
133+
});

0 commit comments

Comments
 (0)