Skip to content

Commit 5e32491

Browse files
committed
fix: avoid sync checkpoint metadata reads
1 parent 94f9be7 commit 5e32491

4 files changed

Lines changed: 162 additions & 19 deletions

File tree

src/agents/pi-embedded-runner/compact.queued.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
captureCompactionCheckpointSnapshotAsync,
77
cleanupCompactionCheckpointSnapshot,
88
persistSessionCompactionCheckpoint,
9+
readSessionLeafIdFromTranscriptAsync,
910
resolveSessionCompactionCheckpointReason,
1011
type CapturedCompactionCheckpointSnapshot,
1112
} from "../../gateway/session-compaction-checkpoints.js";
@@ -116,7 +117,6 @@ export async function compactEmbeddedPiSession(
116117
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
117118
checkpointSnapshot = engineOwnsCompaction
118119
? await captureCompactionCheckpointSnapshotAsync({
119-
sessionManager: SessionManager.open(params.sessionFile),
120120
sessionFile: params.sessionFile,
121121
})
122122
: null;
@@ -200,7 +200,7 @@ export async function compactEmbeddedPiSession(
200200
try {
201201
const postLeafId =
202202
postCompactionLeafId ??
203-
SessionManager.open(postCompactionSessionFile).getLeafId() ??
203+
(await readSessionLeafIdFromTranscriptAsync(postCompactionSessionFile)) ??
204204
undefined;
205205
const storedCheckpoint = await persistSessionCompactionCheckpoint({
206206
cfg: params.config,

src/gateway/session-compaction-checkpoints.test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
cleanupCompactionCheckpointSnapshot,
1313
MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
1414
persistSessionCompactionCheckpoint,
15+
readSessionLeafIdFromTranscriptAsync,
1516
} from "./session-compaction-checkpoints.js";
1617

1718
const tempDirs: string[] = [];
@@ -143,6 +144,56 @@ describe("session-compaction-checkpoints", () => {
143144
}
144145
});
145146

147+
test("async capture derives session metadata without synchronous SessionManager.open", async () => {
148+
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-metadata-"));
149+
tempDirs.push(dir);
150+
151+
const session = SessionManager.create(dir, dir);
152+
session.appendMessage({
153+
role: "user",
154+
content: "derive checkpoint metadata",
155+
timestamp: Date.now(),
156+
});
157+
session.appendMessage({
158+
role: "assistant",
159+
content: "metadata derived",
160+
api: "responses",
161+
provider: "openai",
162+
model: "gpt-test",
163+
timestamp: Date.now(),
164+
} as unknown as AssistantMessage);
165+
166+
const sessionFile = session.getSessionFile();
167+
const sessionId = session.getSessionId();
168+
const leafId = session.getLeafId();
169+
expect(sessionFile).toBeTruthy();
170+
expect(sessionId).toBeTruthy();
171+
expect(leafId).toBeTruthy();
172+
await fs.appendFile(sessionFile!, "\nnot-json\n", "utf-8");
173+
174+
const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync");
175+
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
176+
let snapshot: Awaited<ReturnType<typeof captureCompactionCheckpointSnapshotAsync>> = null;
177+
try {
178+
expect(await readSessionLeafIdFromTranscriptAsync(sessionFile!)).toBe(leafId);
179+
snapshot = await captureCompactionCheckpointSnapshotAsync({
180+
sessionFile: sessionFile!,
181+
});
182+
183+
expect(copyFileSyncSpy).not.toHaveBeenCalled();
184+
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
185+
expect(snapshot).not.toBeNull();
186+
expect(snapshot?.sessionId).toBe(sessionId);
187+
expect(snapshot?.leafId).toBe(leafId);
188+
expect(snapshot?.sessionFile).not.toBe(sessionFile);
189+
expect(snapshot?.sessionFile).toContain(".checkpoint.");
190+
} finally {
191+
await cleanupCompactionCheckpointSnapshot(snapshot);
192+
copyFileSyncSpy.mockRestore();
193+
sessionManagerOpenSpy.mockRestore();
194+
}
195+
});
196+
146197
test("async capture skips oversized pre-compaction transcripts without sync copy", async () => {
147198
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-oversized-"));
148199
tempDirs.push(dir);

src/gateway/session-compaction-checkpoints.ts

Lines changed: 108 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,36 @@ export function resolveSessionCompactionCheckpointReason(params: {
6161
}
6262

6363
const SESSION_HEADER_READ_MAX_BYTES = 64 * 1024;
64+
const SESSION_TAIL_READ_INITIAL_BYTES = 64 * 1024;
65+
66+
type AsyncTranscriptFileHandle = Awaited<ReturnType<typeof fs.open>>;
67+
68+
async function readFileRangeAsync(
69+
fileHandle: AsyncTranscriptFileHandle,
70+
position: number,
71+
length: number,
72+
): Promise<Buffer> {
73+
const buffer = Buffer.alloc(length);
74+
let offset = 0;
75+
while (offset < length) {
76+
const { bytesRead } = await fileHandle.read(buffer, offset, length - offset, position + offset);
77+
if (bytesRead <= 0) {
78+
break;
79+
}
80+
offset += bytesRead;
81+
}
82+
return offset === length ? buffer : buffer.subarray(0, offset);
83+
}
6484

6585
async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise<string | null> {
66-
let fileHandle: Awaited<ReturnType<typeof fs.open>> | undefined;
86+
let fileHandle: AsyncTranscriptFileHandle | undefined;
6787
try {
6888
fileHandle = await fs.open(sessionFile, "r");
69-
const buffer = Buffer.alloc(SESSION_HEADER_READ_MAX_BYTES);
70-
const { bytesRead } = await fileHandle.read(buffer, 0, buffer.length, 0);
71-
if (bytesRead <= 0) {
89+
const buffer = await readFileRangeAsync(fileHandle, 0, SESSION_HEADER_READ_MAX_BYTES);
90+
if (buffer.length <= 0) {
7291
return null;
7392
}
74-
const chunk = buffer.toString("utf-8", 0, bytesRead);
93+
const chunk = buffer.toString("utf-8");
7594
const firstLine = chunk
7695
.split(/\r?\n/)
7796
.map((line) => line.trim())
@@ -92,6 +111,82 @@ async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Prom
92111
}
93112
}
94113

114+
function parseTranscriptLineId(
115+
line: string,
116+
): { kind: "session" } | { kind: "entry"; id: string } | null {
117+
try {
118+
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown };
119+
if (parsed.type === "session") {
120+
return { kind: "session" };
121+
}
122+
if (typeof parsed.id === "string" && parsed.id.trim()) {
123+
return { kind: "entry", id: parsed.id.trim() };
124+
}
125+
} catch {
126+
return null;
127+
}
128+
return null;
129+
}
130+
131+
export async function readSessionLeafIdFromTranscriptAsync(
132+
sessionFile: string,
133+
maxBytes = MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
134+
): Promise<string | null> {
135+
let fileHandle: AsyncTranscriptFileHandle | undefined;
136+
try {
137+
fileHandle = await fs.open(sessionFile, "r");
138+
const stat = await fileHandle.stat();
139+
if (!stat.isFile() || stat.size <= 0) {
140+
return null;
141+
}
142+
143+
const requestedMaxBytes = Number.isFinite(maxBytes)
144+
? Math.max(1024, Math.floor(maxBytes))
145+
: MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
146+
const maxReadableBytes = Math.min(stat.size, requestedMaxBytes);
147+
let readLength = Math.min(maxReadableBytes, SESSION_TAIL_READ_INITIAL_BYTES);
148+
while (readLength > 0) {
149+
const readStart = Math.max(0, stat.size - readLength);
150+
const buffer = await readFileRangeAsync(fileHandle, readStart, readLength);
151+
const lines = buffer.toString("utf-8").split(/\r?\n/);
152+
// If we did not read from the beginning, the first line may be a suffix of
153+
// a larger JSONL entry. Ignore it and grow the window if no complete entry
154+
// is found.
155+
const candidateLines = readStart > 0 ? lines.slice(1) : lines;
156+
for (let i = candidateLines.length - 1; i >= 0; i -= 1) {
157+
const line = candidateLines[i]?.trim();
158+
if (!line) {
159+
continue;
160+
}
161+
const parsed = parseTranscriptLineId(line);
162+
if (!parsed) {
163+
continue;
164+
}
165+
if (parsed.kind === "session") {
166+
return null;
167+
}
168+
return parsed.id;
169+
}
170+
171+
if (readStart === 0) {
172+
return null;
173+
}
174+
const nextReadLength = Math.min(maxReadableBytes, readLength * 2);
175+
if (nextReadLength === readLength) {
176+
return null;
177+
}
178+
readLength = nextReadLength;
179+
}
180+
} catch {
181+
return null;
182+
} finally {
183+
if (fileHandle) {
184+
await fileHandle.close().catch(() => undefined);
185+
}
186+
}
187+
return null;
188+
}
189+
95190
/**
96191
* Synchronous version — kept for callers that cannot be made async.
97192
* Prefer captureCompactionCheckpointSnapshotAsync for large transcripts
@@ -165,7 +260,7 @@ export function captureCompactionCheckpointSnapshot(params: {
165260
* (see issue #75414).
166261
*/
167262
export async function captureCompactionCheckpointSnapshotAsync(params: {
168-
sessionManager: Pick<SessionManager, "getLeafId">;
263+
sessionManager?: Pick<SessionManager, "getLeafId">;
169264
sessionFile: string;
170265
maxBytes?: number;
171266
}): Promise<CapturedCompactionCheckpointSnapshot | null> {
@@ -174,7 +269,11 @@ export async function captureCompactionCheckpointSnapshotAsync(params: {
174269
? params.sessionManager.getLeafId.bind(params.sessionManager)
175270
: null;
176271
const sessionFile = params.sessionFile.trim();
177-
if (!getLeafId || !sessionFile) {
272+
if (!sessionFile || (params.sessionManager && !getLeafId)) {
273+
return null;
274+
}
275+
const liveLeafId = getLeafId ? getLeafId() : undefined;
276+
if (getLeafId && !liveLeafId) {
178277
return null;
179278
}
180279
const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
@@ -186,10 +285,6 @@ export async function captureCompactionCheckpointSnapshotAsync(params: {
186285
} catch {
187286
return null;
188287
}
189-
const leafId = getLeafId();
190-
if (!leafId) {
191-
return null;
192-
}
193288
const parsedSessionFile = path.parse(sessionFile);
194289
const snapshotFile = path.join(
195290
parsedSessionFile.dir,
@@ -201,7 +296,8 @@ export async function captureCompactionCheckpointSnapshotAsync(params: {
201296
return null;
202297
}
203298
const sessionId = await readSessionIdFromTranscriptHeaderAsync(snapshotFile);
204-
if (!sessionId) {
299+
const leafId = liveLeafId ?? (await readSessionLeafIdFromTranscriptAsync(snapshotFile, maxBytes));
300+
if (!sessionId || !leafId) {
205301
try {
206302
await fs.unlink(snapshotFile);
207303
} catch {

src/gateway/session-utils.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,11 +1874,7 @@ export async function listSessionsFromStoreAsync(params: {
18741874
);
18751875
// Yield to the event loop between batches so WebSocket heartbeats,
18761876
// channel I/O, and concurrent RPC calls are not starved.
1877-
if (
1878-
includeTranscriptFields &&
1879-
(i + 1) % SESSIONS_LIST_YIELD_BATCH_SIZE === 0 &&
1880-
i + 1 < entries.length
1881-
) {
1877+
if ((i + 1) % SESSIONS_LIST_YIELD_BATCH_SIZE === 0 && i + 1 < entries.length) {
18821878
await new Promise<void>((resolve) => setImmediate(resolve));
18831879
}
18841880
}

0 commit comments

Comments
 (0)