Skip to content

Commit d2e8ed3

Browse files
fix: keep session transcript pointers fresh after compaction (openclaw#50688)
Co-authored-by: Frank Yang <[email protected]>
1 parent dd132ea commit d2e8ed3

13 files changed

+977
-775
lines changed

src/auto-reply/reply/agent-runner-memory.ts

Lines changed: 29 additions & 133 deletions
Original file line number | Diff line number | Diff line change
@@ -37,9 +37,8 @@ import {
3737
resolveMemoryFlushPromptForRun,
3838
resolveMemoryFlushSettings,
3939
shouldRunMemoryFlush,
40-
computeContextHash,
4140
} from "./memory-flush.js";
42-
import type { FollowupRun } from "./queue.js";
41+
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
4342
import { incrementCompactionCount } from "./session-updates.js";
4443

4544
export function estimatePromptTokensForMemoryFlush(prompt?: string): number | undefined {
@@ -448,47 +447,6 @@ export async function runMemoryFlushIfNeeded(params: {
448447
return entry ?? params.sessionEntry;
449448
}
450449

451-
// --- Content hash dedup (state-based) ---
452-
// Read the tail of the session transcript and compute a lightweight hash.
453-
// If the hash matches the last flush, the context hasn't materially changed
454-
// and flushing again would produce duplicate memory entries (#30115).
455-
const sessionFilePath = await resolveSessionFilePathForFlush(
456-
params.followupRun.run.sessionId,
457-
entry ?? params.sessionEntry,
458-
params.storePath,
459-
params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined,
460-
);
461-
let contextHashBeforeFlush: string | undefined;
462-
if (sessionFilePath) {
463-
try {
464-
const tailMessages = await readTranscriptTailMessages(sessionFilePath, 10);
465-
// Include the pending prompt in the hash — runMemoryFlushIfNeeded runs
466-
// before the current prompt is appended to the transcript, so the
467-
// persisted tail alone would match the post-flush hash and incorrectly
468-
// skip the next flush even when a new user message arrived.
469-
const currentPrompt = params.followupRun.prompt;
470-
if (currentPrompt) {
471-
tailMessages.push({ role: "user", content: currentPrompt });
472-
}
473-
if (tailMessages.length === 0) {
474-
logVerbose(
475-
`memoryFlush dedup skipped (no tail messages extracted): sessionKey=${params.sessionKey}`,
476-
);
477-
}
478-
contextHashBeforeFlush =
479-
tailMessages.length > 0 ? computeContextHash(tailMessages) : undefined;
480-
const previousHash = entry?.memoryFlushContextHash;
481-
if (previousHash && contextHashBeforeFlush === previousHash) {
482-
logVerbose(
483-
`memoryFlush skipped (context hash unchanged): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush}`,
484-
);
485-
return entry ?? params.sessionEntry;
486-
}
487-
} catch (err) {
488-
logVerbose(`memoryFlush hash check failed, proceeding with flush: ${String(err)}`);
489-
}
490-
}
491-
492450
logVerbose(
493451
`memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`,
494452
);
@@ -507,7 +465,6 @@ export async function runMemoryFlushIfNeeded(params: {
507465
});
508466
}
509467
let memoryCompactionCompleted = false;
510-
let fallbackFlushAttemptedForCurrentHash = false;
511468
const memoryFlushNowMs = Date.now();
512469
const memoryFlushWritePath = resolveMemoryFlushRelativePathForRun({
513470
cfg: params.cfg,
@@ -519,21 +476,12 @@ export async function runMemoryFlushIfNeeded(params: {
519476
]
520477
.filter(Boolean)
521478
.join("\n\n");
479+
let postCompactionSessionId: string | undefined;
522480
try {
523481
await runWithModelFallback({
524482
...resolveModelFallbackOptions(params.followupRun.run),
525483
runId: flushRunId,
526484
run: async (provider, model, runOptions) => {
527-
if (contextHashBeforeFlush && fallbackFlushAttemptedForCurrentHash) {
528-
logVerbose(
529-
`memoryFlush fallback candidate skipped (context hash already attempted): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush} provider=${provider} model=${model}`,
530-
);
531-
// A prior candidate already attempted this exact flush context. Be
532-
// conservative and skip later candidates so a write-then-throw failure
533-
// cannot append the same memory twice during a single fallback cycle.
534-
return { payloads: [], meta: {} };
535-
}
536-
fallbackFlushAttemptedForCurrentHash = Boolean(contextHashBeforeFlush);
537485
const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams({
538486
run: params.followupRun.run,
539487
sessionCtx: params.sessionCtx,
@@ -562,12 +510,15 @@ export async function runMemoryFlushIfNeeded(params: {
562510
onAgentEvent: (evt) => {
563511
if (evt.stream === "compaction") {
564512
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
565-
if (phase === "end" && evt.data.completed === true) {
513+
if (phase === "end") {
566514
memoryCompactionCompleted = true;
567515
}
568516
}
569517
},
570518
});
519+
if (result.meta?.agentMeta?.sessionId) {
520+
postCompactionSessionId = result.meta.agentMeta.sessionId;
521+
}
571522
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
572523
result.meta?.systemPromptReport,
573524
);
@@ -579,45 +530,51 @@ export async function runMemoryFlushIfNeeded(params: {
579530
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ??
580531
0;
581532
if (memoryCompactionCompleted) {
533+
const previousSessionId = activeSessionEntry?.sessionId ?? params.followupRun.run.sessionId;
582534
const nextCount = await incrementCompactionCount({
583535
sessionEntry: activeSessionEntry,
584536
sessionStore: activeSessionStore,
585537
sessionKey: params.sessionKey,
586538
storePath: params.storePath,
539+
newSessionId: postCompactionSessionId,
587540
});
541+
const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined;
542+
if (updatedEntry) {
543+
activeSessionEntry = updatedEntry;
544+
params.followupRun.run.sessionId = updatedEntry.sessionId;
545+
if (updatedEntry.sessionFile) {
546+
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
547+
}
548+
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
549+
if (queueKey) {
550+
refreshQueuedFollowupSession({
551+
key: queueKey,
552+
previousSessionId,
553+
nextSessionId: updatedEntry.sessionId,
554+
nextSessionFile: updatedEntry.sessionFile,
555+
});
556+
}
557+
}
588558
if (typeof nextCount === "number") {
589559
memoryFlushCompactionCount = nextCount;
590560
}
591561
}
592562
if (params.storePath && params.sessionKey) {
593563
try {
594-
// Re-hash the transcript AFTER the flush so the stored hash matches
595-
// what the next pre-flush check will compute (the transcript now
596-
// includes the flush turn's messages). (#34222)
597-
let contextHashAfterFlush = contextHashBeforeFlush;
598-
if (sessionFilePath) {
599-
try {
600-
const postFlushMessages = await readTranscriptTailMessages(sessionFilePath, 10);
601-
if (postFlushMessages.length > 0) {
602-
contextHashAfterFlush = computeContextHash(postFlushMessages);
603-
}
604-
} catch {
605-
// Best-effort: fall back to pre-flush hash if re-read fails.
606-
}
607-
}
608564
const updatedEntry = await updateSessionStoreEntry({
609565
storePath: params.storePath,
610566
sessionKey: params.sessionKey,
611567
update: async () => ({
612568
memoryFlushAt: Date.now(),
613569
memoryFlushCompactionCount,
614-
// Always write the hash field — when rehashing fails, clearing
615-
// the stale value prevents incorrect dedup on subsequent flushes.
616-
memoryFlushContextHash: contextHashAfterFlush ?? undefined,
617570
}),
618571
});
619572
if (updatedEntry) {
620573
activeSessionEntry = updatedEntry;
574+
params.followupRun.run.sessionId = updatedEntry.sessionId;
575+
if (updatedEntry.sessionFile) {
576+
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
577+
}
621578
}
622579
} catch (err) {
623580
logVerbose(`failed to persist memory flush metadata: ${String(err)}`);
@@ -629,64 +586,3 @@ export async function runMemoryFlushIfNeeded(params: {
629586

630587
return activeSessionEntry;
631588
}
632-
633-
/**
634-
* Resolve the session transcript file path for flush hash computation.
635-
*/
636-
async function resolveSessionFilePathForFlush(
637-
sessionId: string | undefined,
638-
entry: SessionEntry | undefined,
639-
storePath: string | undefined,
640-
agentId: string | undefined,
641-
): Promise<string | undefined> {
642-
if (!sessionId) {
643-
return undefined;
644-
}
645-
const resolved = resolveSessionFilePath(
646-
sessionId,
647-
entry,
648-
resolveSessionFilePathOptions({ agentId, storePath }),
649-
);
650-
return resolved ?? undefined;
651-
}
652-
653-
/**
654-
* Read the last N messages from a session transcript file.
655-
* Only reads the tail of the file to avoid loading multi-MB transcripts.
656-
*/
657-
async function readTranscriptTailMessages(
658-
filePath: string,
659-
maxMessages: number,
660-
): Promise<Array<{ role?: string; content?: unknown }>> {
661-
const TAIL_BYTES = 64 * 1024;
662-
const handle = await fs.promises.open(filePath, "r");
663-
try {
664-
const stat = await handle.stat();
665-
const start = Math.max(0, stat.size - TAIL_BYTES);
666-
const readLen = Math.min(stat.size, TAIL_BYTES);
667-
const buf = Buffer.alloc(readLen);
668-
await handle.read(buf, 0, readLen, start);
669-
const tail = buf.toString("utf-8");
670-
const nlIdx = tail.indexOf("\n");
671-
const trimmed = start > 0 ? (nlIdx >= 0 ? tail.slice(nlIdx + 1) : "") : tail;
672-
const lines = trimmed.split(/\r?\n/);
673-
const messages: Array<{ role?: string; content?: unknown }> = [];
674-
for (let i = lines.length - 1; i >= 0 && messages.length < maxMessages; i--) {
675-
const line = lines[i].trim();
676-
if (!line) {
677-
continue;
678-
}
679-
try {
680-
const parsed = JSON.parse(line);
681-
if (parsed?.message?.role) {
682-
messages.unshift({ role: parsed.message.role, content: parsed.message.content });
683-
}
684-
} catch {
685-
// Skip malformed lines
686-
}
687-
}
688-
return messages;
689-
} finally {
690-
await handle.close();
691-
}
692-
}

src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts

Lines changed: 11 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -15,16 +15,6 @@ const runEmbeddedPiAgentMock = vi.fn();
1515
const runCliAgentMock = vi.fn();
1616
const runWithModelFallbackMock = vi.fn();
1717
const runtimeErrorMock = vi.fn();
18-
const runMemoryFlushIfNeededMock = vi.hoisted(() =>
19-
vi.fn(async ({ sessionEntry }) => sessionEntry),
20-
);
21-
const createReplyMediaPathNormalizerMock = vi.hoisted(() =>
22-
vi.fn(
23-
(_params?: unknown) =>
24-
async <T>(payload: T) =>
25-
payload,
26-
),
27-
);
2818

2919
vi.mock("../../agents/model-fallback.js", () => ({
3020
runWithModelFallback: (params: {
@@ -68,14 +58,6 @@ vi.mock("../../runtime.js", async () => {
6858
};
6959
});
7060

71-
vi.mock("./agent-runner-memory.runtime.js", () => ({
72-
runMemoryFlushIfNeeded: (params: unknown) => runMemoryFlushIfNeededMock(params),
73-
}));
74-
75-
vi.mock("./reply-media-paths.runtime.js", () => ({
76-
createReplyMediaPathNormalizer: (params: unknown) => createReplyMediaPathNormalizerMock(params),
77-
}));
78-
7961
vi.mock("./queue.js", async () => {
8062
const actual = await vi.importActual<typeof import("./queue.js")>("./queue.js");
8163
return {
@@ -103,40 +85,10 @@ type RunWithModelFallbackParams = {
10385
};
10486

10587
beforeEach(() => {
106-
vi.useRealTimers();
107-
vi.clearAllTimers();
10888
runEmbeddedPiAgentMock.mockClear();
10989
runCliAgentMock.mockClear();
11090
runWithModelFallbackMock.mockClear();
11191
runtimeErrorMock.mockClear();
112-
runMemoryFlushIfNeededMock.mockClear();
113-
runMemoryFlushIfNeededMock.mockImplementation(
114-
async ({
115-
sessionEntry,
116-
followupRun,
117-
}: {
118-
sessionEntry?: SessionEntry;
119-
followupRun: FollowupRun;
120-
}) => {
121-
if (!sessionEntry || (sessionEntry.totalTokens ?? 0) < 1_000_000) {
122-
return sessionEntry;
123-
}
124-
await runWithModelFallbackMock({
125-
provider: followupRun.run.provider,
126-
model: followupRun.run.model,
127-
run: async (provider: string, model: string) =>
128-
await runEmbeddedPiAgentMock({
129-
provider,
130-
model,
131-
prompt: "Pre-compaction memory flush.",
132-
enforceFinalTag: provider.includes("gemini") ? true : undefined,
133-
}),
134-
});
135-
return sessionEntry;
136-
},
137-
);
138-
createReplyMediaPathNormalizerMock.mockClear();
139-
createReplyMediaPathNormalizerMock.mockImplementation(() => async (payload) => payload);
14092
loadCronStoreMock.mockClear();
14193
// Default: no cron jobs in store.
14294
loadCronStoreMock.mockResolvedValue({ version: 1, jobs: [] });
@@ -153,7 +105,6 @@ beforeEach(() => {
153105
});
154106

155107
afterEach(() => {
156-
vi.clearAllTimers();
157108
vi.useRealTimers();
158109
resetSystemEventsForTest();
159110
});
@@ -388,6 +339,11 @@ describe("runReplyAgent auto-compaction token update", () => {
388339
);
389340
}
390341

342+
async function normalizeComparablePath(filePath: string): Promise<string> {
343+
const parent = await fs.realpath(path.dirname(filePath)).catch(() => path.dirname(filePath));
344+
return path.join(parent, path.basename(filePath));
345+
}
346+
391347
function createBaseRun(params: {
392348
storePath: string;
393349
sessionEntry: Record<string, unknown>;
@@ -436,6 +392,7 @@ describe("runReplyAgent auto-compaction token update", () => {
436392
const sessionKey = "main";
437393
const sessionEntry = {
438394
sessionId: "session",
395+
sessionFile: path.join(tmp, "session.jsonl"),
439396
updatedAt: Date.now(),
440397
totalTokens: 181_000,
441398
compactionCount: 0,
@@ -524,6 +481,7 @@ describe("runReplyAgent auto-compaction token update", () => {
524481
payloads: [{ text: "done" }],
525482
meta: {
526483
agentMeta: {
484+
sessionId: "session-rotated",
527485
usage: { input: 190_000, output: 8_000, total: 198_000 },
528486
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
529487
compactionCount: 2,
@@ -568,6 +526,10 @@ describe("runReplyAgent auto-compaction token update", () => {
568526
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
569527
expect(stored[sessionKey].totalTokens).toBe(10_000);
570528
expect(stored[sessionKey].compactionCount).toBe(2);
529+
expect(stored[sessionKey].sessionId).toBe("session-rotated");
530+
expect(await normalizeComparablePath(stored[sessionKey].sessionFile)).toBe(
531+
await normalizeComparablePath(path.join(tmp, "session-rotated.jsonl")),
532+
);
571533
});
572534

573535
it("accumulates compactions across fallback attempts without double-counting a single attempt", async () => {

0 commit comments

Comments
 (0)