@@ -1714,177 +1714,192 @@ async function processMessageAfterDedupe(
17141714 } ,
17151715 } ,
17161716 } ) ;
1717- await core . channel . turn . dispatchAssembled ( {
1718- cfg : config ,
1717+ await core . channel . turn . run ( {
17191718 channel : "bluebubbles" ,
17201719 accountId : account . accountId ,
1721- agentId : route . agentId ,
1722- routeSessionKey : route . sessionKey ,
1723- storePath,
1724- ctxPayload,
1725- recordInboundSession : core . channel . session . recordInboundSession ,
1726- dispatchReplyWithBufferedBlockDispatcher :
1727- core . channel . reply . dispatchReplyWithBufferedBlockDispatcher ,
1728- delivery : {
1729- deliver : async ( payload , info ) => {
1730- const rawReplyToId =
1731- privateApiEnabled && typeof payload . replyToId === "string"
1732- ? payload . replyToId . trim ( )
1733- : "" ;
1734- // Resolve short ID (e.g., "5") to full UUID, scoped to the chat
1735- // this deliver path is already routing for (cross-chat guard).
1736- const replyToMessageGuid = rawReplyToId
1737- ? resolveBlueBubblesMessageId ( rawReplyToId , {
1738- requireKnownShortId : true ,
1739- chatContext : {
1740- chatGuid : chatGuidForActions ?? chatGuid ,
1741- chatIdentifier,
1742- chatId,
1743- } ,
1744- } )
1745- : "" ;
1746- const mediaList = resolveOutboundMediaUrls ( payload ) ;
1747- if ( mediaList . length > 0 ) {
1748- const tableMode = core . channel . text . resolveMarkdownTableMode ( {
1749- cfg : config ,
1750- channel : "bluebubbles" ,
1751- accountId : account . accountId ,
1752- } ) ;
1753- const text = sanitizeReplyDirectiveText (
1754- core . channel . text . convertMarkdownTables ( payload . text ?? "" , tableMode ) ,
1755- ) ;
1756- await sendMediaWithLeadingCaption ( {
1757- mediaUrls : mediaList ,
1758- caption : text ,
1759- send : async ( { mediaUrl, caption } ) => {
1760- const cachedBody = ( caption ?? "" ) . trim ( ) || "<media:attachment>" ;
1720+ raw : ctxPayload ,
1721+ adapter : {
1722+ ingest : ( ) => ( {
1723+ id : String ( ctxPayload . MessageSid ?? message . messageId ) ,
1724+ timestamp : message . timestamp ,
1725+ rawText : rawBody ,
1726+ textForAgent : rawBody ,
1727+ textForCommands : commandBody ,
1728+ raw : ctxPayload ,
1729+ } ) ,
1730+ resolveTurn : ( ) => ( {
1731+ cfg : config ,
1732+ channel : "bluebubbles" ,
1733+ accountId : account . accountId ,
1734+ agentId : route . agentId ,
1735+ routeSessionKey : route . sessionKey ,
1736+ storePath,
1737+ ctxPayload,
1738+ recordInboundSession : core . channel . session . recordInboundSession ,
1739+ dispatchReplyWithBufferedBlockDispatcher :
1740+ core . channel . reply . dispatchReplyWithBufferedBlockDispatcher ,
1741+ delivery : {
1742+ deliver : async ( payload , info ) => {
1743+ const rawReplyToId =
1744+ privateApiEnabled && typeof payload . replyToId === "string"
1745+ ? payload . replyToId . trim ( )
1746+ : "" ;
1747+ // Resolve short ID (e.g., "5") to full UUID, scoped to the chat
1748+ // this deliver path is already routing for (cross-chat guard).
1749+ const replyToMessageGuid = rawReplyToId
1750+ ? resolveBlueBubblesMessageId ( rawReplyToId , {
1751+ requireKnownShortId : true ,
1752+ chatContext : {
1753+ chatGuid : chatGuidForActions ?? chatGuid ,
1754+ chatIdentifier,
1755+ chatId,
1756+ } ,
1757+ } )
1758+ : "" ;
1759+ const mediaList = resolveOutboundMediaUrls ( payload ) ;
1760+ if ( mediaList . length > 0 ) {
1761+ const tableMode = core . channel . text . resolveMarkdownTableMode ( {
1762+ cfg : config ,
1763+ channel : "bluebubbles" ,
1764+ accountId : account . accountId ,
1765+ } ) ;
1766+ const text = sanitizeReplyDirectiveText (
1767+ core . channel . text . convertMarkdownTables ( payload . text ?? "" , tableMode ) ,
1768+ ) ;
1769+ await sendMediaWithLeadingCaption ( {
1770+ mediaUrls : mediaList ,
1771+ caption : text ,
1772+ send : async ( { mediaUrl, caption } ) => {
1773+ const cachedBody = ( caption ?? "" ) . trim ( ) || "<media:attachment>" ;
1774+ const pendingId = rememberPendingOutboundMessageId ( {
1775+ accountId : account . accountId ,
1776+ sessionKey : route . sessionKey ,
1777+ outboundTarget,
1778+ chatGuid : chatGuidForActions ?? chatGuid ,
1779+ chatIdentifier,
1780+ chatId,
1781+ snippet : cachedBody ,
1782+ } ) ;
1783+ let result : Awaited < ReturnType < typeof sendBlueBubblesMedia > > ;
1784+ try {
1785+ result = await sendBlueBubblesMedia ( {
1786+ cfg : config ,
1787+ to : outboundTarget ,
1788+ mediaUrl,
1789+ caption : caption ?? undefined ,
1790+ replyToId : replyToMessageGuid || null ,
1791+ accountId : account . accountId ,
1792+ asVoice : payload . audioAsVoice === true ,
1793+ } ) ;
1794+ } catch ( err ) {
1795+ forgetPendingOutboundMessageId ( pendingId ) ;
1796+ throw err ;
1797+ }
1798+ if ( maybeEnqueueOutboundMessageId ( result . messageId , cachedBody ) ) {
1799+ forgetPendingOutboundMessageId ( pendingId ) ;
1800+ }
1801+ sentMessage = true ;
1802+ statusSink ?.( { lastOutboundAt : Date . now ( ) } ) ;
1803+ if ( info . kind === "block" ) {
1804+ restartTypingSoon ( ) ;
1805+ }
1806+ } ,
1807+ } ) ;
1808+ return ;
1809+ }
1810+
1811+ const textLimit =
1812+ account . config . textChunkLimit && account . config . textChunkLimit > 0
1813+ ? account . config . textChunkLimit
1814+ : DEFAULT_TEXT_LIMIT ;
1815+ const chunkMode = account . config . chunkMode ?? "length" ;
1816+ const tableMode = core . channel . text . resolveMarkdownTableMode ( {
1817+ cfg : config ,
1818+ channel : "bluebubbles" ,
1819+ accountId : account . accountId ,
1820+ } ) ;
1821+ const text = sanitizeReplyDirectiveText (
1822+ core . channel . text . convertMarkdownTables ( payload . text ?? "" , tableMode ) ,
1823+ ) ;
1824+ const chunks =
1825+ chunkMode === "newline"
1826+ ? resolveTextChunksWithFallback (
1827+ text ,
1828+ core . channel . text . chunkTextWithMode ( text , textLimit , chunkMode ) ,
1829+ )
1830+ : resolveTextChunksWithFallback (
1831+ text ,
1832+ core . channel . text . chunkMarkdownText ( text , textLimit ) ,
1833+ ) ;
1834+ if ( ! chunks . length ) {
1835+ return ;
1836+ }
1837+ for ( const chunk of chunks ) {
17611838 const pendingId = rememberPendingOutboundMessageId ( {
17621839 accountId : account . accountId ,
17631840 sessionKey : route . sessionKey ,
17641841 outboundTarget,
17651842 chatGuid : chatGuidForActions ?? chatGuid ,
17661843 chatIdentifier,
17671844 chatId,
1768- snippet : cachedBody ,
1845+ snippet : chunk ,
17691846 } ) ;
1770- let result : Awaited < ReturnType < typeof sendBlueBubblesMedia > > ;
1847+ let result : Awaited < ReturnType < typeof sendMessageBlueBubbles > > ;
17711848 try {
1772- result = await sendBlueBubblesMedia ( {
1849+ result = await sendMessageBlueBubbles ( outboundTarget , chunk , {
17731850 cfg : config ,
1774- to : outboundTarget ,
1775- mediaUrl,
1776- caption : caption ?? undefined ,
1777- replyToId : replyToMessageGuid || null ,
17781851 accountId : account . accountId ,
1779- asVoice : payload . audioAsVoice === true ,
1852+ replyToMessageGuid : replyToMessageGuid || undefined ,
17801853 } ) ;
17811854 } catch ( err ) {
17821855 forgetPendingOutboundMessageId ( pendingId ) ;
17831856 throw err ;
17841857 }
1785- if ( maybeEnqueueOutboundMessageId ( result . messageId , cachedBody ) ) {
1858+ if ( maybeEnqueueOutboundMessageId ( result . messageId , chunk ) ) {
17861859 forgetPendingOutboundMessageId ( pendingId ) ;
17871860 }
17881861 sentMessage = true ;
17891862 statusSink ?.( { lastOutboundAt : Date . now ( ) } ) ;
17901863 if ( info . kind === "block" ) {
17911864 restartTypingSoon ( ) ;
17921865 }
1793- } ,
1794- } ) ;
1795- return ;
1796- }
1797-
1798- const textLimit =
1799- account . config . textChunkLimit && account . config . textChunkLimit > 0
1800- ? account . config . textChunkLimit
1801- : DEFAULT_TEXT_LIMIT ;
1802- const chunkMode = account . config . chunkMode ?? "length" ;
1803- const tableMode = core . channel . text . resolveMarkdownTableMode ( {
1804- cfg : config ,
1805- channel : "bluebubbles" ,
1806- accountId : account . accountId ,
1807- } ) ;
1808- const text = sanitizeReplyDirectiveText (
1809- core . channel . text . convertMarkdownTables ( payload . text ?? "" , tableMode ) ,
1810- ) ;
1811- const chunks =
1812- chunkMode === "newline"
1813- ? resolveTextChunksWithFallback (
1814- text ,
1815- core . channel . text . chunkTextWithMode ( text , textLimit , chunkMode ) ,
1816- )
1817- : resolveTextChunksWithFallback (
1818- text ,
1819- core . channel . text . chunkMarkdownText ( text , textLimit ) ,
1820- ) ;
1821- if ( ! chunks . length ) {
1822- return ;
1823- }
1824- for ( const chunk of chunks ) {
1825- const pendingId = rememberPendingOutboundMessageId ( {
1826- accountId : account . accountId ,
1827- sessionKey : route . sessionKey ,
1828- outboundTarget,
1829- chatGuid : chatGuidForActions ?? chatGuid ,
1830- chatIdentifier,
1831- chatId,
1832- snippet : chunk ,
1833- } ) ;
1834- let result : Awaited < ReturnType < typeof sendMessageBlueBubbles > > ;
1835- try {
1836- result = await sendMessageBlueBubbles ( outboundTarget , chunk , {
1837- cfg : config ,
1838- accountId : account . accountId ,
1839- replyToMessageGuid : replyToMessageGuid || undefined ,
1840- } ) ;
1841- } catch ( err ) {
1842- forgetPendingOutboundMessageId ( pendingId ) ;
1843- throw err ;
1844- }
1845- if ( maybeEnqueueOutboundMessageId ( result . messageId , chunk ) ) {
1846- forgetPendingOutboundMessageId ( pendingId ) ;
1847- }
1848- sentMessage = true ;
1849- statusSink ?.( { lastOutboundAt : Date . now ( ) } ) ;
1850- if ( info . kind === "block" ) {
1851- restartTypingSoon ( ) ;
1852- }
1853- }
1854- } ,
1855- onError : ( err , info ) => {
1856- // Flag the outer dedupe wrapper so it releases the claim instead
1857- // of committing. Without this, a transient BlueBubbles send failure
1858- // would permanently block replay-retry for 7 days and the user
1859- // would never receive a reply to that message.
1860- //
1861- // Only the terminal `final` delivery represents the user-visible
1862- // answer. The dispatcher continues past `tool` / `block` failures
1863- // and may still deliver `final` successfully — releasing the
1864- // dedupe claim for those would invite a replay that re-runs tool
1865- // side effects and resends partially-delivered content.
1866- if ( info . kind === "final" ) {
1867- dedupeSignal . deliveryFailed = true ;
1868- }
1869- runtime . error ?.( `BlueBubbles ${ info . kind } reply failed: ${ sanitizeForLog ( err ) } ` ) ;
1870- } ,
1871- } ,
1872- dispatcherOptions : {
1873- ...replyPipeline ,
1874- onReplyStart : typingCallbacks ?. onReplyStart ,
1875- onIdle : typingCallbacks ?. onIdle ,
1876- } ,
1877- replyOptions : {
1878- onModelSelected,
1879- disableBlockStreaming :
1880- typeof account . config . blockStreaming === "boolean"
1881- ? ! account . config . blockStreaming
1882- : undefined ,
1883- } ,
1884- record : {
1885- onRecordError : ( err ) => {
1886- runtime . error ?.( `[bluebubbles] failed updating session meta: ${ sanitizeForLog ( err ) } ` ) ;
1887- } ,
1866+ }
1867+ } ,
1868+ onError : ( err , info ) => {
1869+ // Flag the outer dedupe wrapper so it releases the claim instead
1870+ // of committing. Without this, a transient BlueBubbles send failure
1871+ // would permanently block replay-retry for 7 days and the user
1872+ // would never receive a reply to that message.
1873+ //
1874+ // Only the terminal `final` delivery represents the user-visible
1875+ // answer. The dispatcher continues past `tool` / `block` failures
1876+ // and may still deliver `final` successfully — releasing the
1877+ // dedupe claim for those would invite a replay that re-runs tool
1878+ // side effects and resends partially-delivered content.
1879+ if ( info . kind === "final" ) {
1880+ dedupeSignal . deliveryFailed = true ;
1881+ }
1882+ runtime . error ?.( `BlueBubbles ${ info . kind } reply failed: ${ sanitizeForLog ( err ) } ` ) ;
1883+ } ,
1884+ } ,
1885+ dispatcherOptions : {
1886+ ...replyPipeline ,
1887+ onReplyStart : typingCallbacks ?. onReplyStart ,
1888+ onIdle : typingCallbacks ?. onIdle ,
1889+ } ,
1890+ replyOptions : {
1891+ onModelSelected,
1892+ disableBlockStreaming :
1893+ typeof account . config . blockStreaming === "boolean"
1894+ ? ! account . config . blockStreaming
1895+ : undefined ,
1896+ } ,
1897+ record : {
1898+ onRecordError : ( err ) => {
1899+ runtime . error ?.( `[bluebubbles] failed updating session meta: ${ sanitizeForLog ( err ) } ` ) ;
1900+ } ,
1901+ } ,
1902+ } ) ,
18881903 } ,
18891904 } ) ;
18901905 } finally {
0 commit comments