Skip to content

Commit 3a264f4

Browse files
committed
Revert "fix(core): prevent phantom connections and dead polling in plugin workers (#34823)"
This reverts commit 1a8438f.
1 parent 6e96dab commit 3a264f4

4 files changed

Lines changed: 82 additions & 162 deletions

File tree

packages/nx/src/daemon/client/client.ts

Lines changed: 33 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ import {
2828
import { preventRecursionInGraphConstruction } from '../../project-graph/project-graph';
2929
import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration/source-maps';
3030
import { isJsonMessage } from '../../utils/consume-messages-from-socket';
31-
import { waitForSocketConnection } from '../../utils/wait-for-socket-connection';
3231
import { DelayedSpinner } from '../../utils/delayed-spinner';
33-
import { handleImport } from '../../utils/handle-import';
3432
import { isCI } from '../../utils/is-ci';
3533
import { isSandbox } from '../../utils/is-sandbox';
3634
import { output } from '../../utils/output';
@@ -43,13 +41,6 @@ import { workspaceRoot } from '../../utils/workspace-root';
4341
import { getDaemonProcessIdSync, readDaemonProcessJsonCache } from '../cache';
4442
import { isNxVersionMismatch } from '../is-nx-version-mismatch';
4543
import { clientLogger } from '../logger';
46-
import {
47-
type ConfigureAiAgentsStatusResponse,
48-
GET_CONFIGURE_AI_AGENTS_STATUS,
49-
type HandleGetConfigureAiAgentsStatusMessage,
50-
type HandleResetConfigureAiAgentsStatusMessage,
51-
RESET_CONFIGURE_AI_AGENTS_STATUS,
52-
} from '../message-types/configure-ai-agents';
5344
import {
5445
FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK,
5546
type HandleFlushSyncGeneratorChangesToDiskMessage,
@@ -92,6 +83,13 @@ import {
9283
SET_NX_CONSOLE_PREFERENCE_AND_INSTALL,
9384
type SetNxConsolePreferenceAndInstallResponse,
9485
} from '../message-types/nx-console';
86+
import {
87+
GET_CONFIGURE_AI_AGENTS_STATUS,
88+
RESET_CONFIGURE_AI_AGENTS_STATUS,
89+
type ConfigureAiAgentsStatusResponse,
90+
type HandleGetConfigureAiAgentsStatusMessage,
91+
type HandleResetConfigureAiAgentsStatusMessage,
92+
} from '../message-types/configure-ai-agents';
9593
import { REGISTER_PROJECT_GRAPH_LISTENER } from '../message-types/register-project-graph-listener';
9694
import {
9795
HandlePostTasksExecutionMessage,
@@ -1175,34 +1173,35 @@ export class DaemonClient {
11751173
private async waitForServerToBeAvailable(options: {
11761174
ignoreVersionMismatch: boolean;
11771175
}): Promise<boolean> {
1176+
let attempts = 0;
1177+
11781178
clientLogger.log(
11791179
`[Client] Waiting for server (max: ${WAIT_FOR_SERVER_CONFIG.maxAttempts} attempts, ${WAIT_FOR_SERVER_CONFIG.delayMs}ms interval)`
11801180
);
11811181

1182-
const socket = await waitForSocketConnection(
1183-
() => {
1184-
try {
1185-
return this.getSocketPath();
1186-
} catch (err) {
1187-
if (err instanceof VersionMismatchError) {
1188-
if (!options.ignoreVersionMismatch) {
1189-
throw err;
1190-
}
1182+
while (attempts < WAIT_FOR_SERVER_CONFIG.maxAttempts) {
1183+
await new Promise((resolve) =>
1184+
setTimeout(resolve, WAIT_FOR_SERVER_CONFIG.delayMs)
1185+
);
1186+
attempts++;
1187+
1188+
try {
1189+
if (await this.isServerAvailable()) {
1190+
clientLogger.log(
1191+
`[Client] Server available after ${attempts} attempts`
1192+
);
1193+
return true;
1194+
}
1195+
} catch (err) {
1196+
if (err instanceof VersionMismatchError) {
1197+
if (!options.ignoreVersionMismatch) {
1198+
throw err;
11911199
}
1192-
// Socket path not available yet — keep polling
1193-
return null;
1200+
// Keep waiting - old cache file may exist
1201+
} else {
1202+
throw err;
11941203
}
1195-
},
1196-
{
1197-
maxAttempts: WAIT_FOR_SERVER_CONFIG.maxAttempts,
1198-
delayMs: WAIT_FOR_SERVER_CONFIG.delayMs,
11991204
}
1200-
);
1201-
1202-
if (socket) {
1203-
socket.destroy();
1204-
clientLogger.log(`[Client] Server available`);
1205-
return true;
12061205
}
12071206

12081207
clientLogger.log(
@@ -1216,25 +1215,13 @@ export class DaemonClient {
12161215
force?: 'v8' | 'json'
12171216
): Promise<any> {
12181217
await this.startDaemonIfNecessary();
1219-
1220-
let keepAlive: NodeJS.Timeout;
1218+
// An open promise isn't enough to keep the event loop
1219+
// alive, so we set a timeout here and clear it when we hear
1220+
// back
1221+
const keepAlive = setTimeout(() => {}, 10 * 60 * 1000);
12211222
return new Promise((resolve, reject) => {
12221223
performance.mark('sendMessageToDaemon-start');
12231224

1224-
// An open promise isn't enough to keep the event loop
1225-
// alive, so we set a timeout here and clear it when we hear
1226-
// back. This **must** be longer than the message timeout used
1227-
// in the plugin isolation messages, or the daemon will timeout before
1228-
// a plugin worker would, and that can result in odd exit behavior.
1229-
keepAlive = setTimeout(
1230-
() => {
1231-
reject(
1232-
new Error('The daemon timed out while processing ' + message.type)
1233-
);
1234-
},
1235-
20 * 60 * 1000
1236-
);
1237-
12381225
this.currentMessage = message;
12391226
this.currentResolve = resolve;
12401227
this.currentReject = reject;

packages/nx/src/project-graph/plugins/isolation/isolated-plugin.ts

Lines changed: 35 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ChildProcess, spawn } from 'child_process';
2-
import { Socket } from 'net';
2+
import { Socket, connect } from 'net';
33
import { Readable, Writable } from 'stream';
44
import path = require('path');
55

@@ -9,7 +9,6 @@ import { getPluginOsSocketPath } from '../../../daemon/socket-utils';
99
import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket';
1010
import { getNxRequirePaths } from '../../../utils/installation-directory';
1111
import { logger } from '../../../utils/logger';
12-
import { waitForSocketConnection } from '../../../utils/wait-for-socket-connection';
1312
import type { RawProjectGraphDependency } from '../../project-graph-builder';
1413
import { LoadedNxPlugin } from '../loaded-nx-plugin';
1514
import type {
@@ -197,12 +196,7 @@ export class IsolatedPlugin implements LoadedNxPlugin {
197196

198197
if (!this._connectPromise) {
199198
logger.verbose(`[plugin-client] restarting worker for "${this.name}"`);
200-
this._connectPromise = this.spawnAndConnect().catch((err) => {
201-
// Clear the cached promise so subsequent calls can retry
202-
// instead of re-awaiting a permanently-rejected promise.
203-
this._connectPromise = null;
204-
throw err;
205-
});
199+
this._connectPromise = this.spawnAndConnect();
206200
}
207201

208202
await this._connectPromise;
@@ -558,55 +552,49 @@ async function startPluginWorker(name: string) {
558552

559553
worker.unref();
560554

561-
try {
562-
const socket = await connectToWorker(worker, ipcPath, name);
563-
return { worker, socket };
564-
} finally {
555+
let attempts = 0;
556+
return new Promise<{ worker: ChildProcess; socket: Socket }>(
557+
(resolve, reject) => {
558+
const id = setInterval(async () => {
559+
const socket = await isServerAvailable(ipcPath);
560+
if (socket) {
561+
socket.unref();
562+
clearInterval(id);
563+
logger.verbose(
564+
`[isolated-plugin] connected to worker for "${name}" (pid: ${worker.pid}) after ${attempts} attempt(s)`
565+
);
566+
resolve({ worker, socket });
567+
} else if (attempts > 10000) {
568+
clearInterval(id);
569+
reject(new Error(`Failed to start plugin worker for plugin ${name}`));
570+
} else {
571+
attempts++;
572+
}
573+
}, 10);
574+
}
575+
).finally(() => {
565576
performance.mark(`start-plugin-worker-end:${name}`);
566577
performance.measure(
567578
`start-plugin-worker:${name}`,
568579
`start-plugin-worker:${name}`,
569580
`start-plugin-worker-end:${name}`
570581
);
571-
}
582+
});
572583
}
573584

574-
async function connectToWorker(
575-
worker: ChildProcess,
576-
ipcPath: string,
577-
name: string
578-
): Promise<Socket> {
579-
const abortController = new AbortController();
580-
let earlyExitError: Error | null = null;
581-
582-
// If the worker exits before we connect, abort polling immediately
583-
// rather than burning through attempts against a dead socket.
584-
worker.once('exit', (code) => {
585-
if (!abortController.signal.aborted) {
586-
earlyExitError = new Error(
587-
`Plugin worker for "${name}" exited with code ${code} before the connection was established.`
588-
);
589-
abortController.abort();
585+
function isServerAvailable(ipcPath: string): Promise<Socket | false> {
586+
return new Promise((resolve) => {
587+
try {
588+
const socket = connect(ipcPath, () => {
589+
resolve(socket);
590+
});
591+
socket.once('error', () => {
592+
resolve(false);
593+
});
594+
} catch {
595+
resolve(false);
590596
}
591597
});
592-
593-
const socket = await waitForSocketConnection(ipcPath, {
594-
signal: abortController.signal,
595-
});
596-
597-
if (socket) {
598-
abortController.abort();
599-
socket.unref();
600-
logger.verbose(
601-
`[isolated-plugin] connected to worker for "${name}" (pid: ${worker.pid})`
602-
);
603-
return socket;
604-
}
605-
606-
if (earlyExitError) {
607-
throw earlyExitError;
608-
}
609-
throw new Error(`Failed to start plugin worker for plugin ${name}`);
610598
}
611599

612600
function getTypeName(u: unknown): string {

packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,15 @@ let plugin: LoadedNxPlugin;
3636
const socketPath = process.argv[2];
3737
const expectedPluginName = process.argv[3];
3838

39-
const CONNECT_TIMEOUT_MS = 30_000;
40-
4139
let connectErrorTimeout = setErrorTimeout(
42-
CONNECT_TIMEOUT_MS,
43-
`The plugin worker for ${expectedPluginName} is exiting as it was not connected to within ${CONNECT_TIMEOUT_MS / 1000} seconds. ` +
40+
5000,
41+
`The plugin worker for ${expectedPluginName} is exiting as it was not connected to within 5 seconds. ` +
4442
'Plugin workers expect to receive a socket connection from the parent process shortly after being started. ' +
4543
'If you are seeing this issue, please report it to the Nx team at https://github.com/nrwl/nx/issues.'
4644
);
4745

4846
const server = createServer((socket) => {
4947
connectErrorTimeout?.clear();
50-
5148
logger.verbose(
5249
`[plugin-worker] "${expectedPluginName}" (pid: ${process.pid}) connected`
5350
);
@@ -180,13 +177,20 @@ const server = createServer((socket) => {
180177
})
181178
);
182179

183-
// When the host disconnects, clean up and exit.
180+
// There should only ever be one host -> worker connection
181+
// since the worker is spawned per host process. As such,
182+
// we can safely close the worker when the host disconnects.
184183
socket.on('end', () => {
184+
// Destroys the socket once it's fully closed.
185185
socket.destroySoon();
186-
try {
187-
unlinkSync(socketPath);
188-
} catch (e) {}
189-
process.exit(0);
186+
// Stops accepting new connections, but existing connections are
187+
// not closed immediately.
188+
server.close(() => {
189+
try {
190+
unlinkSync(socketPath);
191+
} catch (e) {}
192+
process.exit(0);
193+
});
190194
});
191195
});
192196

packages/nx/src/utils/wait-for-socket-connection.ts

Lines changed: 0 additions & 59 deletions
This file was deleted.

0 commit comments

Comments
 (0)