Skip to content

Commit 9a3a341

Browse files
committed
refactor(channels): route remaining turns through kernel
1 parent 9a9cd0c commit 9a3a341

19 files changed

Lines changed: 1204 additions & 621 deletions

File tree

extensions/bluebubbles/src/monitor-processing.ts

Lines changed: 161 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,177 +1714,192 @@ async function processMessageAfterDedupe(
17141714
},
17151715
},
17161716
});
1717-
await core.channel.turn.dispatchAssembled({
1718-
cfg: config,
1717+
await core.channel.turn.run({
17191718
channel: "bluebubbles",
17201719
accountId: account.accountId,
1721-
agentId: route.agentId,
1722-
routeSessionKey: route.sessionKey,
1723-
storePath,
1724-
ctxPayload,
1725-
recordInboundSession: core.channel.session.recordInboundSession,
1726-
dispatchReplyWithBufferedBlockDispatcher:
1727-
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
1728-
delivery: {
1729-
deliver: async (payload, info) => {
1730-
const rawReplyToId =
1731-
privateApiEnabled && typeof payload.replyToId === "string"
1732-
? payload.replyToId.trim()
1733-
: "";
1734-
// Resolve short ID (e.g., "5") to full UUID, scoped to the chat
1735-
// this deliver path is already routing for (cross-chat guard).
1736-
const replyToMessageGuid = rawReplyToId
1737-
? resolveBlueBubblesMessageId(rawReplyToId, {
1738-
requireKnownShortId: true,
1739-
chatContext: {
1740-
chatGuid: chatGuidForActions ?? chatGuid,
1741-
chatIdentifier,
1742-
chatId,
1743-
},
1744-
})
1745-
: "";
1746-
const mediaList = resolveOutboundMediaUrls(payload);
1747-
if (mediaList.length > 0) {
1748-
const tableMode = core.channel.text.resolveMarkdownTableMode({
1749-
cfg: config,
1750-
channel: "bluebubbles",
1751-
accountId: account.accountId,
1752-
});
1753-
const text = sanitizeReplyDirectiveText(
1754-
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
1755-
);
1756-
await sendMediaWithLeadingCaption({
1757-
mediaUrls: mediaList,
1758-
caption: text,
1759-
send: async ({ mediaUrl, caption }) => {
1760-
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
1720+
raw: ctxPayload,
1721+
adapter: {
1722+
ingest: () => ({
1723+
id: String(ctxPayload.MessageSid ?? message.messageId),
1724+
timestamp: message.timestamp,
1725+
rawText: rawBody,
1726+
textForAgent: rawBody,
1727+
textForCommands: commandBody,
1728+
raw: ctxPayload,
1729+
}),
1730+
resolveTurn: () => ({
1731+
cfg: config,
1732+
channel: "bluebubbles",
1733+
accountId: account.accountId,
1734+
agentId: route.agentId,
1735+
routeSessionKey: route.sessionKey,
1736+
storePath,
1737+
ctxPayload,
1738+
recordInboundSession: core.channel.session.recordInboundSession,
1739+
dispatchReplyWithBufferedBlockDispatcher:
1740+
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
1741+
delivery: {
1742+
deliver: async (payload, info) => {
1743+
const rawReplyToId =
1744+
privateApiEnabled && typeof payload.replyToId === "string"
1745+
? payload.replyToId.trim()
1746+
: "";
1747+
// Resolve short ID (e.g., "5") to full UUID, scoped to the chat
1748+
// this deliver path is already routing for (cross-chat guard).
1749+
const replyToMessageGuid = rawReplyToId
1750+
? resolveBlueBubblesMessageId(rawReplyToId, {
1751+
requireKnownShortId: true,
1752+
chatContext: {
1753+
chatGuid: chatGuidForActions ?? chatGuid,
1754+
chatIdentifier,
1755+
chatId,
1756+
},
1757+
})
1758+
: "";
1759+
const mediaList = resolveOutboundMediaUrls(payload);
1760+
if (mediaList.length > 0) {
1761+
const tableMode = core.channel.text.resolveMarkdownTableMode({
1762+
cfg: config,
1763+
channel: "bluebubbles",
1764+
accountId: account.accountId,
1765+
});
1766+
const text = sanitizeReplyDirectiveText(
1767+
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
1768+
);
1769+
await sendMediaWithLeadingCaption({
1770+
mediaUrls: mediaList,
1771+
caption: text,
1772+
send: async ({ mediaUrl, caption }) => {
1773+
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
1774+
const pendingId = rememberPendingOutboundMessageId({
1775+
accountId: account.accountId,
1776+
sessionKey: route.sessionKey,
1777+
outboundTarget,
1778+
chatGuid: chatGuidForActions ?? chatGuid,
1779+
chatIdentifier,
1780+
chatId,
1781+
snippet: cachedBody,
1782+
});
1783+
let result: Awaited<ReturnType<typeof sendBlueBubblesMedia>>;
1784+
try {
1785+
result = await sendBlueBubblesMedia({
1786+
cfg: config,
1787+
to: outboundTarget,
1788+
mediaUrl,
1789+
caption: caption ?? undefined,
1790+
replyToId: replyToMessageGuid || null,
1791+
accountId: account.accountId,
1792+
asVoice: payload.audioAsVoice === true,
1793+
});
1794+
} catch (err) {
1795+
forgetPendingOutboundMessageId(pendingId);
1796+
throw err;
1797+
}
1798+
if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) {
1799+
forgetPendingOutboundMessageId(pendingId);
1800+
}
1801+
sentMessage = true;
1802+
statusSink?.({ lastOutboundAt: Date.now() });
1803+
if (info.kind === "block") {
1804+
restartTypingSoon();
1805+
}
1806+
},
1807+
});
1808+
return;
1809+
}
1810+
1811+
const textLimit =
1812+
account.config.textChunkLimit && account.config.textChunkLimit > 0
1813+
? account.config.textChunkLimit
1814+
: DEFAULT_TEXT_LIMIT;
1815+
const chunkMode = account.config.chunkMode ?? "length";
1816+
const tableMode = core.channel.text.resolveMarkdownTableMode({
1817+
cfg: config,
1818+
channel: "bluebubbles",
1819+
accountId: account.accountId,
1820+
});
1821+
const text = sanitizeReplyDirectiveText(
1822+
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
1823+
);
1824+
const chunks =
1825+
chunkMode === "newline"
1826+
? resolveTextChunksWithFallback(
1827+
text,
1828+
core.channel.text.chunkTextWithMode(text, textLimit, chunkMode),
1829+
)
1830+
: resolveTextChunksWithFallback(
1831+
text,
1832+
core.channel.text.chunkMarkdownText(text, textLimit),
1833+
);
1834+
if (!chunks.length) {
1835+
return;
1836+
}
1837+
for (const chunk of chunks) {
17611838
const pendingId = rememberPendingOutboundMessageId({
17621839
accountId: account.accountId,
17631840
sessionKey: route.sessionKey,
17641841
outboundTarget,
17651842
chatGuid: chatGuidForActions ?? chatGuid,
17661843
chatIdentifier,
17671844
chatId,
1768-
snippet: cachedBody,
1845+
snippet: chunk,
17691846
});
1770-
let result: Awaited<ReturnType<typeof sendBlueBubblesMedia>>;
1847+
let result: Awaited<ReturnType<typeof sendMessageBlueBubbles>>;
17711848
try {
1772-
result = await sendBlueBubblesMedia({
1849+
result = await sendMessageBlueBubbles(outboundTarget, chunk, {
17731850
cfg: config,
1774-
to: outboundTarget,
1775-
mediaUrl,
1776-
caption: caption ?? undefined,
1777-
replyToId: replyToMessageGuid || null,
17781851
accountId: account.accountId,
1779-
asVoice: payload.audioAsVoice === true,
1852+
replyToMessageGuid: replyToMessageGuid || undefined,
17801853
});
17811854
} catch (err) {
17821855
forgetPendingOutboundMessageId(pendingId);
17831856
throw err;
17841857
}
1785-
if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) {
1858+
if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) {
17861859
forgetPendingOutboundMessageId(pendingId);
17871860
}
17881861
sentMessage = true;
17891862
statusSink?.({ lastOutboundAt: Date.now() });
17901863
if (info.kind === "block") {
17911864
restartTypingSoon();
17921865
}
1793-
},
1794-
});
1795-
return;
1796-
}
1797-
1798-
const textLimit =
1799-
account.config.textChunkLimit && account.config.textChunkLimit > 0
1800-
? account.config.textChunkLimit
1801-
: DEFAULT_TEXT_LIMIT;
1802-
const chunkMode = account.config.chunkMode ?? "length";
1803-
const tableMode = core.channel.text.resolveMarkdownTableMode({
1804-
cfg: config,
1805-
channel: "bluebubbles",
1806-
accountId: account.accountId,
1807-
});
1808-
const text = sanitizeReplyDirectiveText(
1809-
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
1810-
);
1811-
const chunks =
1812-
chunkMode === "newline"
1813-
? resolveTextChunksWithFallback(
1814-
text,
1815-
core.channel.text.chunkTextWithMode(text, textLimit, chunkMode),
1816-
)
1817-
: resolveTextChunksWithFallback(
1818-
text,
1819-
core.channel.text.chunkMarkdownText(text, textLimit),
1820-
);
1821-
if (!chunks.length) {
1822-
return;
1823-
}
1824-
for (const chunk of chunks) {
1825-
const pendingId = rememberPendingOutboundMessageId({
1826-
accountId: account.accountId,
1827-
sessionKey: route.sessionKey,
1828-
outboundTarget,
1829-
chatGuid: chatGuidForActions ?? chatGuid,
1830-
chatIdentifier,
1831-
chatId,
1832-
snippet: chunk,
1833-
});
1834-
let result: Awaited<ReturnType<typeof sendMessageBlueBubbles>>;
1835-
try {
1836-
result = await sendMessageBlueBubbles(outboundTarget, chunk, {
1837-
cfg: config,
1838-
accountId: account.accountId,
1839-
replyToMessageGuid: replyToMessageGuid || undefined,
1840-
});
1841-
} catch (err) {
1842-
forgetPendingOutboundMessageId(pendingId);
1843-
throw err;
1844-
}
1845-
if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) {
1846-
forgetPendingOutboundMessageId(pendingId);
1847-
}
1848-
sentMessage = true;
1849-
statusSink?.({ lastOutboundAt: Date.now() });
1850-
if (info.kind === "block") {
1851-
restartTypingSoon();
1852-
}
1853-
}
1854-
},
1855-
onError: (err, info) => {
1856-
// Flag the outer dedupe wrapper so it releases the claim instead
1857-
// of committing. Without this, a transient BlueBubbles send failure
1858-
// would permanently block replay-retry for 7 days and the user
1859-
// would never receive a reply to that message.
1860-
//
1861-
// Only the terminal `final` delivery represents the user-visible
1862-
// answer. The dispatcher continues past `tool` / `block` failures
1863-
// and may still deliver `final` successfully — releasing the
1864-
// dedupe claim for those would invite a replay that re-runs tool
1865-
// side effects and resends partially-delivered content.
1866-
if (info.kind === "final") {
1867-
dedupeSignal.deliveryFailed = true;
1868-
}
1869-
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`);
1870-
},
1871-
},
1872-
dispatcherOptions: {
1873-
...replyPipeline,
1874-
onReplyStart: typingCallbacks?.onReplyStart,
1875-
onIdle: typingCallbacks?.onIdle,
1876-
},
1877-
replyOptions: {
1878-
onModelSelected,
1879-
disableBlockStreaming:
1880-
typeof account.config.blockStreaming === "boolean"
1881-
? !account.config.blockStreaming
1882-
: undefined,
1883-
},
1884-
record: {
1885-
onRecordError: (err) => {
1886-
runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`);
1887-
},
1866+
}
1867+
},
1868+
onError: (err, info) => {
1869+
// Flag the outer dedupe wrapper so it releases the claim instead
1870+
// of committing. Without this, a transient BlueBubbles send failure
1871+
// would permanently block replay-retry for 7 days and the user
1872+
// would never receive a reply to that message.
1873+
//
1874+
// Only the terminal `final` delivery represents the user-visible
1875+
// answer. The dispatcher continues past `tool` / `block` failures
1876+
// and may still deliver `final` successfully — releasing the
1877+
// dedupe claim for those would invite a replay that re-runs tool
1878+
// side effects and resends partially-delivered content.
1879+
if (info.kind === "final") {
1880+
dedupeSignal.deliveryFailed = true;
1881+
}
1882+
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`);
1883+
},
1884+
},
1885+
dispatcherOptions: {
1886+
...replyPipeline,
1887+
onReplyStart: typingCallbacks?.onReplyStart,
1888+
onIdle: typingCallbacks?.onIdle,
1889+
},
1890+
replyOptions: {
1891+
onModelSelected,
1892+
disableBlockStreaming:
1893+
typeof account.config.blockStreaming === "boolean"
1894+
? !account.config.blockStreaming
1895+
: undefined,
1896+
},
1897+
record: {
1898+
onRecordError: (err) => {
1899+
runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`);
1900+
},
1901+
},
1902+
}),
18881903
},
18891904
});
18901905
} finally {

0 commit comments

Comments
 (0)