Skip to content

Commit 468621f

Browse files
committed
Reapply "fix(core): prevent phantom connections and dead polling in plugin workers (#34823)"
This reverts commit 3a264f4.
1 parent c9bcccb commit 468621f

4 files changed

Lines changed: 162 additions & 82 deletions

File tree

packages/nx/src/daemon/client/client.ts

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ import {
2828
import { preventRecursionInGraphConstruction } from '../../project-graph/project-graph';
2929
import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration/source-maps';
3030
import { isJsonMessage } from '../../utils/consume-messages-from-socket';
31+
import { waitForSocketConnection } from '../../utils/wait-for-socket-connection';
3132
import { DelayedSpinner } from '../../utils/delayed-spinner';
33+
import { handleImport } from '../../utils/handle-import';
3234
import { isCI } from '../../utils/is-ci';
3335
import { isSandbox } from '../../utils/is-sandbox';
3436
import { output } from '../../utils/output';
@@ -41,6 +43,13 @@ import { workspaceRoot } from '../../utils/workspace-root';
4143
import { getDaemonProcessIdSync, readDaemonProcessJsonCache } from '../cache';
4244
import { isNxVersionMismatch } from '../is-nx-version-mismatch';
4345
import { clientLogger } from '../logger';
46+
import {
47+
type ConfigureAiAgentsStatusResponse,
48+
GET_CONFIGURE_AI_AGENTS_STATUS,
49+
type HandleGetConfigureAiAgentsStatusMessage,
50+
type HandleResetConfigureAiAgentsStatusMessage,
51+
RESET_CONFIGURE_AI_AGENTS_STATUS,
52+
} from '../message-types/configure-ai-agents';
4453
import {
4554
FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK,
4655
type HandleFlushSyncGeneratorChangesToDiskMessage,
@@ -83,13 +92,6 @@ import {
8392
SET_NX_CONSOLE_PREFERENCE_AND_INSTALL,
8493
type SetNxConsolePreferenceAndInstallResponse,
8594
} from '../message-types/nx-console';
86-
import {
87-
GET_CONFIGURE_AI_AGENTS_STATUS,
88-
RESET_CONFIGURE_AI_AGENTS_STATUS,
89-
type ConfigureAiAgentsStatusResponse,
90-
type HandleGetConfigureAiAgentsStatusMessage,
91-
type HandleResetConfigureAiAgentsStatusMessage,
92-
} from '../message-types/configure-ai-agents';
9395
import { REGISTER_PROJECT_GRAPH_LISTENER } from '../message-types/register-project-graph-listener';
9496
import {
9597
HandlePostTasksExecutionMessage,
@@ -1173,35 +1175,34 @@ export class DaemonClient {
11731175
private async waitForServerToBeAvailable(options: {
11741176
ignoreVersionMismatch: boolean;
11751177
}): Promise<boolean> {
1176-
let attempts = 0;
1177-
11781178
clientLogger.log(
11791179
`[Client] Waiting for server (max: ${WAIT_FOR_SERVER_CONFIG.maxAttempts} attempts, ${WAIT_FOR_SERVER_CONFIG.delayMs}ms interval)`
11801180
);
11811181

1182-
while (attempts < WAIT_FOR_SERVER_CONFIG.maxAttempts) {
1183-
await new Promise((resolve) =>
1184-
setTimeout(resolve, WAIT_FOR_SERVER_CONFIG.delayMs)
1185-
);
1186-
attempts++;
1187-
1188-
try {
1189-
if (await this.isServerAvailable()) {
1190-
clientLogger.log(
1191-
`[Client] Server available after ${attempts} attempts`
1192-
);
1193-
return true;
1194-
}
1195-
} catch (err) {
1196-
if (err instanceof VersionMismatchError) {
1197-
if (!options.ignoreVersionMismatch) {
1198-
throw err;
1182+
const socket = await waitForSocketConnection(
1183+
() => {
1184+
try {
1185+
return this.getSocketPath();
1186+
} catch (err) {
1187+
if (err instanceof VersionMismatchError) {
1188+
if (!options.ignoreVersionMismatch) {
1189+
throw err;
1190+
}
11991191
}
1200-
// Keep waiting - old cache file may exist
1201-
} else {
1202-
throw err;
1192+
// Socket path not available yet — keep polling
1193+
return null;
12031194
}
1195+
},
1196+
{
1197+
maxAttempts: WAIT_FOR_SERVER_CONFIG.maxAttempts,
1198+
delayMs: WAIT_FOR_SERVER_CONFIG.delayMs,
12041199
}
1200+
);
1201+
1202+
if (socket) {
1203+
socket.destroy();
1204+
clientLogger.log(`[Client] Server available`);
1205+
return true;
12051206
}
12061207

12071208
clientLogger.log(
@@ -1215,13 +1216,25 @@ export class DaemonClient {
12151216
force?: 'v8' | 'json'
12161217
): Promise<any> {
12171218
await this.startDaemonIfNecessary();
1218-
// An open promise isn't enough to keep the event loop
1219-
// alive, so we set a timeout here and clear it when we hear
1220-
// back
1221-
const keepAlive = setTimeout(() => {}, 10 * 60 * 1000);
1219+
1220+
let keepAlive: NodeJS.Timeout;
12221221
return new Promise((resolve, reject) => {
12231222
performance.mark('sendMessageToDaemon-start');
12241223

1224+
// An open promise isn't enough to keep the event loop
1225+
// alive, so we set a timeout here and clear it when we hear
1226+
// back. This **must** be longer than the message timeout used
1227+
// in the plugin isolation messages, or the daemon will timeout before
1228+
// a plugin worker would, and that can result in odd exit behavior.
1229+
keepAlive = setTimeout(
1230+
() => {
1231+
reject(
1232+
new Error('The daemon timed out while processing ' + message.type)
1233+
);
1234+
},
1235+
20 * 60 * 1000
1236+
);
1237+
12251238
this.currentMessage = message;
12261239
this.currentResolve = resolve;
12271240
this.currentReject = reject;

packages/nx/src/project-graph/plugins/isolation/isolated-plugin.ts

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ChildProcess, spawn } from 'child_process';
2-
import { Socket, connect } from 'net';
2+
import { Socket } from 'net';
33
import { Readable, Writable } from 'stream';
44
import path = require('path');
55

@@ -9,6 +9,7 @@ import { getPluginOsSocketPath } from '../../../daemon/socket-utils';
99
import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket';
1010
import { getNxRequirePaths } from '../../../utils/installation-directory';
1111
import { logger } from '../../../utils/logger';
12+
import { waitForSocketConnection } from '../../../utils/wait-for-socket-connection';
1213
import type { RawProjectGraphDependency } from '../../project-graph-builder';
1314
import { LoadedNxPlugin } from '../loaded-nx-plugin';
1415
import type {
@@ -196,7 +197,12 @@ export class IsolatedPlugin implements LoadedNxPlugin {
196197

197198
if (!this._connectPromise) {
198199
logger.verbose(`[plugin-client] restarting worker for "${this.name}"`);
199-
this._connectPromise = this.spawnAndConnect();
200+
this._connectPromise = this.spawnAndConnect().catch((err) => {
201+
// Clear the cached promise so subsequent calls can retry
202+
// instead of re-awaiting a permanently-rejected promise.
203+
this._connectPromise = null;
204+
throw err;
205+
});
200206
}
201207

202208
await this._connectPromise;
@@ -552,49 +558,55 @@ async function startPluginWorker(name: string) {
552558

553559
worker.unref();
554560

555-
let attempts = 0;
556-
return new Promise<{ worker: ChildProcess; socket: Socket }>(
557-
(resolve, reject) => {
558-
const id = setInterval(async () => {
559-
const socket = await isServerAvailable(ipcPath);
560-
if (socket) {
561-
socket.unref();
562-
clearInterval(id);
563-
logger.verbose(
564-
`[isolated-plugin] connected to worker for "${name}" (pid: ${worker.pid}) after ${attempts} attempt(s)`
565-
);
566-
resolve({ worker, socket });
567-
} else if (attempts > 10000) {
568-
clearInterval(id);
569-
reject(new Error(`Failed to start plugin worker for plugin ${name}`));
570-
} else {
571-
attempts++;
572-
}
573-
}, 10);
574-
}
575-
).finally(() => {
561+
try {
562+
const socket = await connectToWorker(worker, ipcPath, name);
563+
return { worker, socket };
564+
} finally {
576565
performance.mark(`start-plugin-worker-end:${name}`);
577566
performance.measure(
578567
`start-plugin-worker:${name}`,
579568
`start-plugin-worker:${name}`,
580569
`start-plugin-worker-end:${name}`
581570
);
582-
});
571+
}
583572
}
584573

585-
function isServerAvailable(ipcPath: string): Promise<Socket | false> {
586-
return new Promise((resolve) => {
587-
try {
588-
const socket = connect(ipcPath, () => {
589-
resolve(socket);
590-
});
591-
socket.once('error', () => {
592-
resolve(false);
593-
});
594-
} catch {
595-
resolve(false);
574+
async function connectToWorker(
575+
worker: ChildProcess,
576+
ipcPath: string,
577+
name: string
578+
): Promise<Socket> {
579+
const abortController = new AbortController();
580+
let earlyExitError: Error | null = null;
581+
582+
// If the worker exits before we connect, abort polling immediately
583+
// rather than burning through attempts against a dead socket.
584+
worker.once('exit', (code) => {
585+
if (!abortController.signal.aborted) {
586+
earlyExitError = new Error(
587+
`Plugin worker for "${name}" exited with code ${code} before the connection was established.`
588+
);
589+
abortController.abort();
596590
}
597591
});
592+
593+
const socket = await waitForSocketConnection(ipcPath, {
594+
signal: abortController.signal,
595+
});
596+
597+
if (socket) {
598+
abortController.abort();
599+
socket.unref();
600+
logger.verbose(
601+
`[isolated-plugin] connected to worker for "${name}" (pid: ${worker.pid})`
602+
);
603+
return socket;
604+
}
605+
606+
if (earlyExitError) {
607+
throw earlyExitError;
608+
}
609+
throw new Error(`Failed to start plugin worker for plugin ${name}`);
598610
}
599611

600612
function getTypeName(u: unknown): string {

packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@ let plugin: LoadedNxPlugin;
3636
const socketPath = process.argv[2];
3737
const expectedPluginName = process.argv[3];
3838

39+
const CONNECT_TIMEOUT_MS = 30_000;
40+
3941
let connectErrorTimeout = setErrorTimeout(
40-
5000,
41-
`The plugin worker for ${expectedPluginName} is exiting as it was not connected to within 5 seconds. ` +
42+
CONNECT_TIMEOUT_MS,
43+
`The plugin worker for ${expectedPluginName} is exiting as it was not connected to within ${CONNECT_TIMEOUT_MS / 1000} seconds. ` +
4244
'Plugin workers expect to receive a socket connection from the parent process shortly after being started. ' +
4345
'If you are seeing this issue, please report it to the Nx team at https://github.com/nrwl/nx/issues.'
4446
);
4547

4648
const server = createServer((socket) => {
4749
connectErrorTimeout?.clear();
50+
4851
logger.verbose(
4952
`[plugin-worker] "${expectedPluginName}" (pid: ${process.pid}) connected`
5053
);
@@ -177,20 +180,13 @@ const server = createServer((socket) => {
177180
})
178181
);
179182

180-
// There should only ever be one host -> worker connection
181-
// since the worker is spawned per host process. As such,
182-
// we can safely close the worker when the host disconnects.
183+
// When the host disconnects, clean up and exit.
183184
socket.on('end', () => {
184-
// Destroys the socket once it's fully closed.
185185
socket.destroySoon();
186-
// Stops accepting new connections, but existing connections are
187-
// not closed immediately.
188-
server.close(() => {
189-
try {
190-
unlinkSync(socketPath);
191-
} catch (e) {}
192-
process.exit(0);
193-
});
186+
try {
187+
unlinkSync(socketPath);
188+
} catch (e) {}
189+
process.exit(0);
194190
});
195191
});
196192

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { Socket, connect } from 'net';
2+
3+
const DEFAULT_DELAY_MS = 10;
4+
const DEFAULT_MAX_ATTEMPTS = 10_000;
5+
6+
/**
7+
* Polls an IPC socket path until a connection succeeds or the attempt
8+
* limit / abort signal is reached.
9+
*
10+
* @param socketPath - A fixed path string, or a function that resolves
11+
* the path on each attempt (useful when the server hasn't written its
12+
* socket file yet).
13+
* @returns The connected socket, or `null` if polling was exhausted or aborted.
14+
*/
15+
export async function waitForSocketConnection(
16+
socketPath: string | (() => string | null),
17+
options?: {
18+
signal?: AbortSignal;
19+
maxAttempts?: number;
20+
delayMs?: number;
21+
}
22+
): Promise<Socket | null> {
23+
const maxAttempts = options?.maxAttempts ?? DEFAULT_MAX_ATTEMPTS;
24+
const delayMs = options?.delayMs ?? DEFAULT_DELAY_MS;
25+
const signal = options?.signal;
26+
27+
let attempts = 0;
28+
while (attempts < maxAttempts) {
29+
if (signal?.aborted) return null;
30+
await new Promise((r) => setTimeout(r, delayMs));
31+
if (signal?.aborted) return null;
32+
33+
const path = typeof socketPath === 'function' ? socketPath() : socketPath;
34+
if (path) {
35+
const socket = await tryConnect(path);
36+
if (socket) {
37+
return socket;
38+
}
39+
}
40+
attempts++;
41+
}
42+
return null;
43+
}
44+
45+
function tryConnect(socketPath: string): Promise<Socket | null> {
46+
return new Promise((resolve) => {
47+
try {
48+
const socket = connect(socketPath, () => {
49+
resolve(socket);
50+
});
51+
socket.once('error', () => {
52+
socket.destroy();
53+
resolve(null);
54+
});
55+
} catch {
56+
resolve(null);
57+
}
58+
});
59+
}

0 commit comments

Comments
 (0)