Skip to content

Implement ChannelBridge orchestrator #32

@alexey-pelykh

Description

@alexey-pelykh

Summary

Implement ChannelBridge — the central orchestrator that connects incoming channel messages to CLI agent execution and delivery. This is the keystone C1 middleware component that wires together all previously implemented pieces: runtime factory, session map, error classifier, delivery adapter, system prompt builder, and MCP side effects.

Architecture

ChannelMessage (from channel adapter)
    |
    v
ChannelBridge.handle(message, callbacks?, abortSignal?)
    |
    ├── 1. Session lookup (SessionMap)
    ├── 2. System prompt construction (buildSystemPrompt)
    ├── 3. MCP config assembly (McpServerConfig for runtime)
    ├── 4. Runtime params construction (AgentExecuteParams)
    ├── 5. Runtime execution (createCliRuntime → execute())
    ├── 6. Event streaming (DeliveryAdapter + BridgeCallbacks)
    ├── 7. Error classification (ErrorClassifier)
    ├── 8. MCP side effects reading (readMcpSideEffects)
    ├── 9. Session update (SessionMap)
    └── 10. Result assembly (AgentDeliveryResult)
    |
    v
AgentDeliveryResult (to delivery pipeline)

Single Entry Point

ChannelBridge.handle() is the single entry point for all dispatch sites. All 4 core dispatch sites (agent command, auto-reply, cron, follow-up) and the voice-call extension dispatch site will call this method instead of runEmbeddedPiAgent().

Dual Delivery Model

  • During execution: Streaming callbacks (onPartialReply, onBlockReply, onToolResult) deliver text to the channel in real-time as the CLI agent generates output
  • After execution: Final AgentDeliveryResult.payloads deliver complete text blocks (for non-streaming fallback, errors, or media)

Dependencies (all implemented)

Component Ref Module Used For
Runtime factory #19 runtime-factory.ts createCliRuntime(provider)
Error classifier #22 error-classifier.ts classifyError(stderr) for retry/fail decisions
Session map #24 session-map.ts Session lookup/update by composite key
Delivery adapter #26 delivery-adapter.ts AgentEvent stream → ReplyPayload[] chunks
System prompt #28 system-prompt.ts buildSystemPrompt(params)
MCP side effects #30 mcp-side-effects.ts readMcpSideEffects(filePath) after CLI exit
Types PR #4 types.ts AgentDeliveryResult, ChannelMessage, BridgeCallbacks, etc.

handle() Flow

async handle(
    message: ChannelMessage,
    callbacks?: BridgeCallbacks,
    abortSignal?: AbortSignal,
): Promise<AgentDeliveryResult> {
    // 1. Session lookup
    const sessionKey = buildSessionKey(message);
    const existingSessionId = this.sessionMap.get(sessionKey);

    // 2. System prompt
    const systemPrompt = buildSystemPrompt({ ... });

    // 3. MCP config: build McpServerConfig with gateway env vars
    const invocationDir = await mkdtemp(join(tmpdir(), "rc-"));
    const sideEffectsFile = join(invocationDir, "side-effects.ndjson");
    const mcpServers = this.buildMcpConfig(message, sideEffectsFile);

    // 4. Runtime params
    const runtime = createCliRuntime(this.provider);
    const params: AgentExecuteParams = {
        prompt: systemPrompt + "\n\n" + message.text,
        sessionId: existingSessionId,
        mcpServers,
        abortSignal,
        workingDirectory: this.workspaceDir,
        env: this.buildRuntimeEnv(),
    };

    // 5-6. Execute + stream events through DeliveryAdapter
    const adapter = new DeliveryAdapter({ chunkLimit: this.chunkLimit });
    let runResult: AgentRunResult | undefined;
    let lastError: string | undefined;

    try {
        const { payloads, result, error } = await adapter.process(
            runtime.execute(params),
            callbacks,
        );
        runResult = result;
        lastError = error;
    } catch (err) {
        // 7. Error classification
        const category = classifyError(String(err));
        lastError = String(err);
        // Handle based on category (retryable, fatal, context_overflow)
    }

    // 8. Read MCP side effects
    const mcp = await readMcpSideEffects(sideEffectsFile);

    // 9. Session update
    if (runResult?.sessionId) {
        this.sessionMap.set(sessionKey, runResult.sessionId);
    }

    // 10. Cleanup and return
    await rm(invocationDir, { recursive: true, force: true });

    const text = (runResult?.text ?? "").trim();
    return {
        payloads: text ? [{ text }] : [],
        run: runResult ?? DEFAULT_RUN_RESULT,
        mcp,
        error: lastError,
    };
}

Constructor / Configuration

export class ChannelBridge {
    constructor(options: {
        /** CLI runtime provider ("claude", "gemini", "codex", "opencode"). */
        provider: string;
        /** Session map for session persistence. */
        sessionMap: SessionMap;
        /** Gateway URL for MCP server WebSocket connection. */
        gatewayUrl: string;
        /** Gateway auth token for MCP server. */
        gatewayToken: string;
        /** Working directory for CLI subprocess. */
        workspaceDir?: string;
        /** Channel text chunk limit (default: 4000). */
        chunkLimit?: number;
        /** MCP server entry point path. */
        mcpServerPath?: string;
    });
}

MCP Config Assembly

ChannelBridge builds a McpServerConfig for the remoteclaw MCP server and passes it via AgentExecuteParams.mcpServers. Each runtime already handles format translation internally (Claude: inline JSON flag, Gemini: settings.json merge, Codex: TOML config, OpenCode: JSON config).

private buildMcpConfig(
    message: ChannelMessage,
    sideEffectsFile: string,
): Record<string, McpServerConfig> {
    return {
        remoteclaw: {
            command: "node",
            args: [this.mcpServerPath],
            env: {
                REMOTECLAW_GATEWAY_URL: this.gatewayUrl,
                REMOTECLAW_GATEWAY_TOKEN: this.gatewayToken,
                REMOTECLAW_SESSION_KEY: buildSessionKey(message),
                REMOTECLAW_SIDE_EFFECTS_FILE: sideEffectsFile,
                REMOTECLAW_CHANNEL: message.provider,
                REMOTECLAW_ACCOUNT_ID: message.from,
                REMOTECLAW_TO: message.channelId,
                ...(message.replyToId ? { REMOTECLAW_THREAD_ID: message.replyToId } : {}),
            },
        },
    };
}

Session Key Construction

Composite key encoding channel + user + thread context:

function buildSessionKey(message: ChannelMessage): string {
    const threadPart = message.replyToId ?? "_";
    return `${message.channelId}:${message.from}:${threadPart}`;
}

This matches the SessionMap key format from SessionMap (#24).

Error Handling Strategy

Error Category Action
retryable Log, set errorSubtype on result, let caller decide retry
fatal Set error on result, no retry
context_overflow Set errorSubtype: "context_window", caller may start new session
timeout / aborted Already handled by CLIRuntimeBase watchdog/abort

ChannelBridge does NOT implement retry loops — it reports the error category to the caller (dispatch site). The dispatch site owns retry policy.

Implementation

New Files

File Purpose Est. Lines
src/middleware/channel-bridge.ts ChannelBridge class ~200

Test Files

File Coverage
src/middleware/channel-bridge.test.ts Constructor, handle() flow, session lifecycle, MCP config, error paths

Estimated total: ~200 LoC + ~250 LoC tests

Key Design Decisions

  1. No retry loops in ChannelBridge: Error classification is reported, not acted upon. Dispatch sites own retry policy (different for cron vs interactive).

  2. Temp directory per invocation: Each handle() call creates a temp dir for the MCP side effects file. Cleaned up in finally.

  3. Runtime created per call: createCliRuntime() is called each handle(). Runtimes are lightweight (no state between invocations).

  4. DeliveryAdapter owns event-to-payload conversion: ChannelBridge delegates all text chunking and streaming to DeliveryAdapter. It only assembles the final AgentDeliveryResult.

  5. Session key = channel+user+thread: Matches SessionMap's composite key design from SessionMap (Implement SessionMap #24).

Acceptance Criteria

  • src/middleware/channel-bridge.ts orchestrates the full message → execution → delivery flow
  • Streaming callbacks (onPartialReply, onBlockReply, onToolResult) are invoked during execution
  • MCP side effects are read via readMcpSideEffects() after CLI exit (file-based NDJSON per side effects design analysis)
  • Errors from ErrorClassifier set appropriate errorSubtype on the result
  • The handle() method is the single entry point for all dispatch sites
  • Session lookup and update via SessionMap work correctly
  • MCP config is assembled with correct env vars for the remoteclaw MCP server
  • Temp invocation directory is created and cleaned up per call
  • Unit tests verify the orchestration flow with mocked runtime
  • pnpm build passes

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions