Skip to content

Commit fcb396b

Browse files
authored
feat(plugin-state): add registerIfAbsent keyed store (#77135)
1 parent 071db2c commit fcb396b

8 files changed

Lines changed: 283 additions & 34 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Docs: https://docs.openclaw.ai
5757
- Exec approvals: add a tree-sitter-backed shell command explainer for future approval and command-review surfaces. (#75004) Thanks @jesse-merhi.
5858
- Agents/sandbox: store sandbox container and browser registry entries as per-runtime shard files, reducing unrelated session lock contention while `openclaw doctor --fix` migrates legacy monolithic registry files. (#74831) Thanks @luckylhb90.
5959
- Plugins/ClawHub: annotate 429 errors from ClawHub with the reset window from `RateLimit-Reset`/`Retry-After` and append a `Sign in for higher rate limits.` hint when the request was unauthenticated, so users can see when downloads will recover and how to lift the cap. Thanks @romneyda.
60+
- Plugins/runtime state: add `registerIfAbsent` for atomic keyed-store dedupe claims that return whether a plugin successfully claimed a key without overwriting an existing live value. Thanks @amknight.
6061

6162
### Fixes
6263

docs/plugins/sdk-runtime.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,12 +417,13 @@ Provider and channel execution paths must use the active runtime config snapshot
417417
});
418418

419419
await store.register("key-1", { value: "hello" });
420+
const claimed = await store.registerIfAbsent("dedupe-key", { value: "first" });
420421
const value = await store.lookup("key-1");
421422
await store.consume("key-1");
422423
await store.clear();
423424
```
424425

425-
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
426+
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
426427

427428
<Warning>
428429
Bundled plugins only in this release.

src/plugin-state/plugin-state-store.e2e.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ describe("runtime smoke", () => {
3131
});
3232
expect(store).toBeDefined();
3333
expect(typeof store.register).toBe("function");
34+
expect(typeof store.registerIfAbsent).toBe("function");
3435
expect(typeof store.lookup).toBe("function");
3536
expect(typeof store.consume).toBe("function");
3637
});

src/plugin-state/plugin-state-store.runtime.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ describe("plugin runtime state proxy", () => {
6565
namespace: "runtime",
6666
maxEntries: 10,
6767
});
68-
await store.register("k", { plugin: "discord" });
68+
await expect(store.registerIfAbsent("k", { plugin: "discord" })).resolves.toBe(true);
69+
await expect(store.registerIfAbsent("k", { plugin: "duplicate" })).resolves.toBe(false);
6970

7071
const telegram = createPluginRecord("telegram", "bundled");
7172
registry.registry.plugins.push(telegram);

src/plugin-state/plugin-state-store.sqlite.ts

Lines changed: 104 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type UserVersionRow = {
4040

4141
type PluginStateStatements = {
4242
upsertEntry: StatementSync;
43+
insertEntryIfAbsent: StatementSync;
4344
selectEntry: StatementSync;
4445
selectEntries: StatementSync;
4546
deleteEntry: StatementSync;
@@ -190,6 +191,23 @@ function createStatements(db: DatabaseSync): PluginStateStatements {
190191
created_at = excluded.created_at,
191192
expires_at = excluded.expires_at
192193
`),
194+
insertEntryIfAbsent: db.prepare(`
195+
INSERT OR IGNORE INTO plugin_state_entries (
196+
plugin_id,
197+
namespace,
198+
entry_key,
199+
value_json,
200+
created_at,
201+
expires_at
202+
) VALUES (
203+
@plugin_id,
204+
@namespace,
205+
@entry_key,
206+
@value_json,
207+
@created_at,
208+
@expires_at
209+
)
210+
`),
193211
selectEntry: db.prepare(`
194212
SELECT plugin_id, namespace, entry_key, value_json, created_at, expires_at
195213
FROM plugin_state_entries
@@ -363,6 +381,44 @@ function runWriteTransaction<T>(
363381
}
364382
}
365383

384+
function enforcePostRegisterLimits(params: {
385+
store: PluginStateDatabase;
386+
pluginId: string;
387+
namespace: string;
388+
maxEntries: number;
389+
now: number;
390+
}): void {
391+
const namespaceCount = countRow(
392+
params.store.statements.countLiveNamespace.get(
393+
params.pluginId,
394+
params.namespace,
395+
params.now,
396+
) as CountRow | undefined,
397+
);
398+
if (namespaceCount > params.maxEntries) {
399+
params.store.statements.deleteOldestNamespace.run(
400+
params.pluginId,
401+
params.namespace,
402+
params.now,
403+
namespaceCount - params.maxEntries,
404+
);
405+
}
406+
407+
const pluginCount = countRow(
408+
params.store.statements.countLivePlugin.get(params.pluginId, params.now) as
409+
| CountRow
410+
| undefined,
411+
);
412+
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
413+
throw createPluginStateError({
414+
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
415+
operation: "register",
416+
message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`,
417+
path: params.store.path,
418+
});
419+
}
420+
}
421+
366422
export function pluginStateRegister(params: {
367423
pluginId: string;
368424
namespace: string;
@@ -384,32 +440,56 @@ export function pluginStateRegister(params: {
384440
created_at: now,
385441
expires_at: expiresAt,
386442
});
443+
enforcePostRegisterLimits({
444+
store,
445+
pluginId: params.pluginId,
446+
namespace: params.namespace,
447+
maxEntries: params.maxEntries,
448+
now,
449+
});
450+
});
451+
} catch (error) {
452+
throw wrapPluginStateError(
453+
error,
454+
"register",
455+
"PLUGIN_STATE_WRITE_FAILED",
456+
"Failed to register plugin state entry.",
457+
);
458+
}
459+
}
387460

388-
const namespaceCount = countRow(
389-
store.statements.countLiveNamespace.get(params.pluginId, params.namespace, now) as
390-
| CountRow
391-
| undefined,
392-
);
393-
if (namespaceCount > params.maxEntries) {
394-
store.statements.deleteOldestNamespace.run(
395-
params.pluginId,
396-
params.namespace,
397-
now,
398-
namespaceCount - params.maxEntries,
399-
);
400-
}
401-
402-
const pluginCount = countRow(
403-
store.statements.countLivePlugin.get(params.pluginId, now) as CountRow | undefined,
404-
);
405-
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
406-
throw createPluginStateError({
407-
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
408-
operation: "register",
409-
message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`,
410-
path: store.path,
411-
});
461+
export function pluginStateRegisterIfAbsent(params: {
462+
pluginId: string;
463+
namespace: string;
464+
key: string;
465+
valueJson: string;
466+
maxEntries: number;
467+
ttlMs?: number;
468+
}): boolean {
469+
try {
470+
return runWriteTransaction("register", (store) => {
471+
const now = Date.now();
472+
const expiresAt = params.ttlMs == null ? null : now + params.ttlMs;
473+
store.statements.pruneExpiredNamespace.run(params.pluginId, params.namespace, now);
474+
const result = store.statements.insertEntryIfAbsent.run({
475+
plugin_id: params.pluginId,
476+
namespace: params.namespace,
477+
entry_key: params.key,
478+
value_json: params.valueJson,
479+
created_at: now,
480+
expires_at: expiresAt,
481+
});
482+
if (result.changes === 0) {
483+
return false;
412484
}
485+
enforcePostRegisterLimits({
486+
store,
487+
pluginId: params.pluginId,
488+
namespace: params.namespace,
489+
maxEntries: params.maxEntries,
490+
now,
491+
});
492+
return true;
413493
});
414494
} catch (error) {
415495
throw wrapPluginStateError(

src/plugin-state/plugin-state-store.test.ts

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,145 @@ describe("plugin state keyed store", () => {
5858
});
5959
});
6060

61+
it("registerIfAbsent inserts the first value and preserves live duplicates", async () => {
62+
await withOpenClawTestState({ label: "plugin-state-register-if-absent-live" }, async () => {
63+
vi.useFakeTimers();
64+
const store = createPluginStateKeyedStore<{ version: number }>("discord", {
65+
namespace: "claims",
66+
maxEntries: 10,
67+
});
68+
69+
vi.setSystemTime(1000);
70+
await expect(store.registerIfAbsent("claim", { version: 1 }, { ttlMs: 1000 })).resolves.toBe(
71+
true,
72+
);
73+
vi.setSystemTime(1200);
74+
await expect(store.registerIfAbsent("claim", { version: 2 }, { ttlMs: 5000 })).resolves.toBe(
75+
false,
76+
);
77+
78+
await expect(store.lookup("claim")).resolves.toEqual({ version: 1 });
79+
await expect(store.entries()).resolves.toMatchObject([
80+
{ key: "claim", value: { version: 1 }, createdAt: 1000, expiresAt: 2000 },
81+
]);
82+
});
83+
});
84+
85+
it("registerIfAbsent replaces expired keys", async () => {
86+
await withOpenClawTestState({ label: "plugin-state-register-if-absent-expired" }, async () => {
87+
vi.useFakeTimers();
88+
const store = createPluginStateKeyedStore<{ version: number }>("discord", {
89+
namespace: "claims-expired",
90+
maxEntries: 10,
91+
});
92+
93+
vi.setSystemTime(1000);
94+
await expect(store.registerIfAbsent("claim", { version: 1 }, { ttlMs: 100 })).resolves.toBe(
95+
true,
96+
);
97+
vi.setSystemTime(1200);
98+
await expect(store.registerIfAbsent("claim", { version: 2 })).resolves.toBe(true);
99+
100+
await expect(store.lookup("claim")).resolves.toEqual({ version: 2 });
101+
await expect(store.entries()).resolves.toMatchObject([
102+
{ key: "claim", value: { version: 2 }, createdAt: 1200 },
103+
]);
104+
});
105+
});
106+
107+
it("registerIfAbsent keeps plugin and namespace claims isolated", async () => {
108+
await withOpenClawTestState(
109+
{ label: "plugin-state-register-if-absent-isolation" },
110+
async () => {
111+
const discordA = createPluginStateKeyedStore<{ owner: string }>("discord", {
112+
namespace: "claims-a",
113+
maxEntries: 10,
114+
});
115+
const discordB = createPluginStateKeyedStore<{ owner: string }>("discord", {
116+
namespace: "claims-b",
117+
maxEntries: 10,
118+
});
119+
const telegramA = createPluginStateKeyedStore<{ owner: string }>("telegram", {
120+
namespace: "claims-a",
121+
maxEntries: 10,
122+
});
123+
124+
await expect(discordA.registerIfAbsent("same", { owner: "discord-a" })).resolves.toBe(true);
125+
await expect(discordB.registerIfAbsent("same", { owner: "discord-b" })).resolves.toBe(true);
126+
await expect(telegramA.registerIfAbsent("same", { owner: "telegram-a" })).resolves.toBe(
127+
true,
128+
);
129+
await expect(discordA.registerIfAbsent("same", { owner: "overwrite" })).resolves.toBe(
130+
false,
131+
);
132+
133+
await expect(discordA.lookup("same")).resolves.toEqual({ owner: "discord-a" });
134+
await expect(discordB.lookup("same")).resolves.toEqual({ owner: "discord-b" });
135+
await expect(telegramA.lookup("same")).resolves.toEqual({ owner: "telegram-a" });
136+
},
137+
);
138+
});
139+
140+
it("registerIfAbsent only lets one parallel claimant win", async () => {
141+
await withOpenClawTestState({ label: "plugin-state-register-if-absent-race" }, async () => {
142+
const store = createPluginStateKeyedStore<{ claimant: number }>("discord", {
143+
namespace: "claims-race",
144+
maxEntries: 10,
145+
});
146+
147+
const attempts = await Promise.all(
148+
Array.from({ length: 25 }, async (_, claimant) =>
149+
store.registerIfAbsent("claim", { claimant }),
150+
),
151+
);
152+
153+
expect(attempts.filter(Boolean)).toHaveLength(1);
154+
const stored = await store.lookup("claim");
155+
expect(stored).toBeDefined();
156+
expect(attempts[stored?.claimant ?? -1]).toBe(true);
157+
});
158+
});
159+
160+
it("registerIfAbsent preserves eviction and plugin row cap behavior", async () => {
161+
await withOpenClawTestState({ label: "plugin-state-register-if-absent-limits" }, async () => {
162+
vi.useFakeTimers();
163+
const evicting = createPluginStateKeyedStore<number>("discord", {
164+
namespace: "claims-evict",
165+
maxEntries: 2,
166+
});
167+
vi.setSystemTime(1000);
168+
await evicting.registerIfAbsent("a", 1);
169+
vi.setSystemTime(2000);
170+
await evicting.registerIfAbsent("b", 2);
171+
vi.setSystemTime(3000);
172+
await evicting.registerIfAbsent("c", 3);
173+
await expect(evicting.entries()).resolves.toMatchObject([{ key: "b" }, { key: "c" }]);
174+
175+
seedPluginStateEntriesForTests([
176+
...Array.from({ length: 999 }, (_, entryIndex) => ({
177+
pluginId: "limited-plugin",
178+
namespace: "limit",
179+
key: `k-${entryIndex}`,
180+
value: { entryIndex },
181+
})),
182+
{
183+
pluginId: "limited-plugin",
184+
namespace: "sibling",
185+
key: "k-0",
186+
value: { sibling: true },
187+
},
188+
]);
189+
const limited = createPluginStateKeyedStore("limited-plugin", {
190+
namespace: "limit",
191+
maxEntries: 1_001,
192+
});
193+
await expect(limited.registerIfAbsent("overflow", { overflow: true })).rejects.toMatchObject({
194+
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
195+
});
196+
await expect(limited.lookup("overflow")).resolves.toBeUndefined();
197+
});
198+
});
199+
61200
it("returns undefined for missing lookups and consumes by deleting atomically", async () => {
62201
await withOpenClawTestState({ label: "plugin-state-consume" }, async () => {
63202
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {

0 commit comments

Comments
 (0)