feat(moonshot): explicit context caching for moonshot-v1 via /v1/caching API #25104
Elarwei001 wants to merge 21 commits into openclaw:main
Conversation
Implements lazy context caching for Moonshot/Kimi models using the /v1/caching API.
Features:
- Cache system prompts and tool definitions to reduce token usage
- Automatic cache invalidation when content hash changes
- Inflight promise coalescing to prevent duplicate cache creation
- TTL auto-renewal via reset_ttl on each request
Configuration:
```yaml
agents:
defaults:
models:
moonshot/kimi-k2-turbo:
params:
contextCache:
enabled: true
ttl: 3600
resetTtl: 3600
```
Closes openclaw#7073
Co-authored-by: Elar Wei <[email protected]>
```ts
onPayload: async (payload) => {
  if (payload && typeof payload === "object") {
    const payloadObj = payload as Record<string, unknown>;
    const payloadMessages = payloadObj.messages;

    if (Array.isArray(payloadMessages)) {
      try {
        const cacheId = await getOrCreateCache({
          sessionKey,
          apiKey,
          baseUrl,
          model: modelId,
          system: systemContent,
          tools,
          ttl,
        });

        // Replace messages with cache-injected version
        payloadObj.messages = injectCacheRole(
          payloadMessages as Array<{ role: string; content: unknown }>,
          cacheId,
          resetTtl,
        );

        log.debug(`[moonshot-cache] Injected cache ${cacheId} for session ${sessionKey}`);
      } catch (err) {
        // On cache error, fall back to normal request
        log.warn(`[moonshot-cache] Cache error, falling back: ${String(err)}`);
      }
    }
  }
  originalOnPayload?.(payload);
},
});
```
Async onPayload will not work — cache injection is a no-op
onPayload is called synchronously by pi-ai and every existing caller in this codebase (options?.onPayload?.(payload) — no await). Making onPayload async means the await getOrCreateCache(...) runs in a detached microtask. By the time the cache ID is retrieved and payloadObj.messages is mutated, the request body has already been serialized and sent to the API.
This effectively makes the entire caching feature a no-op in production — the message array will never contain the cache role when the HTTP request is dispatched.
Every other onPayload wrapper in this file (createOpenAIResponsesStoreWrapper, createOpenRouterWrapper, createZaiToolStreamWrapper, createOpenRouterSystemCacheWrapper) is synchronous for exactly this reason.
To fix this, the cache should be resolved before calling the underlying streamFn, and the cache role should be injected into context.messages synchronously — not inside onPayload. The wrapper's outer function should be async, resolve the cache up front, mutate the context messages, and then call the underlying stream function with the modified context. This would require verifying that pi-ai supports an async StreamFn return type.
SamuelHinestrosa
left a comment
Automated review: changes reviewed. Please ensure the tests pass.
- Fix async onPayload race condition by resolving the cache before streaming
- Add MAX_CACHE_SIZE (1000) with FIFO eviction to prevent memory leaks
- Clear the local cache entry immediately after remote deletion to prevent stale entries
- Use an async IIFE instead of an async generator (StreamFn supports a Promise return)
- Cast modified messages to the context.messages type to avoid a pi-ai Message mismatch
The caching API requires base model names (e.g., 'moonshot-v1') without context-length suffixes. Add toCacheModelName() to handle the mapping.
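The mapping this commit describes can be sketched as below. This is an illustration of the suffix-stripping rule, not the exact code of `toCacheModelName` in `src/agents/moonshot-cache.ts`; as noted later in this thread, model families other than `moonshot-v1` are rejected by the caching API, so the sketch returns `undefined` for them.

```ts
// Sketch: the /v1/caching API accepts base family names, so context-length
// suffixes like -8k/-32k/-128k must be stripped before creating a cache.
function toCacheModelName(modelId: string): string | undefined {
  // moonshot-v1, moonshot-v1-8k, moonshot-v1-32k, moonshot-v1-128k -> moonshot-v1
  if (/^moonshot-v1(-\d+k)?$/.test(modelId)) return "moonshot-v1";
  // Other families (e.g. kimi-k2.*) are not supported by explicit caching.
  return undefined;
}
```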
Moonshot/Kimi returns 'cached_tokens' instead of 'cache_read_input_tokens'. This ensures OpenClaw's cacheRead field is properly populated when using Moonshot context caching.
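The normalization this commit describes amounts to a fallback chain over the possible field names. The sketch below is illustrative of the logic in `src/agents/usage.ts`; the type and function names here are stand-ins, not the file's actual exports.

```ts
// Sketch: Moonshot/Kimi reports cache hits as `cached_tokens` (top-level for
// moonshot-v1, nested under prompt_tokens_details for K2), while OpenAI-style
// providers use `cache_read_input_tokens`. Normalize all three to one number.
type UsageLike = {
  cache_read_input_tokens?: number;
  cached_tokens?: number;
  prompt_tokens_details?: { cached_tokens?: number };
};

function normalizeCacheRead(usage: UsageLike): number {
  return (
    usage.cache_read_input_tokens ??
    usage.cached_tokens ??
    usage.prompt_tokens_details?.cached_tokens ??
    0
  );
}
```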
Reduce invasion of extra-params.ts: changes drop from ~135 lines to ~20 lines. All Moonshot-specific logic is now isolated in moonshot-cache.ts.
Remove the unused clearSessionCache function, simplify comments, remove section dividers, inline simple functions, and reduce verbosity while maintaining functionality.
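The MAX_CACHE_SIZE bound mentioned above relies on the fact that a JavaScript `Map` iterates in insertion order, so FIFO eviction is just deleting the first key. A minimal sketch of that idea (the real store also tracks content hashes and cache IDs):

```ts
// Sketch: bounded session store with FIFO eviction, assuming MAX_CACHE_SIZE
// as described in the commit above. Values are simplified to strings here.
const MAX_CACHE_SIZE = 1000;
const store = new Map<string, string>();

function setWithEviction(key: string, value: string): void {
  if (!store.has(key) && store.size >= MAX_CACHE_SIZE) {
    // Map preserves insertion order: the first key is the oldest entry.
    const oldest = store.keys().next().value;
    if (oldest !== undefined) store.delete(oldest);
  }
  store.set(key, value);
}
```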
@greptile-apps please re-review - the code has been refactored to address the previous feedback.
AI-Assisted Testing Results

Tested with a real Moonshot API account to verify the caching mechanism works correctly.

Test 1: Baseline (without cache)

POST /v1/chat/completions

```json
{
  "model": "moonshot-v1-32k",
  "messages": [
    {"role": "system", "content": "<system prompt>"},
    {"role": "user", "content": "What is 2+2?"}
  ]
}
```

Response usage:

```json
{
  "prompt_tokens": 48,
  "completion_tokens": 8,
  "total_tokens": 56
}
```

Test 2: With cache

```
# Step 1: Create cache
POST /v1/caching
{
  "model": "moonshot-v1",
  "messages": [{"role": "system", "content": "<system prompt>"}],
  "ttl": 300
}
# Returns: {"id": "cache-xxx", "tokens": 19, "status": "pending"}

# Step 2: Query with cache
POST /v1/chat/completions
{
  "model": "moonshot-v1-32k",
  "messages": [
    {"role": "cache", "content": "cache_id=cache-xxx;reset_ttl=300"},
    {"role": "user", "content": "What is 2+2?"}
  ]
}
```

Response usage:

```json
{
  "prompt_tokens": 30,
  "completion_tokens": 9,
  "total_tokens": 39,
  "cached_tokens": 19,
  "prompt_tokens_details": {
    "cached_tokens": 19
  }
}
```

Key Findings

Model Name Mapping: discovered that the caching API requires base model names. This is handled by `toCacheModelName()`.
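The cache-role injection seen in Test 2 can be sketched as below: the system message is replaced by a `cache` role whose content string carries the cache ID and the TTL to reset on each use. This is an illustration based on the request shape shown above, not the exact `injectCacheRole` implementation.

```ts
// Sketch: replace the system message with a "cache" role, preserving the
// remaining messages in order. Content format follows Test 2 above:
// "cache_id=<id>;reset_ttl=<seconds>".
type Msg = { role: string; content: unknown };

function injectCacheRole(messages: Msg[], cacheId: string, resetTtl: number): Msg[] {
  const rest = messages.filter((m) => m.role !== "system");
  return [
    { role: "cache", content: `cache_id=${cacheId};reset_ttl=${resetTtl}` },
    ...rest,
  ];
}
```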
```ts
return (async () => {
  let modifiedContext = context;

  try {
    const cacheId = await getOrCreateCache({
      sessionKey,
      apiKey,
      baseUrl,
      model: modelId,
      system: systemContent,
      tools,
      ttl,
    });

    // Inject cache role into messages, replacing system message.
    // Cast to context.messages type to avoid pi-ai's stricter Message type.
    const modifiedMessages = injectCacheRole(
      messages,
      cacheId,
      resetTtl,
    ) as unknown as typeof context.messages;

    modifiedContext = {
      ...context,
      messages: modifiedMessages,
    };

    log.debug(`[moonshot-cache] Injected cache ${cacheId} for session ${sessionKey}`);
  } catch (err) {
    // On cache error, fall back to normal request with original context
    log.warn(`[moonshot-cache] Cache error, falling back: ${String(err)}`);
  }

  // Delegate to underlying stream with (possibly modified) context
  return baseStreamFn(model, modifiedContext, options);
})();
```
Async IIFE returns Promise — incompatible with downstream wrappers
This returns Promise<EventStream> instead of EventStream. Every other StreamFn wrapper in the codebase (createOpenRouterWrapper, createBedrockNoCacheWrapper, createOpenAIResponsesStoreWrapper, etc.) returns the stream synchronously.
After createMoonshotCacheWrapper, createOpenAIResponsesStoreWrapper is applied at extra-params.ts:598. That wrapper calls underlying(model, context, options) and may attempt to use the return value as an EventStream rather than awaiting a Promise. Additionally, the wrappers installed in attempt.ts (lines 760, 786) pass through inner(model, context, options) without await, propagating the Promise up to pi-ai's agent loop.
Whether this actually causes a runtime failure depends on how @mariozechner/pi-ai consumes the streamFn return value internally. If it awaits or handles thenables, this works. If it expects a synchronous EventStream (like createOllamaStreamFn returns via createAssistantMessageEventStream()), this will fail at runtime.
The safer pattern used by createOllamaStreamFn is to create the event stream synchronously, queue the async work via queueMicrotask, and return the stream immediately.
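The pattern the reviewer describes can be sketched as follows. `SimpleEventStream` here is a hypothetical stand-in for pi-ai's `createAssistantMessageEventStream()`, which this sketch does not reproduce; the point is only the control flow: the stream object is returned synchronously, and the async cache lookup runs in a queued microtask that feeds events into it.

```ts
// Sketch: minimal async event stream with blocking next().
class SimpleEventStream<T> {
  private events: T[] = [];
  private done = false;
  private waiters: Array<() => void> = [];

  push(event: T): void {
    this.events.push(event);
    this.waiters.splice(0).forEach((w) => w());
  }
  end(): void {
    this.done = true;
    this.waiters.splice(0).forEach((w) => w());
  }
  async next(): Promise<T | undefined> {
    while (this.events.length === 0 && !this.done) {
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    return this.events.shift();
  }
}

function streamWithCache(resolveCache: () => Promise<string>): SimpleEventStream<string> {
  const stream = new SimpleEventStream<string>();
  // Callers receive the stream synchronously; the async work starts afterwards,
  // so downstream wrappers never see a Promise in place of the stream.
  queueMicrotask(async () => {
    try {
      const cacheId = await resolveCache();
      stream.push(`using ${cacheId}`);
    } catch {
      stream.push("fallback: no cache");
    }
    stream.end();
  });
  return stream;
}
```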
sessionKey is not part of SimpleStreamOptions from pi-ai, so it was always undefined when read from options. Now sessionKey is passed as a parameter to createMoonshotCacheWrapper and captured via closure. Fixes the Greptile review comment about caching always being skipped.
Tests verify:
- Cache role is injected when sessionKey is provided via closure
- Caching is skipped when apiKey is missing
- Graceful fallback on cache creation error

This ensures the OpenClaw integration path works correctly, not just direct API calls.

E2E tests require the MOONSHOT_API_KEY env var and make real API calls:
- creates cache and returns cache_id
- returns same cache_id for same content (cache hit)
- creates new cache when content changes (cache invalidation)
- chat completion with cache shows reduced prompt_tokens
- wrapper integration modifies context correctly

Tests are skipped in CI when the API key is not available. Run locally with:

```
MOONSHOT_API_KEY=sk-xxx pnpm vitest run --config vitest.e2e.config.ts test/agents/moonshot-cache.e2e.test.ts
```
@greptile-apps please re-review - all previous issues have been addressed, along with additional improvements.
```ts
// Delete old cache if exists and clear local entry immediately.
// This prevents stale entries if createCache fails after deletion.
if (existing) {
  cacheStore.delete(params.sessionKey);
```
Fixed stale cache issue by deleting local entry immediately after invalidation detection, before attempting remote deletion and recreation
Resolved cache asynchronously before calling baseStreamFn to avoid the async onPayload race condition from the previous implementation. The async IIFE returns Promise<EventStream>, which differs from other synchronous wrappers in extra-params.ts (createOpenRouterWrapper, createBedrockNoCacheWrapper). Verify this pattern is supported by @mariozechner/pi-ai in production - the e2e tests pass but this should be validated in real gateway scenarios.
The async IIFE returning a Promise is supported by the StreamFn type definition in @mariozechner/pi-agent-core:

```ts
export type StreamFn = (...) => ReturnType<typeof streamSimple> | Promise<ReturnType<typeof streamSimple>>;
```

This pattern is explicitly designed for async config lookup scenarios like our cache resolution, so it should be fine.
```ts
agent.streamFn = createMoonshotCacheWrapper(
  agent.streamFn ?? streamSimple,
  moonshotCacheConfig!,
  modelId,
  sessionKey,
);
```
Fixed previous issue where sessionKey was unavailable - now passed via closure to createMoonshotCacheWrapper from attempt.ts:744 instead of being extracted from options at call time
Additional Comments (1)

Path: src/agents/usage.ts
Line: 969-971
Comment: Added Moonshot/Kimi's `cached_tokens` field to normalize cache read counts alongside the standard OpenAI `cache_read_input_tokens`.
- Handle moonshot-v1-* → moonshot-v1; handle kimi-k2-* → kimi-k2; pass through kimi-k2.5 and kimi-latest as-is; add toCacheModelName tests
- Tested with the real API: kimi-k2.5, kimi-k2, and kimi-latest all return 400 'model family is invalid'. Only moonshot-v1 supports caching. toCacheModelName now returns undefined for unsupported models, the wrapper skips caching entirely for them, and the tests were updated to reflect actual API behavior
- Kimi K2 models use automatic prefix caching (like Anthropic) and return cached_tokens in the prompt_tokens_details field. This differs from moonshot-v1, which requires the explicit /v1/caching API. Add prompt_tokens_details.cached_tokens to the UsageLike type, update normalizeUsage to extract nested cached_tokens, and add a test for the K2 usage format
- Rename to better reflect the function's purpose: checking whether a model requires explicit /v1/caching API calls vs automatic prefix caching (K2)
- Hide the model-name implementation detail (MOONSHOT_V1_CACHE_MODEL is internal); cleaner API returns true/false instead of string/undefined
Closing this PR for now. If anyone has a specific need for explicit caching on moonshot-v1, see the analysis in issue #7073 for details on the two different caching mechanisms.
Summary

Adds explicit context caching support for moonshot-v1-* models via Moonshot's /v1/caching API.

Background

Moonshot has two caching mechanisms:
- moonshot-v1-*: explicit caching via the /v1/caching API
- kimi-k2.*: automatic prefix caching

Changes

New Files
- src/agents/moonshot-cache.ts (~280 lines): cache wrapper
- src/agents/moonshot-cache.test.ts: 17 unit tests
- test/moonshot-cache.e2e.test.ts: E2E tests (requires API key)

Modified Files
- src/agents/pi-embedded-runner/extra-params.ts: integration point
- src/agents/pi-embedded-runner/run/attempt.ts: pass sessionKey

Configuration

Token Savings

Design Doc

https://github.com/Elarwei001/research_openclaw/blob/main/proposals/kimi-context-cache.md