Skip to content

Commit 5dd7729

Browse files
committed
fix(feishu): recover WebSocket after SDK retry exhaustion
1 parent f351961 commit 5dd7729

5 files changed

Lines changed: 204 additions & 26 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
3636
- Channels/Telegram: persist native command metadata on target sessions so topic, helper, and ACP-bound slash commands keep their session metadata attached to the routed conversation. (#57548) Thanks @GaosCode.
3737
- Channels/native commands: keep validated native slash command replies visible in group chats while preserving explicit owner allowlists for command authorization. (#73672) Thanks @obviyus.
3838
- Auto-reply/session: carry the tail of user/assistant turns into the freshly-rotated transcript on silent in-reply session resets (compaction failure, role-ordering conflict) so direct-chat continuity survives the rebind. Fixes #70853. (#70898) Thanks @neeravmakwana.
39+
- Feishu: recreate WebSocket clients with monitor-owned backoff after SDK reconnect exhaustion, preserving heartbeat defaults and shutdown cleanup so persistent connections recover without manual gateway restart. Fixes #52618; duplicate evidence #59753; related #55532, #68766, and #72411. Thanks @schumilin, @alex-xuweilong, @120106835, @sirfengyu, and @tianhaocui.
3940

4041
## 2026.4.27
4142

extensions/feishu/src/client.test.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,13 @@ function readCallOptions(
119119
return isRecord(call) ? call : {};
120120
}
121121

122-
function firstWsClientOptions(): { agent?: unknown; wsConfig?: unknown } {
122+
function firstWsClientOptions(): {
123+
agent?: unknown;
124+
wsConfig?: unknown;
125+
onError?: unknown;
126+
} {
123127
const options = readCallOptions(wsClientCtorMock, 0);
124-
return { agent: options.agent, wsConfig: options.wsConfig };
128+
return { agent: options.agent, wsConfig: options.wsConfig, onError: options.onError };
125129
}
126130

127131
beforeAll(async () => {
@@ -355,6 +359,19 @@ describe("createFeishuWSClient proxy handling", () => {
355359
});
356360
});
357361

362+
it("passes lifecycle callbacks while preserving heartbeat wsConfig defaults", async () => {
363+
const onError = vi.fn();
364+
365+
await createFeishuWSClient(baseAccount, { onError });
366+
367+
const options = firstWsClientOptions();
368+
expect(options.onError).toBe(onError);
369+
expect(options.wsConfig).toEqual({
370+
PingInterval: 30,
371+
PingTimeout: 3,
372+
});
373+
});
374+
358375
it("does not set a ws proxy agent when proxy env is absent", async () => {
359376
await createFeishuWSClient(baseAccount);
360377

extensions/feishu/src/client.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,19 @@ export function createFeishuClient(creds: FeishuClientCredentials): Lark.Client
220220
return client;
221221
}
222222

223+
export type FeishuWsClientCallbacks = Pick<
224+
ConstructorParameters<typeof feishuClientSdk.WSClient>[0],
225+
"onError" | "onReady" | "onReconnected" | "onReconnecting"
226+
>;
227+
223228
/**
224229
* Create a Feishu WebSocket client for an account.
225230
* Note: WSClient is not cached since each call creates a new connection.
226231
*/
227-
export async function createFeishuWSClient(account: ResolvedFeishuAccount): Promise<Lark.WSClient> {
232+
export async function createFeishuWSClient(
233+
account: ResolvedFeishuAccount,
234+
callbacks: FeishuWsClientCallbacks = {},
235+
): Promise<Lark.WSClient> {
228236
const { accountId, appId, appSecret, domain } = account;
229237

230238
if (!appId || !appSecret) {
@@ -236,6 +244,7 @@ export async function createFeishuWSClient(account: ResolvedFeishuAccount): Prom
236244
appId,
237245
appSecret,
238246
domain: resolveDomain(domain),
247+
...callbacks,
239248
loggerLevel: feishuClientSdk.LoggerLevel.info,
240249
wsConfig: FEISHU_WS_CONFIG,
241250
...(agent ? { agent } : {}),

extensions/feishu/src/monitor.cleanup.test.ts

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,116 @@ describe("feishu websocket cleanup", () => {
136136
expect(errorMessage).toContain("appSecret=[redacted]");
137137
});
138138

139+
it("recreates the websocket client after sdk reconnect exhaustion", async () => {
140+
vi.useFakeTimers();
141+
const exhaustedClient = createWsClient();
142+
const recoveredClient = createWsClient();
143+
createFeishuWSClientMock
144+
.mockResolvedValueOnce(exhaustedClient)
145+
.mockResolvedValueOnce(recoveredClient);
146+
147+
const abortController = new AbortController();
148+
const runtime = {
149+
log: vi.fn(),
150+
error: vi.fn(),
151+
exit: vi.fn(),
152+
};
153+
const accountId = "exhausted";
154+
botOpenIds.set(accountId, "ou_exhausted");
155+
botNames.set(accountId, "Exhausted");
156+
157+
const monitorPromise = monitorWebSocket({
158+
account: createAccount(accountId),
159+
accountId,
160+
runtime,
161+
abortSignal: abortController.signal,
162+
eventDispatcher: {} as never,
163+
});
164+
165+
await vi.waitFor(() => {
166+
expect(exhaustedClient.start).toHaveBeenCalledTimes(1);
167+
expect(wsClients.get(accountId)).toBe(exhaustedClient);
168+
});
169+
170+
const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as
171+
| { onError?: (err: Error) => void }
172+
| undefined;
173+
callbacks?.onError?.(
174+
new Error("WebSocket reconnect exhausted after 3 attempts\nBearer token_abc"),
175+
);
176+
177+
await vi.waitFor(() => {
178+
expect(exhaustedClient.close).toHaveBeenCalledTimes(1);
179+
expect(wsClients.has(accountId)).toBe(false);
180+
});
181+
expect(botOpenIds.get(accountId)).toBe("ou_exhausted");
182+
expect(botNames.get(accountId)).toBe("Exhausted");
183+
184+
await vi.advanceTimersByTimeAsync(1_000);
185+
186+
await vi.waitFor(() => {
187+
expect(recoveredClient.start).toHaveBeenCalledTimes(1);
188+
expect(wsClients.get(accountId)).toBe(recoveredClient);
189+
});
190+
191+
abortController.abort();
192+
await monitorPromise;
193+
194+
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(2);
195+
expect(recoveredClient.close).toHaveBeenCalledTimes(1);
196+
expect(botOpenIds.has(accountId)).toBe(false);
197+
expect(botNames.has(accountId)).toBe(false);
198+
const errorMessage = String(runtime.error.mock.calls[0]?.[0] ?? "");
199+
expect(errorMessage).toContain("WebSocket connection ended, recreating client in 1000ms");
200+
expect(errorMessage).toContain("Bearer [redacted]");
201+
expect(errorMessage).not.toContain("\n");
202+
expect(errorMessage).not.toContain("token_abc");
203+
});
204+
205+
it("clears identity without recreating a websocket when aborted during reconnect backoff", async () => {
206+
vi.useFakeTimers();
207+
const exhaustedClient = createWsClient();
208+
createFeishuWSClientMock.mockResolvedValueOnce(exhaustedClient);
209+
210+
const abortController = new AbortController();
211+
const accountId = "abort-backoff";
212+
botOpenIds.set(accountId, "ou_abort");
213+
botNames.set(accountId, "Abort");
214+
215+
const monitorPromise = monitorWebSocket({
216+
account: createAccount(accountId),
217+
accountId,
218+
runtime: {
219+
log: vi.fn(),
220+
error: vi.fn(),
221+
exit: vi.fn(),
222+
},
223+
abortSignal: abortController.signal,
224+
eventDispatcher: {} as never,
225+
});
226+
227+
await vi.waitFor(() => {
228+
expect(exhaustedClient.start).toHaveBeenCalledTimes(1);
229+
});
230+
231+
const callbacks = createFeishuWSClientMock.mock.calls[0]?.[1] as
232+
| { onError?: (err: Error) => void }
233+
| undefined;
234+
callbacks?.onError?.(new Error("WebSocket reconnect exhausted after 3 attempts"));
235+
236+
await vi.waitFor(() => {
237+
expect(exhaustedClient.close).toHaveBeenCalledTimes(1);
238+
});
239+
240+
abortController.abort();
241+
await monitorPromise;
242+
243+
expect(createFeishuWSClientMock).toHaveBeenCalledTimes(1);
244+
expect(wsClients.has(accountId)).toBe(false);
245+
expect(botOpenIds.has(accountId)).toBe(false);
246+
expect(botNames.has(accountId)).toBe(false);
247+
});
248+
139249
it("redacts websocket close errors during abort cleanup", async () => {
140250
const wsClient = createWsClient();
141251
wsClient.close.mockImplementationOnce(() => {

extensions/feishu/src/monitor.transport.ts

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,9 @@ function cleanupFeishuWsClient(params: {
124124
accountId: string;
125125
wsClient?: Lark.WSClient;
126126
error: (message: string) => void;
127+
clearIdentity: boolean;
127128
}): void {
128-
const { accountId, wsClient, error } = params;
129+
const { accountId, wsClient, error, clearIdentity } = params;
129130
if (wsClient) {
130131
try {
131132
wsClient.close();
@@ -136,27 +137,43 @@ function cleanupFeishuWsClient(params: {
136137
}
137138
}
138139
wsClients.delete(accountId);
139-
botOpenIds.delete(accountId);
140-
botNames.delete(accountId);
140+
if (clearIdentity) {
141+
botOpenIds.delete(accountId);
142+
botNames.delete(accountId);
143+
}
141144
}
142145

143-
function waitForFeishuWsAbort(abortSignal?: AbortSignal): Promise<void> {
144-
if (abortSignal?.aborted) {
145-
return Promise.resolve();
146+
function waitForFeishuWsCycleEnd(params: {
147+
abortSignal?: AbortSignal;
148+
terminalError: Promise<Error>;
149+
}): Promise<"abort" | Error> {
150+
if (params.abortSignal?.aborted) {
151+
return Promise.resolve("abort");
146152
}
153+
147154
return new Promise((resolve) => {
148-
if (!abortSignal) {
149-
// No external lifecycle owner was provided, so keep the SDK-managed connection alive.
150-
return;
151-
}
152-
const handleAbort = () => {
153-
abortSignal.removeEventListener("abort", handleAbort);
154-
resolve();
155+
let settled = false;
156+
let handleAbort: (() => void) | undefined;
157+
158+
const finish = (result: "abort" | Error) => {
159+
if (settled) {
160+
return;
161+
}
162+
settled = true;
163+
if (handleAbort) {
164+
params.abortSignal?.removeEventListener("abort", handleAbort);
165+
}
166+
resolve(result);
155167
};
156-
abortSignal.addEventListener("abort", handleAbort, { once: true });
157-
if (abortSignal.aborted) {
158-
handleAbort();
168+
169+
handleAbort = () => finish("abort");
170+
params.abortSignal?.addEventListener("abort", handleAbort, { once: true });
171+
if (params.abortSignal?.aborted) {
172+
finish("abort");
173+
return;
159174
}
175+
176+
void params.terminalError.then(finish);
160177
});
161178
}
162179

@@ -178,22 +195,45 @@ export async function monitorWebSocket({
178195

179196
let wsClient: Lark.WSClient | undefined;
180197
try {
198+
let reportTerminalError: (err: Error) => void = () => {};
199+
const terminalError = new Promise<Error>((resolve) => {
200+
reportTerminalError = resolve;
201+
});
181202
log(`feishu[${accountId}]: starting WebSocket connection...`);
182-
wsClient = await createFeishuWSClient(account);
203+
wsClient = await createFeishuWSClient(account, {
204+
onError: reportTerminalError,
205+
});
183206
if (abortSignal?.aborted) {
184-
cleanupFeishuWsClient({ accountId, wsClient, error });
207+
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: true });
185208
break;
186209
}
187210
wsClients.set(accountId, wsClient);
188211
await wsClient.start({ eventDispatcher });
189212
attempt = 0;
190213
log(`feishu[${accountId}]: WebSocket client started`);
191-
await waitForFeishuWsAbort(abortSignal);
192-
log(`feishu[${accountId}]: abort signal received, stopping`);
193-
cleanupFeishuWsClient({ accountId, wsClient, error });
194-
return;
214+
const cycleEnd = await waitForFeishuWsCycleEnd({ abortSignal, terminalError });
215+
if (cycleEnd === "abort") {
216+
log(`feishu[${accountId}]: abort signal received, stopping`);
217+
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: true });
218+
return;
219+
}
220+
221+
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: false });
222+
if (abortSignal?.aborted) {
223+
break;
224+
}
225+
226+
attempt += 1;
227+
const delayMs = getFeishuWsReconnectDelayMs(attempt);
228+
error(
229+
`feishu[${accountId}]: WebSocket connection ended, recreating client in ${delayMs}ms: ${formatFeishuWsErrorForLog(cycleEnd)}`,
230+
);
231+
const shouldRetry = await waitForAbortableDelay(delayMs, abortSignal);
232+
if (!shouldRetry) {
233+
break;
234+
}
195235
} catch (err) {
196-
cleanupFeishuWsClient({ accountId, wsClient, error });
236+
cleanupFeishuWsClient({ accountId, wsClient, error, clearIdentity: false });
197237
if (abortSignal?.aborted) {
198238
break;
199239
}
@@ -209,6 +249,7 @@ export async function monitorWebSocket({
209249
}
210250
}
211251
}
252+
cleanupFeishuWsClient({ accountId, wsClient: undefined, error, clearIdentity: true });
212253
}
213254

214255
export async function monitorWebhook({

0 commit comments

Comments
 (0)