Implement ChannelBridge orchestrator #32
Description
Summary
Implement ChannelBridge — the central orchestrator that connects incoming channel messages to CLI agent execution and delivery. This is the keystone C1 middleware component that wires together all previously implemented pieces: runtime factory, session map, error classifier, delivery adapter, system prompt builder, and MCP side effects.
Architecture
ChannelMessage (from channel adapter)
|
v
ChannelBridge.handle(message, callbacks?, abortSignal?)
|
├── 1. Session lookup (SessionMap)
├── 2. System prompt construction (buildSystemPrompt)
├── 3. MCP config assembly (McpServerConfig for runtime)
├── 4. Runtime params construction (AgentExecuteParams)
├── 5. Runtime execution (createCliRuntime → execute())
├── 6. Event streaming (DeliveryAdapter + BridgeCallbacks)
├── 7. Error classification (ErrorClassifier)
├── 8. MCP side effects reading (readMcpSideEffects)
├── 9. Session update (SessionMap)
└── 10. Result assembly (AgentDeliveryResult)
|
v
AgentDeliveryResult (to delivery pipeline)
Single Entry Point
ChannelBridge.handle() is the single entry point for all dispatch sites. All 4 core dispatch sites (agent command, auto-reply, cron, follow-up) and the voice-call extension dispatch site will call this method instead of runEmbeddedPiAgent().
Dual Delivery Model
- During execution: Streaming callbacks (onPartialReply, onBlockReply, onToolResult) deliver text to the channel in real time as the CLI agent generates output
- After execution: Final AgentDeliveryResult.payloads deliver complete text blocks (for non-streaming fallback, errors, or media)
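One way a dispatch site might combine the two delivery paths is sketched below. The callback signatures (plain strings), the trimmed-down `AgentDeliveryResult`, and the `createDelivery` helper are all assumptions for illustration; the real shapes live in types.ts and may differ. The sketch also assumes that final payloads are skipped once anything has been streamed, which this issue does not specify.

```typescript
// Hypothetical shapes: the real types in types.ts may carry more fields.
interface ReplyPayload { text: string }
interface BridgeCallbacks {
  onPartialReply?: (text: string) => void;
  onBlockReply?: (text: string) => void;
  onToolResult?: (text: string) => void;
}
interface AgentDeliveryResult { payloads: ReplyPayload[]; error?: string }

// Hypothetical helper: wires streaming callbacks and a final-payload fallback
// to the same `send` function, so each block reaches the channel only once.
function createDelivery(send: (text: string) => void) {
  let streamed = false;
  const callbacks: BridgeCallbacks = {
    onBlockReply: (text) => {
      streamed = true; // remember that the channel already received output
      send(text);
    },
  };
  const deliverFinal = (result: AgentDeliveryResult): void => {
    if (streamed) return; // assumption: streamed text is not re-sent
    for (const payload of result.payloads) send(payload.text);
  };
  return { callbacks, deliverFinal };
}

// Streaming run: the callback fires during execution, final payloads are skipped.
const streamedOut: string[] = [];
const streaming = createDelivery((text) => streamedOut.push(text));
streaming.callbacks.onBlockReply?.("Hello");
streaming.deliverFinal({ payloads: [{ text: "Hello" }] });

// Non-streaming run: no callback fired, so the final payloads are delivered.
const fallbackOut: string[] = [];
const fallback = createDelivery((text) => fallbackOut.push(text));
fallback.deliverFinal({ payloads: [{ text: "Hello" }] });

console.log(streamedOut, fallbackOut);
```

In both runs the channel receives "Hello" exactly once; whether the real dispatch sites de-duplicate this way is an open design choice.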
Dependencies (all implemented)
| Component | Ref | Module | Used For |
|---|---|---|---|
| Runtime factory | #19 | runtime-factory.ts | createCliRuntime(provider) |
| Error classifier | #22 | error-classifier.ts | classifyError(stderr) for retry/fail decisions |
| Session map | #24 | session-map.ts | Session lookup/update by composite key |
| Delivery adapter | #26 | delivery-adapter.ts | AgentEvent stream → ReplyPayload[] chunks |
| System prompt | #28 | system-prompt.ts | buildSystemPrompt(params) |
| MCP side effects | #30 | mcp-side-effects.ts | readMcpSideEffects(filePath) after CLI exit |
| Types | PR #4 | types.ts | AgentDeliveryResult, ChannelMessage, BridgeCallbacks, etc. |
handle() Flow
async handle(
message: ChannelMessage,
callbacks?: BridgeCallbacks,
abortSignal?: AbortSignal,
): Promise<AgentDeliveryResult> {
// 1. Session lookup
const sessionKey = buildSessionKey(message);
const existingSessionId = this.sessionMap.get(sessionKey);
// 2. System prompt
const systemPrompt = buildSystemPrompt({ ... });
// 3. MCP config: build McpServerConfig with gateway env vars
const invocationDir = await mkdtemp(join(tmpdir(), "rc-"));
const sideEffectsFile = join(invocationDir, "side-effects.ndjson");
const mcpServers = this.buildMcpConfig(message, sideEffectsFile);
// 4. Runtime params
const runtime = createCliRuntime(this.provider);
const params: AgentExecuteParams = {
prompt: systemPrompt + "\n\n" + message.text,
sessionId: existingSessionId,
mcpServers,
abortSignal,
workingDirectory: this.workspaceDir,
env: this.buildRuntimeEnv(),
};
// 5-6. Execute + stream events through DeliveryAdapter
const adapter = new DeliveryAdapter({ chunkLimit: this.chunkLimit });
let runResult: AgentRunResult | undefined;
let lastError: string | undefined;
try {
const { payloads, result, error } = await adapter.process(
runtime.execute(params),
callbacks,
);
runResult = result;
lastError = error;
} catch (err) {
// 7. Error classification
const category = classifyError(String(err));
lastError = String(err);
// Handle based on category (retryable, fatal, context_overflow)
}
// 8. Read MCP side effects
const mcp = await readMcpSideEffects(sideEffectsFile);
// 9. Session update
if (runResult?.sessionId) {
this.sessionMap.set(sessionKey, runResult.sessionId);
}
// 10. Cleanup and return
await rm(invocationDir, { recursive: true, force: true });
const text = (runResult?.text ?? "").trim();
return {
payloads: text ? [{ text }] : [],
run: runResult ?? DEFAULT_RUN_RESULT,
mcp,
error: lastError,
};
}
Constructor / Configuration
export class ChannelBridge {
constructor(options: {
/** CLI runtime provider ("claude", "gemini", "codex", "opencode"). */
provider: string;
/** Session map for session persistence. */
sessionMap: SessionMap;
/** Gateway URL for MCP server WebSocket connection. */
gatewayUrl: string;
/** Gateway auth token for MCP server. */
gatewayToken: string;
/** Working directory for CLI subprocess. */
workspaceDir?: string;
/** Channel text chunk limit (default: 4000). */
chunkLimit?: number;
/** MCP server entry point path. */
mcpServerPath?: string;
});
}
MCP Config Assembly
ChannelBridge builds a McpServerConfig for the remoteclaw MCP server and passes it via AgentExecuteParams.mcpServers. Each runtime already handles format translation internally (Claude: inline JSON flag, Gemini: settings.json merge, Codex: TOML config, OpenCode: JSON config).
private buildMcpConfig(
message: ChannelMessage,
sideEffectsFile: string,
): Record<string, McpServerConfig> {
return {
remoteclaw: {
command: "node",
args: [this.mcpServerPath],
env: {
REMOTECLAW_GATEWAY_URL: this.gatewayUrl,
REMOTECLAW_GATEWAY_TOKEN: this.gatewayToken,
REMOTECLAW_SESSION_KEY: buildSessionKey(message),
REMOTECLAW_SIDE_EFFECTS_FILE: sideEffectsFile,
REMOTECLAW_CHANNEL: message.provider,
REMOTECLAW_ACCOUNT_ID: message.from,
REMOTECLAW_TO: message.channelId,
...(message.replyToId ? { REMOTECLAW_THREAD_ID: message.replyToId } : {}),
},
},
};
}
Session Key Construction
Composite key encoding channel + user + thread context:
function buildSessionKey(message: ChannelMessage): string {
const threadPart = message.replyToId ?? "_";
return `${message.channelId}:${message.from}:${threadPart}`;
}
This matches the composite key format used by SessionMap (#24).
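To make the key format concrete, here is a small usage sketch of buildSessionKey. The `ChannelMessage` interface is a trimmed-down stand-in containing only the fields the key uses, and the ids are made-up sample values.

```typescript
// Hypothetical, reduced ChannelMessage: only the fields the key depends on.
interface ChannelMessage {
  channelId: string;
  from: string;
  replyToId?: string;
}

// Same logic as the function in this issue.
function buildSessionKey(message: ChannelMessage): string {
  const threadPart = message.replyToId ?? "_";
  return `${message.channelId}:${message.from}:${threadPart}`;
}

// A top-level message uses "_" as the thread placeholder.
const topLevelKey = buildSessionKey({ channelId: "C123", from: "U456" });
// A threaded reply gets its own session, separate from the top-level one.
const threadedKey = buildSessionKey({ channelId: "C123", from: "U456", replyToId: "T789" });

console.log(topLevelKey); // "C123:U456:_"
console.log(threadedKey); // "C123:U456:T789"
```

Because ":" is the separator, ids that themselves contain ":" could produce ambiguous keys; whether that can happen depends on the channel adapters.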
Error Handling Strategy
| Error Category | Action |
|---|---|
| retryable | Log, set errorSubtype on result, let caller decide retry |
| fatal | Set error on result, no retry |
| context_overflow | Set errorSubtype: "context_window", caller may start new session |
| timeout / aborted | Already handled by CLIRuntimeBase watchdog/abort |
ChannelBridge does NOT implement retry loops — it reports the error category to the caller (dispatch site). The dispatch site owns retry policy.
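Since retry policy belongs to the dispatch site, a caller-side decision function might look like the sketch below. The `BridgeOutcome` shape, the `"retryable"` subtype value, the `NextAction` names, and the attempt limit are assumptions for illustration; only `error`, `errorSubtype`, and the `"context_window"` value appear in this issue.

```typescript
// Hypothetical view of the fields a dispatch site would inspect on the result.
interface BridgeOutcome {
  error?: string;
  errorSubtype?: string; // assumed values: "retryable", "context_window"
}

type NextAction = "done" | "retry" | "retry_with_new_session" | "give_up";

// Hypothetical dispatch-site policy: ChannelBridge only reports the category,
// and the caller maps it to an action.
function decideNextAction(
  outcome: BridgeOutcome,
  attempt: number,
  maxAttempts: number,
): NextAction {
  if (!outcome.error) return "done";
  if (attempt >= maxAttempts) return "give_up";
  switch (outcome.errorSubtype) {
    case "retryable":
      return "retry";
    case "context_window":
      return "retry_with_new_session"; // caller drops the stored session id first
    default:
      return "give_up"; // fatal or unclassified errors are not retried
  }
}

console.log(decideNextAction({ error: "rate limited", errorSubtype: "retryable" }, 1, 3)); // "retry"
console.log(decideNextAction({ error: "rate limited", errorSubtype: "retryable" }, 3, 3)); // "give_up"
```

A cron dispatch site could call this with a larger maxAttempts than an interactive one, which is the kind of per-site difference the table above leaves to the caller.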
Implementation
New Files
| File | Purpose | Est. Lines |
|---|---|---|
| src/middleware/channel-bridge.ts | ChannelBridge class | ~200 |
Test Files
| File | Coverage |
|---|---|
| src/middleware/channel-bridge.test.ts | Constructor, handle() flow, session lifecycle, MCP config, error paths |
Estimated total: ~200 LoC + ~250 LoC tests
Key Design Decisions
- No retry loops in ChannelBridge: Error classification is reported, not acted upon. Dispatch sites own retry policy (different for cron vs interactive).
- Temp directory per invocation: Each handle() call creates a temp dir for the MCP side effects file. Cleaned up in finally.
- Runtime created per call: createCliRuntime() is called on each handle() call. Runtimes are lightweight (no state between invocations).
- DeliveryAdapter owns event-to-payload conversion: ChannelBridge delegates all text chunking and streaming to DeliveryAdapter. It only assembles the final AgentDeliveryResult.
- Session key = channel+user+thread: Matches SessionMap's composite key design from SessionMap (#24).
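The temp-directory decision calls for cleanup in finally, while the handle() flow listed earlier removes the directory at the end of the normal path. A minimal sketch of the finally-based variant follows; `withInvocationDir` and `dirSurvivesFailure` are hypothetical helpers, and only the Node.js calls (mkdtemp, rm, writeFile, access, tmpdir, join, dirname) are real APIs.

```typescript
import { access, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";

// Hypothetical helper: creates the per-invocation directory, hands the side
// effects file path to `run`, and removes the directory even if `run` throws.
async function withInvocationDir<T>(
  run: (sideEffectsFile: string) => Promise<T>,
): Promise<T> {
  const invocationDir = await mkdtemp(join(tmpdir(), "rc-"));
  try {
    return await run(join(invocationDir, "side-effects.ndjson"));
  } finally {
    await rm(invocationDir, { recursive: true, force: true });
  }
}

// Demonstration: simulate a failing run and report whether the directory remains.
async function dirSurvivesFailure(): Promise<boolean> {
  let dir = "";
  try {
    await withInvocationDir(async (file) => {
      dir = dirname(file);
      await writeFile(file, "{}\n");
      throw new Error("simulated runtime failure");
    });
  } catch {
    // Expected: the simulated failure propagates after cleanup has run.
  }
  // access() rejects once the directory has been removed.
  return access(dir).then(() => true, () => false);
}

dirSurvivesFailure().then((exists) => console.log("temp dir still exists:", exists));
```

Wrapping steps 3 through 10 of handle() in such a helper would keep the cleanup guarantee in one place, at the cost of one extra level of nesting.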
Acceptance Criteria
- src/middleware/channel-bridge.ts orchestrates the full message → execution → delivery flow
- Streaming callbacks (onPartialReply, onBlockReply, onToolResult) are invoked during execution
- MCP side effects are read via readMcpSideEffects() after CLI exit (file-based NDJSON per side effects design analysis)
- Errors from ErrorClassifier set appropriate errorSubtype on the result
- The handle() method is the single entry point for all dispatch sites
- Session lookup and update via SessionMap work correctly
- MCP config is assembled with correct env vars for the remoteclaw MCP server
- Temp invocation directory is created and cleaned up per call
- Unit tests verify the orchestration flow with mocked runtime
- pnpm build passes
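For tests that need side-effects fixtures, it may help to recall what file-based NDJSON looks like: one JSON object per line. The parser below is only an illustrative sketch and is not the readMcpSideEffects implementation from #30; the record fields (`type`, `id`) and the choice to skip malformed lines are assumptions.

```typescript
// Hypothetical record shape: the real schema is defined in mcp-side-effects.ts.
type SideEffectRecord = Record<string, unknown>;

// Parses NDJSON content: one JSON object per non-empty line.
function parseNdjson(content: string): SideEffectRecord[] {
  const records: SideEffectRecord[] = [];
  for (const line of content.split("\n")) {
    const trimmed = line.trim();
    if (trimmed === "") continue; // blank lines carry no record
    try {
      const value: unknown = JSON.parse(trimmed);
      if (typeof value === "object" && value !== null && !Array.isArray(value)) {
        records.push(value as SideEffectRecord);
      }
    } catch {
      // Assumption: a malformed line (for example a truncated write) is skipped
      // rather than failing the whole read.
    }
  }
  return records;
}

// Sample fixture with a blank line and one truncated record.
const sample =
  '{"type":"message_sent","id":"1"}\n\n{"type":"broken"\n{"type":"reaction","id":"2"}\n';
const parsed = parseNdjson(sample);
console.log(parsed.length); // 2
```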