Skip to content

Commit e847319

Browse files
fix: improve discord chunk delivery (#33226) (thanks @thewilloftheshadow)
1 parent d89e1e4 commit e847319

File tree

3 files changed

+243
-13
lines changed

3 files changed

+243
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
1515
- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils.
1616
- Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow.
1717
- Discord/channel resolution: default bare numeric recipients to channels, harden allowlist numeric ID handling with safe fallbacks, and avoid inbound WS heartbeat stalls. (#33142) Thanks @thewilloftheshadow.
18+
- Discord/chunk delivery reliability: preserve chunk ordering when using a REST client and retry chunk sends on 429/5xx using account retry settings. (#33226) Thanks @thewilloftheshadow.
1819
- Exec heartbeat routing: scope exec-triggered heartbeat wakes to agent session keys so unrelated agents are no longer awakened by exec events, while preserving legacy unscoped behavior for non-canonical session keys. (#32724) thanks @altaywtf
1920
- macOS/Tailscale remote gateway discovery: add a Tailscale Serve fallback peer probe path (`wss://<peer>.ts.net`) when Bonjour and wide-area DNS-SD discovery return no gateways, and refresh both discovery paths from macOS onboarding. (#32860) Thanks @ngutman.
2021
- Telegram/multi-account default routing clarity: warn only for ambiguous (2+) account setups without an explicit default, add `openclaw doctor` warnings for missing/invalid multi-account defaults across channels, and document explicit-default guidance for channel routing and Telegram config. (#32544) thanks @Sid-Qin.

src/discord/monitor/reply-delivery.test.ts

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,18 @@ import {
99
const sendMessageDiscordMock = vi.hoisted(() => vi.fn());
1010
const sendVoiceMessageDiscordMock = vi.hoisted(() => vi.fn());
1111
const sendWebhookMessageDiscordMock = vi.hoisted(() => vi.fn());
12+
const sendDiscordTextMock = vi.hoisted(() => vi.fn());
1213

1314
vi.mock("../send.js", () => ({
1415
sendMessageDiscord: (...args: unknown[]) => sendMessageDiscordMock(...args),
1516
sendVoiceMessageDiscord: (...args: unknown[]) => sendVoiceMessageDiscordMock(...args),
1617
sendWebhookMessageDiscord: (...args: unknown[]) => sendWebhookMessageDiscordMock(...args),
1718
}));
1819

20+
vi.mock("../send.shared.js", () => ({
21+
sendDiscordText: (...args: unknown[]) => sendDiscordTextMock(...args),
22+
}));
23+
1924
describe("deliverDiscordReply", () => {
2025
const runtime = {} as RuntimeEnv;
2126
const createBoundThreadBindings = async (
@@ -62,6 +67,10 @@ describe("deliverDiscordReply", () => {
6267
messageId: "webhook-1",
6368
channelId: "thread-1",
6469
});
70+
sendDiscordTextMock.mockClear().mockResolvedValue({
71+
id: "msg-direct-1",
72+
channel_id: "channel-1",
73+
});
6574
threadBindingTesting.resetThreadBindingsForTests();
6675
});
6776

@@ -182,6 +191,131 @@ describe("deliverDiscordReply", () => {
182191
);
183192
});
184193

194+
it("sends text chunks in order via sendDiscordText when rest is provided", async () => {
195+
const fakeRest = {} as import("@buape/carbon").RequestClient;
196+
const callOrder: string[] = [];
197+
sendDiscordTextMock.mockImplementation(
198+
async (_rest: unknown, _channelId: unknown, text: string) => {
199+
callOrder.push(text);
200+
return { id: `msg-${callOrder.length}`, channel_id: "789" };
201+
},
202+
);
203+
204+
await deliverDiscordReply({
205+
replies: [{ text: "1234567890" }],
206+
target: "channel:789",
207+
token: "token",
208+
rest: fakeRest,
209+
runtime,
210+
textLimit: 5,
211+
});
212+
213+
expect(sendMessageDiscordMock).not.toHaveBeenCalled();
214+
expect(sendDiscordTextMock).toHaveBeenCalledTimes(2);
215+
expect(callOrder).toEqual(["12345", "67890"]);
216+
expect(sendDiscordTextMock.mock.calls[0]?.[1]).toBe("789");
217+
expect(sendDiscordTextMock.mock.calls[1]?.[1]).toBe("789");
218+
});
219+
220+
it("falls back to sendMessageDiscord when rest is not provided", async () => {
221+
await deliverDiscordReply({
222+
replies: [{ text: "single chunk" }],
223+
target: "channel:789",
224+
token: "token",
225+
runtime,
226+
textLimit: 2000,
227+
});
228+
229+
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1);
230+
expect(sendDiscordTextMock).not.toHaveBeenCalled();
231+
});
232+
233+
it("retries bot send on 429 rate limit then succeeds", async () => {
234+
const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 });
235+
sendMessageDiscordMock
236+
.mockRejectedValueOnce(rateLimitErr)
237+
.mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" });
238+
239+
await deliverDiscordReply({
240+
replies: [{ text: "retry me" }],
241+
target: "channel:123",
242+
token: "token",
243+
runtime,
244+
textLimit: 2000,
245+
});
246+
247+
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2);
248+
});
249+
250+
it("retries bot send on 500 server error then succeeds", async () => {
251+
const serverErr = Object.assign(new Error("internal"), { status: 500 });
252+
sendMessageDiscordMock
253+
.mockRejectedValueOnce(serverErr)
254+
.mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" });
255+
256+
await deliverDiscordReply({
257+
replies: [{ text: "retry me" }],
258+
target: "channel:123",
259+
token: "token",
260+
runtime,
261+
textLimit: 2000,
262+
});
263+
264+
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2);
265+
});
266+
267+
it("does not retry on 4xx client errors", async () => {
268+
const clientErr = Object.assign(new Error("bad request"), { status: 400 });
269+
sendMessageDiscordMock.mockRejectedValueOnce(clientErr);
270+
271+
await expect(
272+
deliverDiscordReply({
273+
replies: [{ text: "fail" }],
274+
target: "channel:123",
275+
token: "token",
276+
runtime,
277+
textLimit: 2000,
278+
}),
279+
).rejects.toThrow("bad request");
280+
281+
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1);
282+
});
283+
284+
it("throws after exhausting retry attempts", async () => {
285+
const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 });
286+
sendMessageDiscordMock.mockRejectedValue(rateLimitErr);
287+
288+
await expect(
289+
deliverDiscordReply({
290+
replies: [{ text: "persistent failure" }],
291+
target: "channel:123",
292+
token: "token",
293+
runtime,
294+
textLimit: 2000,
295+
}),
296+
).rejects.toThrow("rate limited");
297+
298+
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(3);
299+
});
300+
301+
it("delivers remaining chunks after a mid-sequence retry", async () => {
302+
sendMessageDiscordMock
303+
.mockResolvedValueOnce({ messageId: "c1" })
304+
.mockRejectedValueOnce(Object.assign(new Error("rate limited"), { status: 429 }))
305+
.mockResolvedValueOnce({ messageId: "c2-retry" })
306+
.mockResolvedValueOnce({ messageId: "c3" });
307+
308+
await deliverDiscordReply({
309+
replies: [{ text: "A".repeat(6) }],
310+
target: "channel:123",
311+
token: "token",
312+
runtime,
313+
textLimit: 2,
314+
});
315+
316+
expect(sendMessageDiscordMock).toHaveBeenCalledTimes(4);
317+
});
318+
185319
it("sends bound-session text replies through webhook delivery", async () => {
186320
const threadBindings = await createBoundThreadBindings({ label: "codex-refactor" });
187321

src/discord/monitor/reply-delivery.ts

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ import type { ChunkMode } from "../../auto-reply/chunk.js";
44
import type { ReplyPayload } from "../../auto-reply/types.js";
55
import { loadConfig } from "../../config/config.js";
66
import type { MarkdownTableMode, ReplyToMode } from "../../config/types.base.js";
7+
import { createDiscordRetryRunner, type RetryRunner } from "../../infra/retry-policy.js";
8+
import { resolveRetryConfig, retryAsync, type RetryConfig } from "../../infra/retry.js";
79
import { convertMarkdownTables } from "../../markdown/tables.js";
810
import type { RuntimeEnv } from "../../runtime.js";
11+
import { resolveDiscordAccount } from "../accounts.js";
912
import { chunkDiscordTextWithMode } from "../chunk.js";
1013
import { sendMessageDiscord, sendVoiceMessageDiscord, sendWebhookMessageDiscord } from "../send.js";
14+
import { sendDiscordText } from "../send.shared.js";
1115

1216
export type DiscordThreadBindingLookupRecord = {
1317
accountId: string;
@@ -23,6 +27,54 @@ export type DiscordThreadBindingLookup = {
2327
touchThread?: (params: { threadId: string; at?: number; persist?: boolean }) => unknown;
2428
};
2529

30+
type ResolvedRetryConfig = Required<RetryConfig>;
31+
32+
const DISCORD_DELIVERY_RETRY_DEFAULTS: ResolvedRetryConfig = {
33+
attempts: 3,
34+
minDelayMs: 1000,
35+
maxDelayMs: 30_000,
36+
jitter: 0,
37+
};
38+
39+
function isRetryableDiscordError(err: unknown): boolean {
40+
const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode;
41+
return status === 429 || (status !== undefined && status >= 500);
42+
}
43+
44+
function getDiscordRetryAfterMs(err: unknown): number | undefined {
45+
if (!err || typeof err !== "object") {
46+
return undefined;
47+
}
48+
if (
49+
"retryAfter" in err &&
50+
typeof err.retryAfter === "number" &&
51+
Number.isFinite(err.retryAfter)
52+
) {
53+
return err.retryAfter * 1000;
54+
}
55+
const retryAfterRaw = (err as { headers?: Record<string, string> }).headers?.["retry-after"];
56+
if (!retryAfterRaw) {
57+
return undefined;
58+
}
59+
const retryAfterMs = Number(retryAfterRaw) * 1000;
60+
return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined;
61+
}
62+
63+
function resolveDeliveryRetryConfig(retry?: RetryConfig): ResolvedRetryConfig {
64+
return resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, retry);
65+
}
66+
67+
async function sendWithRetry(
68+
fn: () => Promise<unknown>,
69+
retryConfig: ResolvedRetryConfig,
70+
): Promise<void> {
71+
await retryAsync(fn, {
72+
...retryConfig,
73+
shouldRetry: (err) => isRetryableDiscordError(err),
74+
retryAfterMs: getDiscordRetryAfterMs,
75+
});
76+
}
77+
2678
function resolveTargetChannelId(target: string): string | undefined {
2779
if (!target.startsWith("channel:")) {
2880
return undefined;
@@ -83,6 +135,12 @@ async function sendDiscordChunkWithFallback(params: {
83135
binding?: DiscordThreadBindingLookupRecord;
84136
username?: string;
85137
avatarUrl?: string;
138+
/** Pre-resolved channel ID to bypass redundant resolution per chunk. */
139+
channelId?: string;
140+
/** Pre-created retry runner to avoid creating one per chunk. */
141+
request?: RetryRunner;
142+
/** Pre-resolved retry config (account-level). */
143+
retryConfig: ResolvedRetryConfig;
86144
}) {
87145
if (!params.text.trim()) {
88146
return;
@@ -105,12 +163,27 @@ async function sendDiscordChunkWithFallback(params: {
105163
// Fall through to the standard bot sender path.
106164
}
107165
}
108-
await sendMessageDiscord(params.target, text, {
109-
token: params.token,
110-
rest: params.rest,
111-
accountId: params.accountId,
112-
replyTo: params.replyTo,
113-
});
166+
// When channelId and request are pre-resolved, send directly via sendDiscordText
167+
// to avoid per-chunk overhead (channel-type GET, re-chunking, client creation)
168+
// that can cause ordering issues under queue contention or rate limiting.
169+
if (params.channelId && params.request && params.rest) {
170+
const { channelId, request, rest } = params;
171+
await sendWithRetry(
172+
() => sendDiscordText(rest, channelId, text, params.replyTo, request),
173+
params.retryConfig,
174+
);
175+
return;
176+
}
177+
await sendWithRetry(
178+
() =>
179+
sendMessageDiscord(params.target, text, {
180+
token: params.token,
181+
rest: params.rest,
182+
accountId: params.accountId,
183+
replyTo: params.replyTo,
184+
}),
185+
params.retryConfig,
186+
);
114187
}
115188

116189
async function sendAdditionalDiscordMedia(params: {
@@ -120,16 +193,21 @@ async function sendAdditionalDiscordMedia(params: {
120193
accountId?: string;
121194
mediaUrls: string[];
122195
resolveReplyTo: () => string | undefined;
196+
retryConfig: ResolvedRetryConfig;
123197
}) {
124198
for (const mediaUrl of params.mediaUrls) {
125199
const replyTo = params.resolveReplyTo();
126-
await sendMessageDiscord(params.target, "", {
127-
token: params.token,
128-
rest: params.rest,
129-
mediaUrl,
130-
accountId: params.accountId,
131-
replyTo,
132-
});
200+
await sendWithRetry(
201+
() =>
202+
sendMessageDiscord(params.target, "", {
203+
token: params.token,
204+
rest: params.rest,
205+
mediaUrl,
206+
accountId: params.accountId,
207+
replyTo,
208+
}),
209+
params.retryConfig,
210+
);
133211
}
134212
}
135213

@@ -174,6 +252,15 @@ export async function deliverDiscordReply(params: {
174252
target: params.target,
175253
});
176254
const persona = resolveBindingPersona(binding);
255+
// Pre-resolve channel ID and retry runner once to avoid per-chunk overhead.
256+
// This eliminates redundant channel-type GET requests and client creation that
257+
// can cause ordering issues when multiple chunks share the RequestClient queue.
258+
const channelId = resolveTargetChannelId(params.target);
259+
const account = resolveDiscordAccount({ cfg: loadConfig(), accountId: params.accountId });
260+
const retryConfig = resolveDeliveryRetryConfig(account.config.retry);
261+
const request: RetryRunner | undefined = channelId
262+
? createDiscordRetryRunner({ configRetry: account.config.retry })
263+
: undefined;
177264
let deliveredAny = false;
178265
for (const payload of params.replies) {
179266
const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
@@ -208,6 +295,9 @@ export async function deliverDiscordReply(params: {
208295
binding,
209296
username: persona.username,
210297
avatarUrl: persona.avatarUrl,
298+
channelId,
299+
request,
300+
retryConfig,
211301
});
212302
deliveredAny = true;
213303
}
@@ -240,6 +330,9 @@ export async function deliverDiscordReply(params: {
240330
binding,
241331
username: persona.username,
242332
avatarUrl: persona.avatarUrl,
333+
channelId,
334+
request,
335+
retryConfig,
243336
});
244337
// Additional media items are sent as regular attachments (voice is single-file only).
245338
await sendAdditionalDiscordMedia({
@@ -249,6 +342,7 @@ export async function deliverDiscordReply(params: {
249342
accountId: params.accountId,
250343
mediaUrls: mediaList.slice(1),
251344
resolveReplyTo,
345+
retryConfig,
252346
});
253347
continue;
254348
}
@@ -269,6 +363,7 @@ export async function deliverDiscordReply(params: {
269363
accountId: params.accountId,
270364
mediaUrls: mediaList.slice(1),
271365
resolveReplyTo,
366+
retryConfig,
272367
});
273368
}
274369

0 commit comments

Comments
 (0)