Skip to content

Commit 3533521

Browse files
committed
fix(compaction): avoid preserving duplicate user turns
1 parent dae09d2 commit 3533521

10 files changed

Lines changed: 273 additions & 2 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
2727
- MCP/bundle-mcp: normalize CLI-native `type: "http"` MCP server entries to OpenClaw `transport: "streamable-http"` on save, repair existing configs with doctor, and keep embedded Pi from falling back to legacy SSE GET-first startup for those servers. Fixes #72757. Thanks @Studioscale.
2828
- Media-understanding/audio: migrate deprecated `{input}` placeholders in legacy `audio.transcription.command` configs to `{{MediaPath}}`, so custom audio transcribers no longer receive the literal placeholder after doctor repair. Fixes #72760. Thanks @krisfanue3-hash.
2929
- Ollama/onboarding: de-dupe suggested bare local models against installed `:latest` tags and skip redundant pulls, so setup shows the installed model once and no longer says it is downloading an already available model. Fixes #68952. Thanks @tleyden.
30+
- Compaction: skip oversized pre-compaction checkpoint snapshots and prune duplicate long user turns from compaction input and rotated successor transcripts, preventing retry storms from being preserved across checkpoint cycles. Fixes #72780. Thanks @SweetSophia.
3031
- Control UI/Gateway: preserve WebChat client version labels across localhost, 127.0.0.1, and IPv6 loopback aliases on the same port, avoiding misleading `vcontrol-ui` connection logs while investigating duplicate-message reports. Refs #72753 and #72742. Thanks @LumenFromTheFuture and @allesgutefy.
3132
- Agents/reasoning: treat orphan closing reasoning tags with following answer text as a privacy boundary across delivery, history, streaming, and Control UI sanitizers so malformed local-model output cannot leak chain-of-thought text. Fixes #67092. Thanks @AnildoSilva.
3233
- Memory-core: run one-shot memory CLI commands through transient builtin and QMD managers so `memory index`, `memory status --index`, and `memory search` no longer start long-lived file watchers that can hit macOS `EMFILE` limits. Fixes #59101; carries forward #49851. Thanks @mbear469210-coder and @maoyuanxue.

docs/concepts/compaction.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ The byte guard requires `truncateAfterCompaction: true`. Without transcript rota
106106
### Successor transcripts
107107

108108
When `agents.defaults.compaction.truncateAfterCompaction` is enabled, OpenClaw does not rewrite the existing transcript in place. It creates a new active successor transcript from the compaction summary, preserved state, and unsummarized tail, then keeps the previous JSONL as the archived checkpoint source.
109+
Successor transcripts also drop exact duplicate long user turns that arrive
110+
inside a short retry window, so channel retry storms are not carried into the
111+
next active transcript after compaction.
112+
113+
Pre-compaction checkpoints are retained only while they stay below OpenClaw's
114+
checkpoint size cap; oversized active transcripts still compact, but OpenClaw
115+
skips the large debug snapshot instead of doubling disk usage.
109116

110117
### Compaction notices
111118

docs/reference/session-management-compaction.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ OpenClaw persists sessions in two layers:
5050
- Append-only transcript with tree structure (entries have `id` + `parentId`)
5151
- Stores the actual conversation + tool calls + compaction summaries
5252
- Used to rebuild the model context for future turns
53+
- Large pre-compaction debug checkpoints are skipped once the active
54+
transcript exceeds the checkpoint size cap, avoiding a second giant
55+
`.checkpoint.*.jsonl` copy.
5356

5457
---
5558

src/agents/pi-embedded-runner/compact.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ import {
9999
import { resolveSystemPromptOverride } from "../system-prompt-override.js";
100100
import { classifyCompactionReason, resolveCompactionFailureReason } from "./compact-reasons.js";
101101
import type { CompactEmbeddedPiSessionParams, CompactionMessageMetrics } from "./compact.types.js";
102+
import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js";
102103
import {
103104
asCompactionHookRunner,
104105
buildBeforeCompactionHookMetrics,
@@ -972,9 +973,10 @@ export async function compactEmbeddedPiSessionDirect(
972973
sessionId: params.sessionId,
973974
policy: transcriptPolicy,
974975
});
976+
const dedupedValidated = dedupeDuplicateUserMessagesForCompaction(validated);
975977
// Apply validated transcript to the live session even when no history limit is configured,
976978
// so compaction and hook metrics are based on the same message set.
977-
session.agent.state.messages = validated;
979+
session.agent.state.messages = dedupedValidated;
978980
// "Original" compaction metrics should describe the validated transcript that enters
979981
// limiting/compaction, not the raw on-disk session snapshot.
980982
const originalMessages = session.messages.slice();
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { describe, expect, it } from "vitest";
2+
import {
3+
collectDuplicateUserMessageEntryIdsForCompaction,
4+
dedupeDuplicateUserMessagesForCompaction,
5+
} from "./compaction-duplicate-user-messages.js";
6+
7+
describe("compaction duplicate user message pruning", () => {
8+
it("drops identical long user messages inside the duplicate window", () => {
9+
const first = {
10+
role: "user",
11+
content: "please run the deployment status check for production",
12+
timestamp: 1_000,
13+
} as const;
14+
const second = {
15+
role: "user",
16+
content: " please run the deployment status check for production ",
17+
timestamp: 2_000,
18+
} as const;
19+
const third = {
20+
role: "assistant",
21+
content: [{ type: "text", text: "checking" }],
22+
timestamp: 3_000,
23+
} as const;
24+
25+
expect(dedupeDuplicateUserMessagesForCompaction([first, second, third])).toEqual([
26+
first,
27+
third,
28+
]);
29+
});
30+
31+
it("keeps short repeated acknowledgements and distant repeats", () => {
32+
const short = { role: "user", content: "next", timestamp: 1_000 } as const;
33+
const shortAgain = { role: "user", content: "next", timestamp: 2_000 } as const;
34+
const long = {
35+
role: "user",
36+
content: "please run the deployment status check for production",
37+
timestamp: 1_000,
38+
} as const;
39+
const longLater = {
40+
role: "user",
41+
content: "please run the deployment status check for production",
42+
timestamp: 70_000,
43+
} as const;
44+
45+
expect(dedupeDuplicateUserMessagesForCompaction([short, shortAgain])).toEqual([
46+
short,
47+
shortAgain,
48+
]);
49+
expect(dedupeDuplicateUserMessagesForCompaction([long, longLater])).toEqual([long, longLater]);
50+
});
51+
52+
it("collects duplicate transcript entry ids from active branch entries", () => {
53+
const duplicateIds = collectDuplicateUserMessageEntryIdsForCompaction([
54+
{
55+
id: "entry-1",
56+
type: "message",
57+
message: {
58+
role: "user",
59+
content: "please run the deployment status check for production",
60+
timestamp: 1_000,
61+
},
62+
},
63+
{
64+
id: "entry-2",
65+
type: "message",
66+
message: {
67+
role: "user",
68+
content: "please run the deployment status check for production",
69+
timestamp: 2_000,
70+
},
71+
},
72+
]);
73+
74+
expect(duplicateIds).toEqual(new Set(["entry-2"]));
75+
});
76+
});
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
const DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS = 60_000;
2+
const MIN_DUPLICATE_USER_MESSAGE_CHARS = 24;
3+
4+
type MessageLike = {
5+
role?: unknown;
6+
content?: unknown;
7+
timestamp?: unknown;
8+
};
9+
10+
type EntryLike = {
11+
id?: unknown;
12+
type?: unknown;
13+
message?: unknown;
14+
};
15+
16+
type DuplicateUserMessageOptions = {
17+
windowMs?: number;
18+
};
19+
20+
function isRecord(value: unknown): value is Record<string, unknown> {
21+
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
22+
}
23+
24+
function normalizeUserMessageContent(content: unknown): string | undefined {
25+
if (typeof content === "string") {
26+
return content.replace(/\s+/g, " ").trim();
27+
}
28+
if (!Array.isArray(content)) {
29+
return undefined;
30+
}
31+
const textParts: string[] = [];
32+
for (const block of content) {
33+
if (!isRecord(block)) {
34+
return undefined;
35+
}
36+
if (block.type === "image") {
37+
return undefined;
38+
}
39+
if (block.type === "text" && typeof block.text === "string") {
40+
textParts.push(block.text);
41+
}
42+
}
43+
return textParts.join("\n").replace(/\s+/g, " ").trim();
44+
}
45+
46+
function duplicateSignature(message: unknown): { key: string; timestamp: number } | undefined {
47+
if (!isRecord(message) || message.role !== "user" || typeof message.timestamp !== "number") {
48+
return undefined;
49+
}
50+
const text = normalizeUserMessageContent(message.content);
51+
if (!text || text.length < MIN_DUPLICATE_USER_MESSAGE_CHARS) {
52+
return undefined;
53+
}
54+
return {
55+
key: text.normalize("NFC").toLowerCase(),
56+
timestamp: message.timestamp,
57+
};
58+
}
59+
60+
export function dedupeDuplicateUserMessagesForCompaction<T extends MessageLike>(
61+
messages: readonly T[],
62+
options: DuplicateUserMessageOptions = {},
63+
): T[] {
64+
const windowMs = options.windowMs ?? DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS;
65+
const lastSeenAtByKey = new Map<string, number>();
66+
let removed = 0;
67+
const result: T[] = [];
68+
for (const message of messages) {
69+
const signature = duplicateSignature(message);
70+
if (!signature) {
71+
result.push(message);
72+
continue;
73+
}
74+
const lastSeenAt = lastSeenAtByKey.get(signature.key);
75+
lastSeenAtByKey.set(signature.key, signature.timestamp);
76+
if (typeof lastSeenAt === "number" && signature.timestamp - lastSeenAt <= windowMs) {
77+
removed += 1;
78+
continue;
79+
}
80+
result.push(message);
81+
}
82+
return removed > 0 ? result : [...messages];
83+
}
84+
85+
export function collectDuplicateUserMessageEntryIdsForCompaction(
86+
entries: readonly EntryLike[],
87+
options: DuplicateUserMessageOptions = {},
88+
): Set<string> {
89+
const windowMs = options.windowMs ?? DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS;
90+
const lastSeenAtByKey = new Map<string, number>();
91+
const duplicateIds = new Set<string>();
92+
for (const entry of entries) {
93+
if (entry.type !== "message" || typeof entry.id !== "string") {
94+
continue;
95+
}
96+
const signature = duplicateSignature(
97+
isRecord(entry.message) ? (entry.message as MessageLike) : undefined,
98+
);
99+
if (!signature) {
100+
continue;
101+
}
102+
const lastSeenAt = lastSeenAtByKey.get(signature.key);
103+
lastSeenAtByKey.set(signature.key, signature.timestamp);
104+
if (typeof lastSeenAt === "number" && signature.timestamp - lastSeenAt <= windowMs) {
105+
duplicateIds.add(entry.id);
106+
}
107+
}
108+
return duplicateIds;
109+
}

src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,39 @@ describe("rotateTranscriptAfterCompaction", () => {
153153
expect(successor.getSessionName()).toBe("current title");
154154
});
155155

156+
it("drops duplicate user messages from the rotated active branch tail", async () => {
157+
const dir = await createTmpDir();
158+
const manager = SessionManager.create(dir, dir);
159+
manager.appendMessage({ role: "user", content: "old user", timestamp: 1 });
160+
const firstKeptId = manager.appendMessage(makeAssistant("old assistant", 2));
161+
manager.appendCompaction("Summary of old work.", firstKeptId, 5000);
162+
const firstDuplicateId = manager.appendMessage({
163+
role: "user",
164+
content: "please run the deployment status check for production",
165+
timestamp: 3_000,
166+
});
167+
const secondDuplicateId = manager.appendMessage({
168+
role: "user",
169+
content: " please run the deployment status check for production ",
170+
timestamp: 4_000,
171+
});
172+
manager.appendMessage(makeAssistant("status checked", 5_000));
173+
174+
const result = await rotateTranscriptAfterCompaction({
175+
sessionManager: manager,
176+
sessionFile: manager.getSessionFile()!,
177+
now: () => new Date("2026-04-27T12:10:00.000Z"),
178+
});
179+
180+
expect(result.rotated).toBe(true);
181+
const successor = SessionManager.open(result.sessionFile!);
182+
const entries = successor.getEntries();
183+
expect(entries.find((entry) => entry.id === firstDuplicateId)).toBeDefined();
184+
expect(entries.find((entry) => entry.id === secondDuplicateId)).toBeUndefined();
185+
const contextText = JSON.stringify(successor.buildSessionContext().messages);
186+
expect(contextText.match(/deployment status check/g)).toHaveLength(1);
187+
});
188+
156189
it("skips sessions with no compaction entry", async () => {
157190
const dir = await createTmpDir();
158191
const manager = SessionManager.create(dir, dir);

src/agents/pi-embedded-runner/compaction-successor-transcript.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type SessionHeader,
1010
} from "@mariozechner/pi-coding-agent";
1111
import type { OpenClawConfig } from "../../config/types.openclaw.js";
12+
import { collectDuplicateUserMessageEntryIdsForCompaction } from "./compaction-duplicate-user-messages.js";
1213

1314
type ReadonlySessionManagerForRotation = Pick<
1415
SessionManager,
@@ -126,10 +127,12 @@ function buildSuccessorEntries(params: {
126127
}
127128

128129
const removedIds = new Set<string>();
130+
const duplicateUserMessageIds = collectDuplicateUserMessageEntryIdsForCompaction(branch);
129131
for (const entry of allEntries) {
130132
if (
131133
(summarizedBranchIds.has(entry.id) && entry.type === "message") ||
132-
staleStateEntryIds.has(entry.id)
134+
staleStateEntryIds.has(entry.id) ||
135+
duplicateUserMessageIds.has(entry.id)
133136
) {
134137
removedIds.add(entry.id);
135138
}

src/gateway/session-compaction-checkpoints.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
99
import {
1010
captureCompactionCheckpointSnapshot,
1111
cleanupCompactionCheckpointSnapshot,
12+
MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
1213
persistSessionCompactionCheckpoint,
1314
} from "./session-compaction-checkpoints.js";
1415

@@ -84,6 +85,31 @@ describe("session-compaction-checkpoints", () => {
8485
expect(fsSync.existsSync(sessionFile!)).toBe(true);
8586
});
8687

88+
test("capture skips oversized pre-compaction transcripts", async () => {
89+
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-"));
90+
tempDirs.push(dir);
91+
92+
const session = SessionManager.create(dir, dir);
93+
session.appendMessage({
94+
role: "user",
95+
content: "before compaction",
96+
timestamp: Date.now(),
97+
});
98+
const sessionFile = session.getSessionFile();
99+
expect(sessionFile).toBeTruthy();
100+
await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8");
101+
102+
const snapshot = captureCompactionCheckpointSnapshot({
103+
sessionManager: session,
104+
sessionFile: sessionFile!,
105+
maxBytes: 64,
106+
});
107+
108+
expect(snapshot).toBeNull();
109+
expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64);
110+
expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]);
111+
});
112+
87113
test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => {
88114
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-trim-"));
89115
tempDirs.push(dir);

src/gateway/session-compaction-checkpoints.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { resolveGatewaySessionStoreTarget } from "./session-utils.js";
1616

1717
const log = createSubsystemLogger("gateway/session-compaction-checkpoints");
1818
const MAX_COMPACTION_CHECKPOINTS_PER_SESSION = 25;
19+
export const MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES = 64 * 1024 * 1024;
1920

2021
export type CapturedCompactionCheckpointSnapshot = {
2122
sessionId: string;
@@ -62,6 +63,7 @@ export function resolveSessionCompactionCheckpointReason(params: {
6263
export function captureCompactionCheckpointSnapshot(params: {
6364
sessionManager: Pick<SessionManager, "getLeafId">;
6465
sessionFile: string;
66+
maxBytes?: number;
6567
}): CapturedCompactionCheckpointSnapshot | null {
6668
const getLeafId =
6769
params.sessionManager && typeof params.sessionManager.getLeafId === "function"
@@ -71,6 +73,15 @@ export function captureCompactionCheckpointSnapshot(params: {
7173
if (!getLeafId || !sessionFile) {
7274
return null;
7375
}
76+
const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
77+
try {
78+
const stat = fsSync.statSync(sessionFile);
79+
if (!stat.isFile() || stat.size > maxBytes) {
80+
return null;
81+
}
82+
} catch {
83+
return null;
84+
}
7485
const leafId = getLeafId();
7586
if (!leafId) {
7687
return null;

0 commit comments

Comments
 (0)