Skip to content

Commit ec1f72b

Browse files
authored
fix(gateway): preserve restart drain for active runs
Fixes #65485
1 parent 734748d commit ec1f72b

17 files changed

Lines changed: 453 additions & 64 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ Docs: https://docs.openclaw.ai
131131
- macOS Gateway: wait for launchd to reload the exited Gateway LaunchAgent before bootstrapping repair fallback, preventing config-triggered restarts from leaving the service not loaded. Fixes #45178. Thanks @vincentkoc.
132132
- macOS Gateway: tolerate launchctl bootstrap's already-loaded exit during restart fallback and use non-killing kickstart after bootstrap, avoiding a second race that can unload the LaunchAgent. Fixes #41934. Thanks @zerone0x.
133133
- macOS Gateway: rewrite stale LaunchAgent plists before restart fallback bootstrap, matching install repair behavior when `gateway restart` has to re-register launchd. Thanks @maybegeeker.
134+
- Gateway/reload: wait indefinitely by default for active operations before restart-required reloads and preserve that drain path when service-manager restarts arrive as SIGTERM, preventing in-flight agent runs from being killed mid-turn. Fixes #65485. Thanks @rijhsinghani.
134135
- TTS/hooks: preserve audio-only TTS transcripts for `message_sending` and `message_sent` hooks without rendering the transcript as a media caption. Thanks @zqchris.
135136
- WhatsApp/TTS: preserve `audioAsVoice` through shared media payload sends and the WhatsApp outbound adapter, so `[[audio_as_voice]]` reply payloads keep their voice-note intent when routed through `sendPayload`. Fixes #66053. Thanks @masatohoshino.
136137
- Control UI/WebChat: hide heartbeat prompts, `HEARTBEAT_OK` acknowledgments, and internal-only runtime context turns from visible chat history while leaving the underlying transcript intact. Fixes #71381. Thanks @gerald1950ggg-ai.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
83677b2666da2169511e5372f26c20c794001ec8acc7e9c2e1935043010c05d6 config-baseline.json
2-
fa38a1bde88d8858ae0a11e7e17fa42fe107c34268b568f51877afbde81922e8 config-baseline.core.json
1+
dae9ece3ac683a0bed2835d96d4373f65ab955b8b901df0bcdeedc565ade6ed6 config-baseline.json
2+
7cd52f77b1e0ecb50d2119b4c21d6d51d336a0c752a44cbaf8df1efa9ef538c0 config-baseline.core.json
33
d72032762ab46b99480b57deb81130a0ab5b1401189cfbaf4f7fef4a063a7f6c config-baseline.channel.json
44
0504c4f38d4c753fffeb465c93540d829df6b0fcef921eb0e2226ac16bdbbe07 config-baseline.plugin.json

docs/gateway/configuration-reference.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ See [Multiple Gateways](/gateway/multiple-gateways).
472472
reload: {
473473
mode: "hybrid", // off | restart | hot | hybrid
474474
debounceMs: 500,
475-
deferralTimeoutMs: 300000,
475+
deferralTimeoutMs: 0,
476476
},
477477
},
478478
}
@@ -484,7 +484,7 @@ See [Multiple Gateways](/gateway/multiple-gateways).
484484
- `"hot"`: apply changes in-process without restarting.
485485
- `"hybrid"` (default): try hot reload first; fall back to restart if required.
486486
- `debounceMs`: debounce window in ms before config changes are applied (non-negative integer).
487-
- `deferralTimeoutMs`: maximum time in ms to wait for in-flight operations before forcing a restart (default: `300000` = 5 minutes).
487+
- `deferralTimeoutMs`: optional maximum time in ms to wait for in-flight operations before forcing a restart. Omit it or set it to `0` to wait indefinitely; the gateway logs periodic still-pending warnings while it waits.
488488

489489
---
490490

src/agents/pi-embedded-runner/runs.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,24 +273,30 @@ export function consumeEmbeddedRunModelSwitch(
273273
/**
274274
* Wait for active embedded runs to drain.
275275
*
276-
* Used during restarts so in-flight compaction runs can release session write
277-
* locks before the next lifecycle starts.
276+
* Used during restarts so in-flight runs can release session write locks before
277+
* the next lifecycle starts. If no timeout is passed (or it is not finite), waits indefinitely; a timeout <= 0 checks once and returns without waiting.
278278
*/
279279
export async function waitForActiveEmbeddedRuns(
280-
timeoutMs = 15_000,
280+
timeoutMs?: number,
281281
opts?: { pollMs?: number },
282282
): Promise<{ drained: boolean }> {
283283
const pollMsRaw = opts?.pollMs ?? 250;
284284
const pollMs = Math.max(10, Math.floor(pollMsRaw));
285-
const maxWaitMs = Math.max(pollMs, Math.floor(timeoutMs));
285+
if (timeoutMs !== undefined && timeoutMs <= 0) {
286+
return { drained: getActiveEmbeddedRunCount() === 0 };
287+
}
288+
const maxWaitMs =
289+
typeof timeoutMs === "number" && Number.isFinite(timeoutMs)
290+
? Math.max(pollMs, Math.floor(timeoutMs))
291+
: undefined;
286292

287293
const startedAt = Date.now();
288294
while (true) {
289295
if (getActiveEmbeddedRunCount() === 0) {
290296
return { drained: true };
291297
}
292298
const elapsedMs = Date.now() - startedAt;
293-
if (elapsedMs >= maxWaitMs) {
299+
if (maxWaitMs !== undefined && elapsedMs >= maxWaitMs) {
294300
diag.warn(
295301
`wait for active embedded runs timed out: activeRuns=${getActiveEmbeddedRunCount()} timeoutMs=${maxWaitMs}`,
296302
);

src/cli/daemon-cli/lifecycle-core.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ const loadConfig = vi.fn<() => OpenClawConfig>(() => ({
1616
},
1717
},
1818
}));
19+
const writeGatewayRestartIntentSync = vi.fn();
20+
const clearGatewayRestartIntentSync = vi.fn();
1921

2022
vi.mock("../../config/config.js", () => ({
2123
loadConfig: () => loadConfig(),
@@ -26,6 +28,11 @@ vi.mock("../../runtime.js", () => ({
2628
defaultRuntime,
2729
}));
2830

31+
vi.mock("../../infra/restart.js", () => ({
32+
clearGatewayRestartIntentSync: () => clearGatewayRestartIntentSync(),
33+
writeGatewayRestartIntentSync: (opts: unknown) => writeGatewayRestartIntentSync(opts),
34+
}));
35+
2936
let runServiceRestart: typeof import("./lifecycle-core.js").runServiceRestart;
3037
let runServiceStart: typeof import("./lifecycle-core.js").runServiceStart;
3138
let runServiceStop: typeof import("./lifecycle-core.js").runServiceStop;
@@ -92,6 +99,8 @@ describe("runServiceRestart token drift", () => {
9299
},
93100
});
94101
resetLifecycleServiceMocks();
102+
writeGatewayRestartIntentSync.mockClear();
103+
clearGatewayRestartIntentSync.mockClear();
95104
service.readCommand.mockResolvedValue({
96105
programArguments: [],
97106
environment: { OPENCLAW_GATEWAY_TOKEN: "service-token" },
@@ -182,6 +191,7 @@ describe("runServiceRestart token drift", () => {
182191

183192
expect(loadConfig).not.toHaveBeenCalled();
184193
expect(service.readCommand).not.toHaveBeenCalled();
194+
expect(writeGatewayRestartIntentSync).not.toHaveBeenCalled();
185195
const payload = readJsonLog<{ warnings?: string[] }>();
186196
expect(payload.warnings).toBeUndefined();
187197
});
@@ -305,6 +315,27 @@ describe("runServiceRestart token drift", () => {
305315
expect(payload.message).toBe("restart scheduled, gateway will restart momentarily");
306316
});
307317

318+
it("writes a restart intent before service-manager restart", async () => {
319+
service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 });
320+
321+
await runServiceRestart(createServiceRunArgs());
322+
323+
expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ targetPid: 1234 });
324+
expect(clearGatewayRestartIntentSync).not.toHaveBeenCalled();
325+
expect(service.restart).toHaveBeenCalledTimes(1);
326+
});
327+
328+
it("clears restart intent when service-manager restart fails before signaling", async () => {
329+
service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 });
330+
writeGatewayRestartIntentSync.mockReturnValueOnce(true);
331+
service.restart.mockRejectedValueOnce(new Error("launchctl failed before signaling"));
332+
333+
await expect(runServiceRestart(createServiceRunArgs())).rejects.toThrow("__exit__:1");
334+
335+
expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ targetPid: 1234 });
336+
expect(clearGatewayRestartIntentSync).toHaveBeenCalledOnce();
337+
});
338+
308339
it("emits scheduled when service start routes through a scheduled restart", async () => {
309340
service.restart.mockResolvedValue({ outcome: "scheduled" });
310341

src/cli/daemon-cli/lifecycle-core.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import type { GatewayService } from "../../daemon/service.js";
99
import { renderSystemdUnavailableHints } from "../../daemon/systemd-hints.js";
1010
import { isSystemdUserServiceAvailable } from "../../daemon/systemd.js";
1111
import { isGatewaySecretRefUnavailableError } from "../../gateway/credentials.js";
12+
import {
13+
clearGatewayRestartIntentSync,
14+
writeGatewayRestartIntentSync,
15+
} from "../../infra/restart.js";
1216
import { isWSL } from "../../infra/wsl.js";
1317
import { defaultRuntime } from "../../runtime.js";
1418
import { resolveGatewayTokenForDriftCheck } from "./gateway-token-drift.js";
@@ -458,7 +462,21 @@ export async function runServiceRestart(params: {
458462
try {
459463
let restartResult: GatewayServiceRestartResult = { outcome: "completed" };
460464
if (loaded) {
461-
restartResult = await params.service.restart({ env: process.env, stdout });
465+
let wroteRestartIntent = false;
466+
if (params.serviceNoun === "Gateway") {
467+
const runtime = await params.service.readRuntime(process.env).catch(() => null);
468+
wroteRestartIntent = writeGatewayRestartIntentSync({
469+
targetPid: runtime?.pid,
470+
});
471+
}
472+
try {
473+
restartResult = await params.service.restart({ env: process.env, stdout });
474+
} catch (err) {
475+
if (wroteRestartIntent) {
476+
clearGatewayRestartIntentSync();
477+
}
478+
throw err;
479+
}
462480
}
463481
let restartStatus = describeGatewayServiceRestart(params.serviceNoun, restartResult);
464482
if (restartStatus.scheduled) {

src/cli/gateway-cli/run-loop.test.ts

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({
66
release: vi.fn(async () => {}),
77
}));
88
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
9+
const consumeGatewayRestartIntentSync = vi.fn(() => false);
910
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
1011
const markGatewaySigusr1RestartHandled = vi.fn();
1112
const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?: string }) => ({
@@ -19,7 +20,7 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?
1920
}));
2021
const getActiveTaskCount = vi.fn(() => 0);
2122
const markGatewayDraining = vi.fn();
22-
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
23+
const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
2324
const resetAllLanes = vi.fn();
2425
const restartGatewayProcessWithFreshPid = vi.fn<
2526
() => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string }
@@ -28,7 +29,7 @@ const abortEmbeddedPiRun = vi.fn(
2829
(_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false,
2930
);
3031
const getActiveEmbeddedRunCount = vi.fn(() => 0);
31-
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
32+
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
3233
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
3334
const loadConfig = vi.fn(() => ({
3435
gateway: {
@@ -49,6 +50,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({
4950

5051
vi.mock("../../infra/restart.js", () => ({
5152
consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(),
53+
consumeGatewayRestartIntentSync: () => consumeGatewayRestartIntentSync(),
5254
isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(),
5355
markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(),
5456
scheduleGatewaySigusr1Restart: (opts?: { delayMs?: number; reason?: string }) =>
@@ -62,15 +64,15 @@ vi.mock("../../infra/process-respawn.js", () => ({
6264
vi.mock("../../process/command-queue.js", () => ({
6365
getActiveTaskCount: () => getActiveTaskCount(),
6466
markGatewayDraining: () => markGatewayDraining(),
65-
waitForActiveTasks: (timeoutMs: number) => waitForActiveTasks(timeoutMs),
67+
waitForActiveTasks: (timeoutMs?: number) => waitForActiveTasks(timeoutMs),
6668
resetAllLanes: () => resetAllLanes(),
6769
}));
6870

6971
vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
7072
abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) =>
7173
abortEmbeddedPiRun(sessionId, opts),
7274
getActiveEmbeddedRunCount: () => getActiveEmbeddedRunCount(),
73-
waitForActiveEmbeddedRuns: (timeoutMs: number) => waitForActiveEmbeddedRuns(timeoutMs),
75+
waitForActiveEmbeddedRuns: (timeoutMs?: number) => waitForActiveEmbeddedRuns(timeoutMs),
7476
}));
7577

7678
vi.mock("../../config/config.js", () => ({
@@ -226,6 +228,50 @@ describe("runGatewayLoop", () => {
226228
});
227229
});
228230

231+
it("treats SIGTERM with a restart intent as a draining restart", async () => {
232+
vi.clearAllMocks();
233+
consumeGatewayRestartIntentSync.mockReturnValueOnce(true);
234+
getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0);
235+
236+
await withIsolatedSignals(async ({ captureSignal }) => {
237+
const closeFirst = vi.fn(async () => {});
238+
const closeSecond = vi.fn(async () => {});
239+
const { runtime, exited } = createRuntimeWithExitSignal();
240+
const start = vi
241+
.fn()
242+
.mockResolvedValueOnce({ close: closeFirst })
243+
.mockResolvedValueOnce({ close: closeSecond });
244+
const { runGatewayLoop } = await import("./run-loop.js");
245+
void runGatewayLoop({
246+
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
247+
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
248+
});
249+
await new Promise<void>((resolve) => setImmediate(resolve));
250+
const sigterm = captureSignal("SIGTERM");
251+
const sigint = captureSignal("SIGINT");
252+
253+
sigterm();
254+
await new Promise<void>((resolve) => setImmediate(resolve));
255+
await new Promise<void>((resolve) => setImmediate(resolve));
256+
257+
expect(consumeGatewayRestartIntentSync).toHaveBeenCalledOnce();
258+
expect(markGatewayDraining).toHaveBeenCalledOnce();
259+
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
260+
expect(closeFirst).toHaveBeenCalledWith({
261+
reason: "gateway restarting",
262+
restartExpectedMs: 1500,
263+
});
264+
expect(start).toHaveBeenCalledTimes(2);
265+
266+
sigint();
267+
await expect(exited).resolves.toBe(0);
268+
expect(closeSecond).toHaveBeenCalledWith({
269+
reason: "gateway stopping",
270+
restartExpectedMs: null,
271+
});
272+
});
273+
});
274+
229275
it("restarts after SIGUSR1 even when drain times out, and resets lanes for the new iteration", async () => {
230276
vi.clearAllMocks();
231277
loadConfig.mockReturnValue({

0 commit comments

Comments
 (0)