@@ -37,9 +37,8 @@ import {
3737 resolveMemoryFlushPromptForRun ,
3838 resolveMemoryFlushSettings ,
3939 shouldRunMemoryFlush ,
40- computeContextHash ,
4140} from "./memory-flush.js" ;
42- import type { FollowupRun } from "./queue.js" ;
41+ import { refreshQueuedFollowupSession , type FollowupRun } from "./queue.js" ;
4342import { incrementCompactionCount } from "./session-updates.js" ;
4443
4544export function estimatePromptTokensForMemoryFlush ( prompt ?: string ) : number | undefined {
@@ -448,47 +447,6 @@ export async function runMemoryFlushIfNeeded(params: {
448447 return entry ?? params . sessionEntry ;
449448 }
450449
451- // --- Content hash dedup (state-based) ---
452- // Read the tail of the session transcript and compute a lightweight hash.
453- // If the hash matches the last flush, the context hasn't materially changed
454- // and flushing again would produce duplicate memory entries (#30115).
455- const sessionFilePath = await resolveSessionFilePathForFlush (
456- params . followupRun . run . sessionId ,
457- entry ?? params . sessionEntry ,
458- params . storePath ,
459- params . sessionKey ? resolveAgentIdFromSessionKey ( params . sessionKey ) : undefined ,
460- ) ;
461- let contextHashBeforeFlush : string | undefined ;
462- if ( sessionFilePath ) {
463- try {
464- const tailMessages = await readTranscriptTailMessages ( sessionFilePath , 10 ) ;
465- // Include the pending prompt in the hash — runMemoryFlushIfNeeded runs
466- // before the current prompt is appended to the transcript, so the
467- // persisted tail alone would match the post-flush hash and incorrectly
468- // skip the next flush even when a new user message arrived.
469- const currentPrompt = params . followupRun . prompt ;
470- if ( currentPrompt ) {
471- tailMessages . push ( { role : "user" , content : currentPrompt } ) ;
472- }
473- if ( tailMessages . length === 0 ) {
474- logVerbose (
475- `memoryFlush dedup skipped (no tail messages extracted): sessionKey=${ params . sessionKey } ` ,
476- ) ;
477- }
478- contextHashBeforeFlush =
479- tailMessages . length > 0 ? computeContextHash ( tailMessages ) : undefined ;
480- const previousHash = entry ?. memoryFlushContextHash ;
481- if ( previousHash && contextHashBeforeFlush === previousHash ) {
482- logVerbose (
483- `memoryFlush skipped (context hash unchanged): sessionKey=${ params . sessionKey } hash=${ contextHashBeforeFlush } ` ,
484- ) ;
485- return entry ?? params . sessionEntry ;
486- }
487- } catch ( err ) {
488- logVerbose ( `memoryFlush hash check failed, proceeding with flush: ${ String ( err ) } ` ) ;
489- }
490- }
491-
492450 logVerbose (
493451 `memoryFlush triggered: sessionKey=${ params . sessionKey } tokenCount=${ tokenCountForFlush ?? "undefined" } threshold=${ flushThreshold } ` ,
494452 ) ;
@@ -507,7 +465,6 @@ export async function runMemoryFlushIfNeeded(params: {
507465 } ) ;
508466 }
509467 let memoryCompactionCompleted = false ;
510- let fallbackFlushAttemptedForCurrentHash = false ;
511468 const memoryFlushNowMs = Date . now ( ) ;
512469 const memoryFlushWritePath = resolveMemoryFlushRelativePathForRun ( {
513470 cfg : params . cfg ,
@@ -519,21 +476,12 @@ export async function runMemoryFlushIfNeeded(params: {
519476 ]
520477 . filter ( Boolean )
521478 . join ( "\n\n" ) ;
479+ let postCompactionSessionId : string | undefined ;
522480 try {
523481 await runWithModelFallback ( {
524482 ...resolveModelFallbackOptions ( params . followupRun . run ) ,
525483 runId : flushRunId ,
526484 run : async ( provider , model , runOptions ) => {
527- if ( contextHashBeforeFlush && fallbackFlushAttemptedForCurrentHash ) {
528- logVerbose (
529- `memoryFlush fallback candidate skipped (context hash already attempted): sessionKey=${ params . sessionKey } hash=${ contextHashBeforeFlush } provider=${ provider } model=${ model } ` ,
530- ) ;
531- // A prior candidate already attempted this exact flush context. Be
532- // conservative and skip later candidates so a write-then-throw failure
533- // cannot append the same memory twice during a single fallback cycle.
534- return { payloads : [ ] , meta : { } } ;
535- }
536- fallbackFlushAttemptedForCurrentHash = Boolean ( contextHashBeforeFlush ) ;
537485 const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams ( {
538486 run : params . followupRun . run ,
539487 sessionCtx : params . sessionCtx ,
@@ -562,12 +510,15 @@ export async function runMemoryFlushIfNeeded(params: {
562510 onAgentEvent : ( evt ) => {
563511 if ( evt . stream === "compaction" ) {
564512 const phase = typeof evt . data . phase === "string" ? evt . data . phase : "" ;
565- if ( phase === "end" && evt . data . completed === true ) {
513+ if ( phase === "end" ) {
566514 memoryCompactionCompleted = true ;
567515 }
568516 }
569517 } ,
570518 } ) ;
519+ if ( result . meta ?. agentMeta ?. sessionId ) {
520+ postCompactionSessionId = result . meta . agentMeta . sessionId ;
521+ }
571522 bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen (
572523 result . meta ?. systemPromptReport ,
573524 ) ;
@@ -579,45 +530,51 @@ export async function runMemoryFlushIfNeeded(params: {
579530 ( params . sessionKey ? activeSessionStore ?. [ params . sessionKey ] ?. compactionCount : 0 ) ??
580531 0 ;
581532 if ( memoryCompactionCompleted ) {
533+ const previousSessionId = activeSessionEntry ?. sessionId ?? params . followupRun . run . sessionId ;
582534 const nextCount = await incrementCompactionCount ( {
583535 sessionEntry : activeSessionEntry ,
584536 sessionStore : activeSessionStore ,
585537 sessionKey : params . sessionKey ,
586538 storePath : params . storePath ,
539+ newSessionId : postCompactionSessionId ,
587540 } ) ;
541+ const updatedEntry = params . sessionKey ? activeSessionStore ?. [ params . sessionKey ] : undefined ;
542+ if ( updatedEntry ) {
543+ activeSessionEntry = updatedEntry ;
544+ params . followupRun . run . sessionId = updatedEntry . sessionId ;
545+ if ( updatedEntry . sessionFile ) {
546+ params . followupRun . run . sessionFile = updatedEntry . sessionFile ;
547+ }
548+ const queueKey = params . followupRun . run . sessionKey ?? params . sessionKey ;
549+ if ( queueKey ) {
550+ refreshQueuedFollowupSession ( {
551+ key : queueKey ,
552+ previousSessionId,
553+ nextSessionId : updatedEntry . sessionId ,
554+ nextSessionFile : updatedEntry . sessionFile ,
555+ } ) ;
556+ }
557+ }
588558 if ( typeof nextCount === "number" ) {
589559 memoryFlushCompactionCount = nextCount ;
590560 }
591561 }
592562 if ( params . storePath && params . sessionKey ) {
593563 try {
594- // Re-hash the transcript AFTER the flush so the stored hash matches
595- // what the next pre-flush check will compute (the transcript now
596- // includes the flush turn's messages). (#34222)
597- let contextHashAfterFlush = contextHashBeforeFlush ;
598- if ( sessionFilePath ) {
599- try {
600- const postFlushMessages = await readTranscriptTailMessages ( sessionFilePath , 10 ) ;
601- if ( postFlushMessages . length > 0 ) {
602- contextHashAfterFlush = computeContextHash ( postFlushMessages ) ;
603- }
604- } catch {
605- // Best-effort: fall back to pre-flush hash if re-read fails.
606- }
607- }
608564 const updatedEntry = await updateSessionStoreEntry ( {
609565 storePath : params . storePath ,
610566 sessionKey : params . sessionKey ,
611567 update : async ( ) => ( {
612568 memoryFlushAt : Date . now ( ) ,
613569 memoryFlushCompactionCount,
614- // Always write the hash field — when rehashing fails, clearing
615- // the stale value prevents incorrect dedup on subsequent flushes.
616- memoryFlushContextHash : contextHashAfterFlush ?? undefined ,
617570 } ) ,
618571 } ) ;
619572 if ( updatedEntry ) {
620573 activeSessionEntry = updatedEntry ;
574+ params . followupRun . run . sessionId = updatedEntry . sessionId ;
575+ if ( updatedEntry . sessionFile ) {
576+ params . followupRun . run . sessionFile = updatedEntry . sessionFile ;
577+ }
621578 }
622579 } catch ( err ) {
623580 logVerbose ( `failed to persist memory flush metadata: ${ String ( err ) } ` ) ;
@@ -629,64 +586,3 @@ export async function runMemoryFlushIfNeeded(params: {
629586
630587 return activeSessionEntry ;
631588}
632-
633- /**
634- * Resolve the session transcript file path for flush hash computation.
635- */
636- async function resolveSessionFilePathForFlush (
637- sessionId : string | undefined ,
638- entry : SessionEntry | undefined ,
639- storePath : string | undefined ,
640- agentId : string | undefined ,
641- ) : Promise < string | undefined > {
642- if ( ! sessionId ) {
643- return undefined ;
644- }
645- const resolved = resolveSessionFilePath (
646- sessionId ,
647- entry ,
648- resolveSessionFilePathOptions ( { agentId, storePath } ) ,
649- ) ;
650- return resolved ?? undefined ;
651- }
652-
653- /**
654- * Read the last N messages from a session transcript file.
655- * Only reads the tail of the file to avoid loading multi-MB transcripts.
656- */
657- async function readTranscriptTailMessages (
658- filePath : string ,
659- maxMessages : number ,
660- ) : Promise < Array < { role ?: string ; content ?: unknown } > > {
661- const TAIL_BYTES = 64 * 1024 ;
662- const handle = await fs . promises . open ( filePath , "r" ) ;
663- try {
664- const stat = await handle . stat ( ) ;
665- const start = Math . max ( 0 , stat . size - TAIL_BYTES ) ;
666- const readLen = Math . min ( stat . size , TAIL_BYTES ) ;
667- const buf = Buffer . alloc ( readLen ) ;
668- await handle . read ( buf , 0 , readLen , start ) ;
669- const tail = buf . toString ( "utf-8" ) ;
670- const nlIdx = tail . indexOf ( "\n" ) ;
671- const trimmed = start > 0 ? ( nlIdx >= 0 ? tail . slice ( nlIdx + 1 ) : "" ) : tail ;
672- const lines = trimmed . split ( / \r ? \n / ) ;
673- const messages : Array < { role ?: string ; content ?: unknown } > = [ ] ;
674- for ( let i = lines . length - 1 ; i >= 0 && messages . length < maxMessages ; i -- ) {
675- const line = lines [ i ] . trim ( ) ;
676- if ( ! line ) {
677- continue ;
678- }
679- try {
680- const parsed = JSON . parse ( line ) ;
681- if ( parsed ?. message ?. role ) {
682- messages . unshift ( { role : parsed . message . role , content : parsed . message . content } ) ;
683- }
684- } catch {
685- // Skip malformed lines
686- }
687- }
688- return messages ;
689- } finally {
690- await handle . close ( ) ;
691- }
692- }
0 commit comments