Implement DeliveryAdapter #26

@alexey-pelykh

Summary

Implement a delivery adapter that converts the AgentEvent async iterable from CLI runtimes into channel-deliverable ReplyPayload chunks. This module handles progressive streaming (real-time delivery during execution) and message splitting for channels with character limits.

File: src/middleware/delivery-adapter.ts
Test: src/middleware/delivery-adapter.test.ts
Depends on: types module (PR #4, merged)

Purpose

CLI runtimes emit AgentEvent objects (text, tool_use, tool_result, error, done). The delivery adapter sits between the runtime event stream and channel delivery, responsible for:

  1. Accumulating text events into coherent reply chunks
  2. Splitting long responses at channel message limit boundaries
  3. Streaming partial delivery via BridgeCallbacks during execution
  4. Producing final ReplyPayload[] for post-execution delivery

AgentRuntime.execute()
  → AsyncIterable<AgentEvent>
  → DeliveryAdapter.process()
  → BridgeCallbacks (real-time)  +  ReplyPayload[] (final)

API Surface

/** Options for the delivery adapter. */
export type DeliveryAdapterOptions = {
  /**
   * Maximum characters per message chunk.
   * Different channels have different limits:
   * - Discord: 2000 chars
   * - Telegram: 4096 chars
   * - Slack: ~40000 chars (block-based)
   * Default: 4000 (fits Telegram and Slack; Discord requires an explicit 2000)
   */
  chunkLimit?: number | undefined;
};

/**
 * Converts AgentEvent async iterable into channel-deliverable ReplyPayload chunks.
 *
 * Handles:
 * - Text accumulation and chunking at channel limits
 * - Progressive streaming via BridgeCallbacks
 * - Tool result formatting
 * - Error event formatting
 */
export class DeliveryAdapter {
  constructor(options?: DeliveryAdapterOptions);

  /**
   * Process an event stream, invoking callbacks for real-time delivery
   * and returning final payloads for post-execution delivery.
   *
   * @param events - AgentEvent async iterable from runtime.execute()
   * @param callbacks - Optional streaming callbacks for real-time delivery
   * @returns Final ReplyPayload array for post-execution delivery
   */
  process(
    events: AsyncIterable<AgentEvent>,
    callbacks?: BridgeCallbacks,
  ): Promise<ReplyPayload[]>;
}

Implementation Details

Text Accumulation and Chunking

Text events arrive as incremental chunks from the CLI subprocess. The adapter:

  1. Accumulates text into a buffer
  2. When the buffer exceeds chunkLimit, splits at the nearest safe boundary:
    • Prefer splitting at paragraph breaks (\n\n)
    • Fall back to line breaks (\n)
    • Fall back to space boundaries
    • Last resort: hard split at the limit
  3. Each chunk becomes a ReplyPayload with { text: chunk }
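
The boundary preference above can be sketched as follows. This is a minimal illustration, not the adapter's actual code: `splitAtBoundary` and `findCut` are hypothetical names, and keeping each separator at the end of the preceding chunk is an assumption made here so that joining the chunks reproduces the input.

```typescript
// Hypothetical helper: names and signature are illustrative, not from the issue.
// Splits text into chunks of at most `limit` characters, preferring
// paragraph breaks, then line breaks, then spaces, then a hard split.
function splitAtBoundary(text: string, limit: number): string[] {
  if (limit <= 0) throw new RangeError("limit must be positive");
  const chunks: string[] = [];
  let rest = text;
  // Text exactly at the limit is not split: the loop runs only while over it.
  while (rest.length > limit) {
    const cut = findCut(rest, limit);
    chunks.push(rest.slice(0, cut));
    rest = rest.slice(cut);
  }
  if (rest.length > 0) chunks.push(rest);
  return chunks;
}

// Returns the index at which to cut, always in the range 1..limit.
function findCut(text: string, limit: number): number {
  // Search only the first `limit` characters so a chunk never exceeds the limit.
  const window = text.slice(0, limit);
  for (const sep of ["\n\n", "\n", " "]) {
    const idx = window.lastIndexOf(sep);
    // idx > 0 avoids a chunk that consists of the separator alone.
    if (idx > 0) return idx + sep.length; // assumption: separator stays with the earlier chunk
  }
  return limit; // last resort: hard split at the limit
}
```

Because separators are kept, `splitAtBoundary(text, n).join("")` equals `text`. An implementation that trims separators instead would need to account for the characters it drops.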

Markdown-aware splitting: Preserve code fence boundaries when splitting. Avoid splitting inside a fenced code block (```). If the whole block fits in one chunk, keep it together; otherwise close the fence at the split point and reopen it at the start of the next chunk.
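
One possible way to close and reopen fences is a post-processing pass over chunks that have already been split. The sketch below assumes a hypothetical helper `rebalanceFences`; it handles only three-backtick fences that start a line, and the fence lines it adds can push a chunk past chunkLimit, so a real implementation would need to reserve headroom for them.

```typescript
// Hypothetical helper: not part of the API described in this issue.
// Built with repeat() only to avoid a literal fence inside this example.
const FENCE = "`".repeat(3);

// Closes any fence left open at the end of a chunk and reopens it,
// with the same opening line (including a language tag), in the next chunk.
function rebalanceFences(chunks: string[]): string[] {
  const out: string[] = [];
  let carried: string | null = null; // opening fence line carried into the next chunk
  for (const chunk of chunks) {
    let body = carried !== null ? carried + "\n" + chunk : chunk;
    // Scan line by line, toggling the open/closed state at each fence line.
    let open: string | null = null;
    for (const line of body.split("\n")) {
      if (line.trim().startsWith(FENCE)) {
        open = open === null ? line.trim() : null;
      }
    }
    if (open !== null) {
      // The chunk ends inside a code block: close the fence here.
      body += body.endsWith("\n") ? FENCE : "\n" + FENCE;
    }
    out.push(body);
    carried = open;
  }
  return out;
}
```

Chunks without fences pass through unchanged, so the pass can run unconditionally after splitting.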

Event Processing

| Event type | Behavior |
| --- | --- |
| text | Accumulate into text buffer. If buffer exceeds chunkLimit, flush chunk and call callbacks.onPartialReply(). |
| tool_use | No immediate output (tool started). |
| tool_result | Format as tool result and call callbacks.onToolResult() with { text: formatted }. |
| error | Format error message. Call callbacks.onBlockReply() with { text: errorMsg, isError: true }. |
| done | Flush remaining text buffer. Return all accumulated payloads. |
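
The dispatch described above might look roughly like the sketch below. The type shapes are assumptions, since the real AgentEvent, ReplyPayload, and BridgeCallbacks come from the types module. `processEvents` is a hypothetical stand-in for DeliveryAdapter.process(). It uses a hard split for brevity, passes tool output through unformatted, and assumes that flushed chunks and error payloads are also kept in the returned array.

```typescript
// Assumed shapes for illustration only; the real types live in the types module.
type AgentEvent =
  | { type: "text"; text: string }
  | { type: "tool_use"; name: string }
  | { type: "tool_result"; output: string }
  | { type: "error"; message: string }
  | { type: "done" };
type ReplyPayload = { text: string; isError?: boolean };
type Callback = (payload: ReplyPayload) => void | Promise<void>;
type BridgeCallbacks = {
  onPartialReply?: Callback;
  onBlockReply?: Callback;
  onToolResult?: Callback;
};

// Hypothetical stand-in for DeliveryAdapter.process().
async function processEvents(
  events: AsyncIterable<AgentEvent>,
  callbacks: BridgeCallbacks = {},
  chunkLimit = 4000,
): Promise<ReplyPayload[]> {
  const payloads: ReplyPayload[] = [];
  let buffer = "";
  for await (const event of events) {
    switch (event.type) {
      case "text":
        if (event.text.length === 0) break; // empty text events are skipped
        buffer += event.text;
        while (buffer.length > chunkLimit) {
          // Hard split for brevity; boundary-aware splitting would go here.
          const chunk: ReplyPayload = { text: buffer.slice(0, chunkLimit) };
          buffer = buffer.slice(chunkLimit);
          payloads.push(chunk);
          await callbacks.onPartialReply?.(chunk);
        }
        break;
      case "tool_use":
        break; // tool started: no immediate output
      case "tool_result":
        // Placeholder formatting: the output is passed through unchanged.
        await callbacks.onToolResult?.({ text: event.output });
        break;
      case "error": {
        const payload: ReplyPayload = { text: event.message, isError: true };
        payloads.push(payload);
        await callbacks.onBlockReply?.(payload);
        break;
      }
      case "done":
        if (buffer.length > 0) {
          const last: ReplyPayload = { text: buffer };
          payloads.push(last);
          await callbacks.onBlockReply?.(last); // final flush delivered as a block
        }
        return payloads;
    }
  }
  // Assumption: a stream that ends without "done" still flushes its buffer.
  if (buffer.length > 0) payloads.push({ text: buffer });
  return payloads;
}
```

Optional chaining on each callback (`callbacks.onPartialReply?.(...)`) is what makes omitted callbacks a no-op while text is still accumulated and returned.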

Streaming Callbacks

During process(), the adapter invokes BridgeCallbacks for real-time channel delivery:

  • onPartialReply: Called with each text chunk as it's flushed (either because buffer exceeded chunkLimit, or at natural paragraph boundaries during streaming)
  • onBlockReply: Called with complete blocks (error messages, final flush)
  • onToolResult: Called with formatted tool results

All callbacks are optional. If not provided, text is still accumulated and returned in the final payload array.

Final Payloads

After the event stream completes (done event received), process() returns ReplyPayload[] containing:

  • Any remaining buffered text (flushed as the final chunk)

The returned array is primarily used for error payloads, media, or non-streaming fallback delivery.

Edge Cases

  • Empty text events: Skip (no accumulation, no callback)
  • No text events at all: Return empty payload array
  • Only error events: Return error payload with isError: true
  • Very long single text event: Split into multiple chunks immediately
  • Text event exactly at chunk limit: Flush without splitting

Test Requirements

Test file: src/middleware/delivery-adapter.test.ts

Text accumulation

  • Single text event → single payload
  • Multiple text events → accumulated into one payload
  • Empty text event → skipped

Message splitting

  • Text exceeding chunkLimit → split into 2+ chunks
  • Split at paragraph boundary (\n\n) when possible
  • Split at line boundary (\n) as fallback
  • Split at word boundary (space) as the next fallback
  • Hard split when no natural boundary exists
  • Exact-limit text → single chunk (no split)

Streaming callbacks

  • onPartialReply called when buffer flushed mid-stream
  • onBlockReply called for error events
  • onToolResult called for tool result events
  • Callbacks optional (no error when omitted)

Event types

  • tool_use events → no output
  • tool_result events → formatted payload via onToolResult
  • error events → error payload with isError: true
  • done event → flushes remaining buffer

Code fence preservation

  • Text with open code fence is not split mid-fence
  • If split is necessary inside a code block, fence is closed and reopened

Integration pattern

  • Process a realistic event sequence (text, tool_use, tool_result, more text, done)
  • Verify both streaming callbacks and final payloads are correct
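
For the integration test, two small fixtures may help: one that turns an array into an async event stream, and one that records callback invocations in order. Both `streamOf` and `recordingCallbacks` are hypothetical names, and the payload shape `{ text: string }` is an assumption narrowed from ReplyPayload.

```typescript
// Hypothetical test fixtures: not part of the module under test.

// Turns a fixed list of items into an async iterable, one item per step.
async function* streamOf<T>(items: readonly T[]): AsyncGenerator<T> {
  for (const item of items) {
    // Defer each item by a microtask so the stream is genuinely asynchronous.
    await Promise.resolve();
    yield item;
  }
}

// Builds callbacks that append every invocation to a shared, ordered log.
function recordingCallbacks() {
  const calls: Array<{ name: string; text: string }> = [];
  const record = (name: string) => (payload: { text: string }): void => {
    calls.push({ name, text: payload.text });
  };
  return {
    calls,
    callbacks: {
      onPartialReply: record("onPartialReply"),
      onBlockReply: record("onBlockReply"),
      onToolResult: record("onToolResult"),
    },
  };
}
```

A test can then pass `streamOf([...])` and `callbacks` to process() and assert on both the returned payloads and the order of entries in `calls`.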

Acceptance Criteria

  • src/middleware/delivery-adapter.ts exports DeliveryAdapter class and DeliveryAdapterOptions type
  • Text events accumulated into coherent chunks
  • Long responses split at chunkLimit (default 4000) with smart boundary detection
  • Paragraph > line > word > hard split preference order
  • Markdown code fence boundaries preserved during splitting
  • onPartialReply called during streaming when buffer flushed
  • onBlockReply called for error events with isError: true
  • onToolResult called for tool results
  • done event flushes remaining buffer
  • All callbacks optional (graceful no-op when missing)
  • All tests pass: npx vitest run src/middleware/delivery-adapter.test.ts
  • Full suite passes: npx vitest run

Metadata

Labels: enhancement (New feature or request)
