Skip to content

Commit cceefe8

Browse files
committed
fix: harden delivery recovery backoff eligibility and tests (#27710) (thanks @Jimmy-xuzimo)
1 parent 0cfd448 commit cceefe8

File tree

3 files changed

+94
-28
lines changed

3 files changed

+94
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
1717

1818
### Fixes
1919

20+
- Delivery queue/recovery backoff: prevent retry starvation by persisting `lastAttemptAt` on failed sends and deferring recovery retries until each entry's `lastAttemptAt + backoff` window is eligible, while continuing to recover ready entries behind deferred ones. Landed from contributor PR #27710 by @Jimmy-xuzimo. Thanks @Jimmy-xuzimo.
2021
- Microsoft Teams/File uploads: acknowledge `fileConsent/invoke` immediately (`invokeResponse` before upload + file card send) so Teams no longer shows false "Something went wrong" timeout banners while upload completion continues asynchronously; includes updated async regression coverage. Landed from contributor PR #27641 by @scz2011.
2122
- Queue/Drain/Cron reliability: harden lane draining with guaranteed `draining` flag reset on synchronous pump failures, reject new queue enqueues during gateway restart drain windows (instead of silently killing accepted tasks), add `/stop` queued-backlog cutoff metadata with stale-message skipping (while avoiding cross-session native-stop cutoff bleed), and raise isolated cron `agentTurn` outer safety timeout to avoid false 10-minute timeout races against longer agent session timeouts. (#27407, #27332, #27427)
2223
- Typing/Main reply pipeline: always mark dispatch idle in `agent-runner` finalization so typing cleanup runs even when dispatcher `onIdle` does not fire, preventing stuck typing indicators after run completion. (#27250) Thanks @Sid-Qin.

src/infra/outbound/delivery-queue.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
4747
id: string;
4848
enqueuedAt: number;
4949
retryCount: number;
50+
lastAttemptAt?: number;
5051
lastError?: string;
5152
}
5253

@@ -122,6 +123,7 @@ export async function failDelivery(id: string, error: string, stateDir?: string)
122123
const raw = await fs.promises.readFile(filePath, "utf-8");
123124
const entry: QueuedDelivery = JSON.parse(raw);
124125
entry.retryCount += 1;
126+
entry.lastAttemptAt = Date.now();
125127
entry.lastError = error;
126128
const tmp = `${filePath}.${process.pid}.tmp`;
127129
await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
@@ -208,8 +210,6 @@ export async function recoverPendingDeliveries(opts: {
208210
log: RecoveryLogger;
209211
cfg: OpenClawConfig;
210212
stateDir?: string;
211-
/** Override for testing — resolves instead of using real setTimeout. */
212-
delay?: (ms: number) => Promise<void>;
213213
/** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */
214214
maxRecoveryMs?: number;
215215
}): Promise<{ recovered: number; failed: number; skipped: number }> {
@@ -223,12 +223,12 @@ export async function recoverPendingDeliveries(opts: {
223223

224224
opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);
225225

226-
const delayFn = opts.delay ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms)));
227226
const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);
228227

229228
let recovered = 0;
230229
let failed = 0;
231230
let skipped = 0;
231+
let deferred = 0;
232232

233233
for (const entry of pending) {
234234
const now = Date.now();
@@ -252,15 +252,18 @@ export async function recoverPendingDeliveries(opts: {
252252

253253
const backoff = computeBackoffMs(entry.retryCount + 1);
254254
if (backoff > 0) {
255-
if (now + backoff >= deadline) {
256-
opts.log.info(
257-
`Backoff ${backoff}ms exceeds budget for ${entry.id} — skipping to next entry`,
258-
);
259-
skipped += 1;
260-
continue;
255+
const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
256+
if (!firstReplayAfterCrash) {
257+
const baseAttemptAt = entry.lastAttemptAt ?? entry.enqueuedAt;
258+
const nextEligibleAt = baseAttemptAt + backoff;
259+
if (now < nextEligibleAt) {
260+
deferred += 1;
261+
opts.log.info(
262+
`Delivery ${entry.id} not ready for retry yet — backoff ${nextEligibleAt - now}ms remaining`,
263+
);
264+
continue;
265+
}
261266
}
262-
opts.log.info(`Waiting ${backoff}ms before retrying delivery ${entry.id}`);
263-
await delayFn(backoff);
264267
}
265268

266269
try {
@@ -304,7 +307,7 @@ export async function recoverPendingDeliveries(opts: {
304307
}
305308

306309
opts.log.info(
307-
`Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries)`,
310+
`Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries), ${deferred} deferred (backoff)`,
308311
);
309312
return { recovered, failed, skipped };
310313
}

src/infra/outbound/outbound.test.ts

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ describe("delivery-queue", () => {
104104
});
105105

106106
describe("failDelivery", () => {
107-
it("increments retryCount and sets lastError", async () => {
107+
it("increments retryCount, records attempt time, and sets lastError", async () => {
108108
const id = await enqueueDelivery(
109109
{
110110
channel: "telegram",
@@ -119,6 +119,8 @@ describe("delivery-queue", () => {
119119
const queueDir = path.join(tmpDir, "delivery-queue");
120120
const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
121121
expect(entry.retryCount).toBe(1);
122+
expect(typeof entry.lastAttemptAt).toBe("number");
123+
expect(entry.lastAttemptAt).toBeGreaterThan(0);
122124
expect(entry.lastError).toBe("connection refused");
123125
});
124126
});
@@ -204,36 +206,43 @@ describe("delivery-queue", () => {
204206
});
205207

206208
describe("recoverPendingDeliveries", () => {
207-
const noopDelay = async () => {};
208209
const baseCfg = {};
209210
const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
210211
const enqueueCrashRecoveryEntries = async () => {
211212
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
212213
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
213214
};
214-
const setEntryRetryCount = (id: string, retryCount: number) => {
215+
const setEntryState = (
216+
id: string,
217+
state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
218+
) => {
215219
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
216220
const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
217-
entry.retryCount = retryCount;
221+
entry.retryCount = state.retryCount;
222+
if (state.lastAttemptAt === undefined) {
223+
delete entry.lastAttemptAt;
224+
} else {
225+
entry.lastAttemptAt = state.lastAttemptAt;
226+
}
227+
if (state.enqueuedAt !== undefined) {
228+
entry.enqueuedAt = state.enqueuedAt;
229+
}
218230
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
219231
};
220232
const runRecovery = async ({
221233
deliver,
222234
log = createLog(),
223-
delay = noopDelay,
224235
maxRecoveryMs,
225236
}: {
226237
deliver: ReturnType<typeof vi.fn>;
227238
log?: ReturnType<typeof createLog>;
228-
delay?: (ms: number) => Promise<void>;
229239
maxRecoveryMs?: number;
230240
}) => {
231241
const result = await recoverPendingDeliveries({
232242
deliver: deliver as DeliverFn,
233243
log,
234244
cfg: baseCfg,
235245
stateDir: tmpDir,
236-
delay,
237246
...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
238247
});
239248
return { result, log };
@@ -261,7 +270,7 @@ describe("delivery-queue", () => {
261270
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
262271
tmpDir,
263272
);
264-
setEntryRetryCount(id, MAX_RETRIES);
273+
setEntryState(id, { retryCount: MAX_RETRIES });
265274

266275
const deliver = vi.fn();
267276
const { result } = await runRecovery({ deliver });
@@ -377,29 +386,82 @@ describe("delivery-queue", () => {
377386
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
378387
});
379388

380-
it("defers entries when backoff exceeds the recovery budget", async () => {
389+
it("defers entries until backoff becomes eligible", async () => {
381390
const id = await enqueueDelivery(
382391
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
383392
tmpDir,
384393
);
385-
setEntryRetryCount(id, 3);
394+
setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() });
386395

387396
const deliver = vi.fn().mockResolvedValue([]);
388-
const delay = vi.fn(async () => {});
389397
const { result, log } = await runRecovery({
390398
deliver,
391-
delay,
392-
maxRecoveryMs: 1000,
399+
maxRecoveryMs: 60_000,
393400
});
394401

395402
expect(deliver).not.toHaveBeenCalled();
396-
expect(delay).not.toHaveBeenCalled();
397-
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 1 });
403+
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
404+
405+
const remaining = await loadPendingDeliveries(tmpDir);
406+
expect(remaining).toHaveLength(1);
407+
408+
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
409+
});
410+
411+
it("continues past high-backoff entries and recovers ready entries behind them", async () => {
412+
const now = Date.now();
413+
const blockedId = await enqueueDelivery(
414+
{ channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
415+
tmpDir,
416+
);
417+
const readyId = await enqueueDelivery(
418+
{ channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
419+
tmpDir,
420+
);
421+
422+
setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 });
423+
setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
424+
425+
const deliver = vi.fn().mockResolvedValue([]);
426+
const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
427+
428+
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
429+
expect(deliver).toHaveBeenCalledTimes(1);
430+
expect(deliver).toHaveBeenCalledWith(
431+
expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
432+
);
398433

399434
const remaining = await loadPendingDeliveries(tmpDir);
400435
expect(remaining).toHaveLength(1);
436+
expect(remaining[0]?.id).toBe(blockedId);
437+
});
438+
439+
it("recovers deferred entries on a later restart once backoff elapsed", async () => {
440+
vi.useFakeTimers();
441+
const start = new Date("2026-01-01T00:00:00.000Z");
442+
vi.setSystemTime(start);
443+
444+
const id = await enqueueDelivery(
445+
{ channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
446+
tmpDir,
447+
);
448+
setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() });
449+
450+
const firstDeliver = vi.fn().mockResolvedValue([]);
451+
const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
452+
expect(firstRun.result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
453+
expect(firstDeliver).not.toHaveBeenCalled();
454+
455+
vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
456+
const secondDeliver = vi.fn().mockResolvedValue([]);
457+
const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
458+
expect(secondRun.result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
459+
expect(secondDeliver).toHaveBeenCalledTimes(1);
460+
461+
const remaining = await loadPendingDeliveries(tmpDir);
462+
expect(remaining).toHaveLength(0);
401463

402-
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("Backoff"));
464+
vi.useRealTimers();
403465
});
404466

405467
it("returns zeros when queue is empty", async () => {

0 commit comments

Comments
 (0)