Skip to content

Commit bbf985d

Browse files
authored
feat(plugins): add SQLite plugin state store (#74190)
* feat(plugins): add experimental sqlite plugin state store
1 parent abaa432 commit bbf985d

16 files changed

Lines changed: 1822 additions & 6 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
1212
- CLI/QR/dependencies: internalize small terminal progress and QR wrapper helpers while keeping the real QR encoder dependency direct, reducing the default runtime dependency graph without changing QR output behavior. Thanks @vincentkoc.
1313
- Channels: add Yuanbao channel docs entrance so the Tencent Yuanbao bot appears in the channel listing and sidebar navigation. (#73443) Thanks @loongfay.
1414
- Active Memory: add optional per-conversation `allowedChatIds` and `deniedChatIds` filters so operators can enable recall only for selected direct, group, or channel conversations while keeping broad sessions skipped. (#67977) Thanks @quengh.
15+
- Added SQLite-backed plugin state store (`api.runtime.state.openKeyedStore`) for restart-safe keyed registries with TTL, eviction, and automatic plugin isolation. Thanks @amknight.
1516
- Active Memory: return bounded partial recall summaries when the hidden memory sub-agent times out, including the default temporary-transcript path, so useful recovered context is not discarded. (#73219) Thanks @joeykrug.
1617
- Docker setup: add `OPENCLAW_SKIP_ONBOARDING` so automated Docker installs can skip the interactive onboarding step while still applying gateway defaults. (#55518) Thanks @jinjimz.
1718
- Gateway/memory: add a read-only `doctor.memory.remHarness` RPC so operator clients can preview bounded REM dreaming output without running mutation paths. (#66673) Thanks @samzong.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
597577966dfee329740d7b0a331263afc26db518fe778f0fad95e2a01da88d83 plugin-sdk-api-baseline.json
2-
65fb1cad5e5ec1764e3ccfcfd3fbb2e5cfb938ad34b45e6416bba0c00a1d735a plugin-sdk-api-baseline.jsonl
1+
d5b33ee6be988cd6a844a358aaa098e1f6401b151e5ee1e46dceeccddaeb7434 plugin-sdk-api-baseline.json
2+
dffa8b4afbb085faf42a857805c43708b748111e346552d7ea4654da3bafdee7 plugin-sdk-api-baseline.jsonl

docs/plugins/sdk-runtime.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,28 @@ Provider and channel execution paths must use the active runtime config snapshot
394394

395395
</Accordion>
396396
<Accordion title="api.runtime.state">
397-
State directory resolution.
397+
State directory resolution and SQLite-backed keyed storage.
398398

399399
```typescript
400-
const stateDir = api.runtime.state.resolveStateDir();
400+
const stateDir = api.runtime.state.resolveStateDir(process.env);
401+
const store = api.runtime.state.openKeyedStore<MyRecord>({
402+
namespace: "my-feature",
403+
maxEntries: 200,
404+
defaultTtlMs: 15 * 60_000,
405+
});
406+
407+
await store.register("key-1", { value: "hello" });
408+
const value = await store.lookup("key-1");
409+
await store.consume("key-1");
410+
await store.clear();
401411
```
402412

413+
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
414+
415+
<Warning>
416+
Bundled plugins only in this release.
417+
</Warning>
418+
403419
</Accordion>
404420
<Accordion title="api.runtime.tools">
405421
Memory tool factories and CLI.

src/gateway/server-close.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js
77
import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js";
88
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
99
import { createSubsystemLogger } from "../logging/subsystem.js";
10+
import { closePluginStateSqliteStore } from "../plugin-state/plugin-state-store.js";
1011
import type { PluginServicesHandle } from "../plugins/services.js";
1112
import { normalizeOptionalString } from "../shared/string-coerce.js";
1213

@@ -290,6 +291,7 @@ export function createGatewayCloseHandler(params: {
290291
if (params.pluginServices) {
291292
await shutdownStep("plugin-services", () => params.pluginServices!.stop(), warnings);
292293
}
294+
await shutdownStep("plugin-state-store", () => closePluginStateSqliteStore(), warnings);
293295
await shutdownStep("gmail-watcher", () => stopGmailWatcherOnDemand(), warnings);
294296
params.cron.stop();
295297
params.heartbeatRunner.stop();

src/plugin-sdk/test-helpers/plugin-runtime-mock.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
458458
},
459459
state: {
460460
resolveStateDir: vi.fn(() => "/tmp/openclaw"),
461+
openKeyedStore: vi.fn(() => {
462+
throw new Error("openKeyedStore mock is not configured");
463+
}) as unknown as PluginRuntime["state"]["openKeyedStore"],
461464
},
462465
tasks: {
463466
runs: {
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
import { mkdirSync } from "node:fs";
2+
import { afterEach, describe, expect, it, vi } from "vitest";
3+
import { requireNodeSqlite } from "../infra/node-sqlite.js";
4+
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
5+
import {
6+
closePluginStateSqliteStore,
7+
createPluginStateKeyedStore,
8+
PluginStateStoreError,
9+
probePluginStateStore,
10+
resetPluginStateStoreForTests,
11+
sweepExpiredPluginStateEntries,
12+
} from "./plugin-state-store.js";
13+
import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
14+
import { MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js";
15+
16+
afterEach(() => {
17+
vi.useRealTimers();
18+
resetPluginStateStoreForTests();
19+
});
20+
21+
// ---------------------------------------------------------------------------
22+
// Runtime smoke
23+
// ---------------------------------------------------------------------------
24+
describe("runtime smoke", () => {
25+
it("creates and exercises a keyed store directly", async () => {
26+
await withOpenClawTestState({ label: "e2e-smoke-load" }, async () => {
27+
const store = createPluginStateKeyedStore<{ ready: boolean }>("fixture-plugin", {
28+
namespace: "boot",
29+
maxEntries: 10,
30+
});
31+
expect(store).toBeDefined();
32+
expect(typeof store.register).toBe("function");
33+
expect(typeof store.lookup).toBe("function");
34+
expect(typeof store.consume).toBe("function");
35+
});
36+
});
37+
38+
it("writes and reads a value", async () => {
39+
await withOpenClawTestState({ label: "e2e-smoke-rw" }, async () => {
40+
const store = createPluginStateKeyedStore<{ msg: string }>("fixture-plugin", {
41+
namespace: "data",
42+
maxEntries: 10,
43+
});
44+
await store.register("greeting", { msg: "hello" });
45+
await expect(store.lookup("greeting")).resolves.toEqual({ msg: "hello" });
46+
});
47+
});
48+
49+
it("consumes a value exactly once", async () => {
50+
await withOpenClawTestState({ label: "e2e-smoke-consume" }, async () => {
51+
const store = createPluginStateKeyedStore<{ token: string }>("fixture-plugin", {
52+
namespace: "tokens",
53+
maxEntries: 10,
54+
});
55+
await store.register("one-shot", { token: "abc123" });
56+
57+
const first = await store.consume("one-shot");
58+
expect(first).toEqual({ token: "abc123" });
59+
60+
const second = await store.consume("one-shot");
61+
expect(second).toBeUndefined();
62+
63+
await expect(store.lookup("one-shot")).resolves.toBeUndefined();
64+
});
65+
});
66+
});
67+
68+
// ---------------------------------------------------------------------------
69+
// Persistence
70+
// ---------------------------------------------------------------------------
71+
describe("persistence", () => {
72+
it("survives close and reopen of the store", async () => {
73+
await withOpenClawTestState({ label: "e2e-persist" }, async () => {
74+
const storeA = createPluginStateKeyedStore<{ persisted: boolean }>("fixture-plugin", {
75+
namespace: "durable",
76+
maxEntries: 10,
77+
});
78+
await storeA.register("key1", { persisted: true });
79+
await storeA.register("key2", { persisted: true });
80+
81+
// Tear down the cached DB handle and option signatures – simulates
82+
// a full gateway restart while the on-disk DB survives.
83+
resetPluginStateStoreForTests();
84+
85+
const storeB = createPluginStateKeyedStore<{ persisted: boolean }>("fixture-plugin", {
86+
namespace: "durable",
87+
maxEntries: 10,
88+
});
89+
await expect(storeB.lookup("key1")).resolves.toEqual({ persisted: true });
90+
await expect(storeB.lookup("key2")).resolves.toEqual({ persisted: true });
91+
});
92+
});
93+
});
94+
95+
// ---------------------------------------------------------------------------
96+
// TTL
97+
// ---------------------------------------------------------------------------
98+
describe("TTL", () => {
99+
it("hides expired values and sweep removes the row", async () => {
100+
await withOpenClawTestState({ label: "e2e-ttl" }, async () => {
101+
vi.useFakeTimers();
102+
vi.setSystemTime(10_000);
103+
104+
const store = createPluginStateKeyedStore<{ v: number }>("fixture-plugin", {
105+
namespace: "ttl-test",
106+
maxEntries: 10,
107+
});
108+
await store.register("short", { v: 1 }, { ttlMs: 500 });
109+
await store.register("long", { v: 2 }, { ttlMs: 60_000 });
110+
111+
// Before expiry – both visible.
112+
await expect(store.lookup("short")).resolves.toEqual({ v: 1 });
113+
await expect(store.lookup("long")).resolves.toEqual({ v: 2 });
114+
115+
// Advance past the short TTL.
116+
vi.setSystemTime(10_600);
117+
118+
// Expired value is invisible to reads.
119+
await expect(store.lookup("short")).resolves.toBeUndefined();
120+
await expect(store.lookup("long")).resolves.toEqual({ v: 2 });
121+
122+
// Sweep physically removes the expired row.
123+
const swept = sweepExpiredPluginStateEntries();
124+
expect(swept).toBe(1);
125+
126+
// After sweep the entry list contains only the long-lived record.
127+
const remaining = await store.entries();
128+
expect(remaining).toHaveLength(1);
129+
expect(remaining[0].key).toBe("long");
130+
});
131+
});
132+
});
133+
134+
// ---------------------------------------------------------------------------
135+
// Isolation
136+
// ---------------------------------------------------------------------------
137+
describe("isolation", () => {
138+
it("segregates plugins sharing namespace and key", async () => {
139+
await withOpenClawTestState({ label: "e2e-isolation" }, async () => {
140+
const pluginA = createPluginStateKeyedStore<{ owner: string }>("plugin-a", {
141+
namespace: "x",
142+
maxEntries: 10,
143+
});
144+
const pluginB = createPluginStateKeyedStore<{ owner: string }>("plugin-b", {
145+
namespace: "x",
146+
maxEntries: 10,
147+
});
148+
149+
await pluginA.register("same", { owner: "a" });
150+
await pluginB.register("same", { owner: "b" });
151+
152+
await expect(pluginA.lookup("same")).resolves.toEqual({ owner: "a" });
153+
await expect(pluginB.lookup("same")).resolves.toEqual({ owner: "b" });
154+
155+
// Clearing one plugin's namespace does not affect the other.
156+
await pluginA.clear();
157+
await expect(pluginA.lookup("same")).resolves.toBeUndefined();
158+
await expect(pluginB.lookup("same")).resolves.toEqual({ owner: "b" });
159+
});
160+
});
161+
});
162+
163+
// ---------------------------------------------------------------------------
164+
// Limits
165+
// ---------------------------------------------------------------------------
166+
describe("limits", () => {
167+
it("accepts a value at the 64 KB boundary", async () => {
168+
await withOpenClawTestState({ label: "e2e-limit-accept" }, async () => {
169+
const store = createPluginStateKeyedStore<string>("fixture-plugin", {
170+
namespace: "size",
171+
maxEntries: 10,
172+
});
173+
// JSON.stringify wraps a string in quotes (+2 bytes).
174+
// 65 534 chars → 65 536 bytes of JSON → exactly at limit.
175+
const boundary = "x".repeat(65_534);
176+
await expect(store.register("big", boundary)).resolves.toBeUndefined();
177+
await expect(store.lookup("big")).resolves.toBe(boundary);
178+
});
179+
});
180+
181+
it("rejects a value one byte over 64 KB", async () => {
182+
await withOpenClawTestState({ label: "e2e-limit-reject" }, async () => {
183+
const store = createPluginStateKeyedStore<string>("fixture-plugin", {
184+
namespace: "size",
185+
maxEntries: 10,
186+
});
187+
// 65 535 chars → 65 537 bytes of JSON → over limit.
188+
const oversize = "x".repeat(65_535);
189+
await expect(store.register("big", oversize)).rejects.toMatchObject({
190+
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
191+
});
192+
});
193+
});
194+
195+
it("enforces the per-plugin live-row cap", async () => {
196+
await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => {
197+
// Spread MAX_ENTRIES_PER_PLUGIN rows across several namespaces so
198+
// namespace eviction never fires (each namespace has generous room).
199+
const nsCount = 10;
200+
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100
201+
const stores = Array.from({ length: nsCount }, (_, i) =>
202+
createPluginStateKeyedStore("fixture-plugin", {
203+
namespace: `ns-${i}`,
204+
maxEntries: perNs + 1,
205+
}),
206+
);
207+
208+
for (let ns = 0; ns < nsCount; ns += 1) {
209+
for (let k = 0; k < perNs; k += 1) {
210+
await stores[ns].register(`k-${k}`, { ns, k });
211+
}
212+
}
213+
214+
// One more row tips over the plugin-wide limit.
215+
await expect(stores[0].register("overflow", { boom: true })).rejects.toMatchObject({
216+
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
217+
});
218+
});
219+
});
220+
221+
it("evicts oldest entries when namespace maxEntries is exceeded", async () => {
222+
await withOpenClawTestState({ label: "e2e-limit-eviction" }, async () => {
223+
vi.useFakeTimers();
224+
const store = createPluginStateKeyedStore<number>("fixture-plugin", {
225+
namespace: "capped",
226+
maxEntries: 3,
227+
});
228+
229+
vi.setSystemTime(1000);
230+
await store.register("a", 1);
231+
vi.setSystemTime(2000);
232+
await store.register("b", 2);
233+
vi.setSystemTime(3000);
234+
await store.register("c", 3);
235+
vi.setSystemTime(4000);
236+
await store.register("d", 4); // should evict "a"
237+
238+
const entries = await store.entries();
239+
expect(entries).toHaveLength(3);
240+
expect(entries.map((e) => e.key)).toEqual(["b", "c", "d"]);
241+
await expect(store.lookup("a")).resolves.toBeUndefined();
242+
});
243+
});
244+
});
245+
246+
// ---------------------------------------------------------------------------
247+
// Failure safety
248+
// ---------------------------------------------------------------------------
249+
describe("failure safety", () => {
250+
it("gives a typed error for unsupported schema versions", async () => {
251+
await withOpenClawTestState({ label: "e2e-fail-schema" }, async () => {
252+
// Pre-seed the DB with a future schema version.
253+
mkdirSync(resolvePluginStateDir(), { recursive: true });
254+
const { DatabaseSync } = requireNodeSqlite();
255+
const db = new DatabaseSync(resolvePluginStateSqlitePath());
256+
db.exec("PRAGMA user_version = 99;");
257+
db.close();
258+
259+
const store = createPluginStateKeyedStore("fixture-plugin", {
260+
namespace: "schema",
261+
maxEntries: 10,
262+
});
263+
const error = await store.register("k", { ok: true }).catch((e: unknown) => e);
264+
expect(error).toBeInstanceOf(PluginStateStoreError);
265+
expect(error).toMatchObject({ code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED" });
266+
});
267+
});
268+
269+
it("probe returns redacted diagnostics without leaking stored values", async () => {
270+
await withOpenClawTestState({ label: "e2e-fail-probe" }, async () => {
271+
const result = probePluginStateStore();
272+
expect(result.ok).toBe(true);
273+
expect(result.dbPath).toContain("state.sqlite");
274+
expect(result.steps.length).toBeGreaterThanOrEqual(4);
275+
expect(result.steps.every((s) => s.ok)).toBe(true);
276+
277+
// The probe's temporary stored value must not leak into the result.
278+
const serialised = JSON.stringify(result);
279+
expect(serialised).not.toContain("probe-value");
280+
});
281+
});
282+
283+
it("close and reopen cycle is clean", async () => {
284+
await withOpenClawTestState({ label: "e2e-fail-reopen" }, async () => {
285+
const store = createPluginStateKeyedStore<{ v: number }>("fixture-plugin", {
286+
namespace: "reopen",
287+
maxEntries: 10,
288+
});
289+
await store.register("k", { v: 1 });
290+
291+
// First close.
292+
closePluginStateSqliteStore();
293+
await expect(store.lookup("k")).resolves.toEqual({ v: 1 });
294+
295+
// Second close (idempotent).
296+
closePluginStateSqliteStore();
297+
await expect(store.lookup("k")).resolves.toEqual({ v: 1 });
298+
299+
// Write after reopen.
300+
await store.register("k", { v: 2 });
301+
await expect(store.lookup("k")).resolves.toEqual({ v: 2 });
302+
});
303+
});
304+
});
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import path from "node:path";
2+
import { resolveStateDir } from "../config/paths.js";
3+
4+
export function resolvePluginStateDir(env: NodeJS.ProcessEnv = process.env): string {
5+
return path.join(resolveStateDir(env), "plugin-state");
6+
}
7+
8+
export function resolvePluginStateSqlitePath(env: NodeJS.ProcessEnv = process.env): string {
9+
return path.join(resolvePluginStateDir(env), "state.sqlite");
10+
}

0 commit comments

Comments
 (0)