Skip to content

Commit cf79f48

Browse files
fix(core): trim memory usage associated with io-tracing service (#34866)
## Current Behavior We accumulate `inputs`/`outputs`/`pids` and store them even if no subscriber is subscribed, and then they hang around to notify late subscribers that may never come ## Expected Behavior `inputs`/`outputs`/`pids` are only sent to current subscribers ## AI Summary This pull request optimizes how task input notifications are handled in Nx by ensuring that expensive input collection and storage only occur when there are active subscribers. This change prevents unnecessary memory growth in long-lived processes, such as the Nx daemon, and improves performance by avoiding redundant work. Notification and input collection optimization: * Added a `hasTaskInputSubscribers()` method to the `TaskIOService` class, allowing the hasher to check if any input subscribers are registered before collecting and notifying task inputs. * Updated all hashing functions in `hash-task.ts` to only notify task inputs if there are active subscribers, reducing unnecessary work and memory usage. [[1]](diffhunk://#diff-d061dc5551f692abad009b8284c719466cff2f0d3d19bd52e81b7921a9e543d6R57-R63) [[2]](diffhunk://#diff-d061dc5551f692abad009b8284c719466cff2f0d3d19bd52e81b7921a9e543d6L114-R117) [[3]](diffhunk://#diff-d061dc5551f692abad009b8284c719466cff2f0d3d19bd52e81b7921a9e543d6R172) [[4]](diffhunk://#diff-d061dc5551f692abad009b8284c719466cff2f0d3d19bd52e81b7921a9e543d6L185-R188) [[5]](diffhunk://#diff-d061dc5551f692abad009b8284c719466cff2f0d3d19bd52e81b7921a9e543d6L201-R204) * Modified the native task hasher implementation (`native-task-hasher-impl.ts` and Rust code) to conditionally collect and return input data only when requested, minimizing overhead and memory allocation. [[1]](diffhunk://#diff-ddf992f97afbcd8b2206b8d1faab7e7f9571ecfe50e2ab8e3da104001f39f0c3L73-R80) [[2]](diffhunk://#diff-ddf992f97afbcd8b2206b8d1faab7e7f9571ecfe50e2ab8e3da104001f39f0c3L88-R101) [[3]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bR194-R200) [[4]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL226-R233) [[5]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bR256) [[6]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL262-R273) [[7]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL288-R299) [[8]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bR330-R352) [[9]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL348-R383) [[10]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL368-R407) [[11]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bR432) [[12]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL417-R475) [[13]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL452-R489) [[14]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bL461-R503) [[15]](diffhunk://#diff-d81ca7875513ef544822c24671104ce52cd09409a41a796768aba507d87b3c0bR518) Code cleanup and refactoring: * Removed unused state and redundant code from `TaskIOService` related to storing task-to-PID, task-to-input, and task-to-output mappings, as well as unnecessary graph references and late subscriber emission logic. [[1]](diffhunk://#diff-fc051b28c1ac25926033d157ec2278cc1cf2fcf7e16626eeedaf876c29fc9d62R42-R66) [[2]](diffhunk://#diff-fc051b28c1ac25926033d157ec2278cc1cf2fcf7e16626eeedaf876c29fc9d62L91-L95) [[3]](diffhunk://#diff-fc051b28c1ac25926033d157ec2278cc1cf2fcf7e16626eeedaf876c29fc9d62L104-L111) * Updated imports and cleaned up constructor logic in `task-io-service.ts` and `native-task-hasher-impl.ts` for clarity and maintainability. [[1]](diffhunk://#diff-ddf992f97afbcd8b2206b8d1faab7e7f9571ecfe50e2ab8e3da104001f39f0c3R16) [[2]](diffhunk://#diff-fc051b28c1ac25926033d157ec2278cc1cf2fcf7e16626eeedaf876c29fc9d62L1-L2) These changes collectively make task input tracking more efficient and robust, especially in scenarios where Nx runs as a daemon or in environments with no listeners for input notifications. --------- Co-authored-by: nx-cloud[bot] <71083854+nx-cloud[bot]@users.noreply.github.com> Co-authored-by: AgentEnder <[email protected]>
1 parent 61a8e33 commit cf79f48

6 files changed

Lines changed: 133 additions & 106 deletions

File tree

packages/nx/src/hasher/hash-task.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ export async function hashTasksThatDoNotDependOnOutputsOfOtherTasks(
5454

5555
const hashes = await hasher.hashTasks(tasksToHash, taskGraph, process.env);
5656
const ioService = getTaskIOService();
57+
const hasInputSubscribers = ioService.hasTaskInputSubscribers();
5758
for (let i = 0; i < tasksToHash.length; i++) {
5859
tasksToHash[i].hash = hashes[i].value;
5960
tasksToHash[i].hashDetails = hashes[i].details;
6061

6162
// Notify TaskIOService of hash inputs
62-
if (hashes[i].inputs) {
63+
if (hasInputSubscribers && hashes[i].inputs) {
6364
ioService.notifyTaskInputs(tasksToHash[i].id, hashes[i].inputs);
6465
}
6566
}
@@ -111,8 +112,9 @@ export async function hashTask(
111112
task.hashDetails = details;
112113

113114
// Notify TaskIOService of hash inputs
114-
if (inputs) {
115-
getTaskIOService().notifyTaskInputs(task.id, inputs);
115+
const ioService = getTaskIOService();
116+
if (ioService.hasTaskInputSubscribers() && inputs) {
117+
ioService.notifyTaskInputs(task.id, inputs);
116118
}
117119

118120
if (taskDetails?.recordTaskDetails) {
@@ -167,6 +169,7 @@ export async function hashTasks(
167169

168170
// Hash tasks with custom hashers individually
169171
const ioService = getTaskIOService();
172+
const hasInputSubscribers = ioService.hasTaskInputSubscribers();
170173
const customHasherPromises = tasksWithCustomHashers.map(async (task) => {
171174
const customHasher = getCustomHasher(task, projectGraph);
172175
const { value, details, inputs } = await customHasher(task, {
@@ -182,7 +185,7 @@ export async function hashTasks(
182185
task.hashDetails = details;
183186

184187
// Notify TaskIOService of hash inputs
185-
if (inputs) {
188+
if (hasInputSubscribers && inputs) {
186189
ioService.notifyTaskInputs(task.id, inputs);
187190
}
188191
});
@@ -198,7 +201,7 @@ export async function hashTasks(
198201
tasksWithoutCustomHashers[i].hashDetails = hashes[i].details;
199202

200203
// Notify TaskIOService of hash inputs
201-
if (hashes[i].inputs) {
204+
if (hasInputSubscribers && hashes[i].inputs) {
202205
ioService.notifyTaskInputs(
203206
tasksWithoutCustomHashers[i].id,
204207
hashes[i].inputs

packages/nx/src/hasher/native-task-hasher-impl.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { NxJsonConfiguration } from '../config/nx-json';
44
import { createTaskGraph } from '../tasks-runner/create-task-graph';
55
import { NativeTaskHasherImpl } from './native-task-hasher-impl';
66
import { ProjectGraphBuilder } from '../project-graph/project-graph-builder';
7+
import { getTaskIOService } from '../tasks-runner/task-io-service';
78

89
// Helper to normalize hash results for deterministic snapshot comparison
910
// (parallel processing may produce inputs in arbitrary order)
@@ -55,6 +56,10 @@ describe('native task hasher', () => {
5556
};
5657

5758
beforeEach(async () => {
59+
// Register a subscriber so that hasTaskInputSubscribers() returns true,
60+
// enabling input collection in the hasher during tests.
61+
getTaskIOService().subscribeToTaskInputs(() => {});
62+
5863
tempFs = new TempFs('NativeTaskHasher');
5964
await tempFs.createFiles({
6065
'libs/parent/src/index.ts': 'parent-content',

packages/nx/src/hasher/native-task-hasher-impl.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
} from '../native';
1414
import { transformProjectGraphForRust } from '../native/transform-objects';
1515
import { getRootTsConfigPath } from '../plugins/js/utils/typescript';
16+
import { getTaskIOService } from '../tasks-runner/task-io-service';
1617
import { readJsonFile } from '../utils/fileutils';
1718
import { PartialHash, TaskHasherImpl } from './task-hasher';
1819

@@ -70,7 +71,13 @@ export class NativeTaskHasherImpl implements TaskHasherImpl {
7071
cwd?: string
7172
): Promise<PartialHash> {
7273
const plans = this.planner.getPlansReference([task.id], taskGraph);
73-
const hashes = this.hasher.hashPlans(plans, env, cwd ?? process.cwd());
74+
const collectInputs = getTaskIOService().hasTaskInputSubscribers();
75+
const hashes = this.hasher.hashPlans(
76+
plans,
77+
env,
78+
cwd ?? process.cwd(),
79+
collectInputs
80+
);
7481

7582
return hashes[task.id];
7683
}
@@ -85,7 +92,13 @@ export class NativeTaskHasherImpl implements TaskHasherImpl {
8592
tasks.map((t) => t.id),
8693
taskGraph
8794
);
88-
const hashes = this.hasher.hashPlans(plans, env, cwd ?? process.cwd());
95+
const collectInputs = getTaskIOService().hasTaskInputSubscribers();
96+
const hashes = this.hasher.hashPlans(
97+
plans,
98+
env,
99+
cwd ?? process.cwd(),
100+
collectInputs
101+
);
89102
return tasks.map((t) => hashes[t.id]);
90103
}
91104
}

packages/nx/src/native/index.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ export declare class TaskDetails {
168168

169169
export declare class TaskHasher {
170170
constructor(workspaceRoot: string, projectGraph: ExternalObject<ProjectGraph>, projectFileMap: ExternalObject<ProjectFiles>, allWorkspaceFiles: ExternalObject<Array<FileData>>, tsConfig: Buffer, tsConfigPaths: Record<string, Array<string>>, rootTsconfigPath?: string | undefined | null, options?: HasherOptions | undefined | null)
171-
hashPlans(hashPlans: ExternalObject<Record<string, Array<HashInstruction>>>, jsEnv: Record<string, string>, cwd: string): NapiDashMap<string, HashDetails>
171+
hashPlans(hashPlans: ExternalObject<Record<string, Array<HashInstruction>>>, jsEnv: Record<string, string>, cwd: string, collectTaskInputs?: boolean | undefined | null): NapiDashMap<string, HashDetails>
172172
}
173173

174174
export declare class Watcher {

packages/nx/src/native/tasks/task_hasher.rs

Lines changed: 91 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,13 @@ impl TaskHasher {
191191
hash_plans: &External<HashMap<String, Vec<HashInstruction>>>,
192192
js_env: HashMap<String, String>,
193193
cwd: String,
194+
collect_task_inputs: Option<bool>,
194195
) -> anyhow::Result<NapiDashMap<String, HashDetails>> {
195196
// Create a fresh task output cache for this invocation
196197
// This ensures no stale caches across multiple CLI commands when the daemon holds
197198
// the TaskHasher instance
198199
let task_output_cache = DashMap::new();
200+
let should_collect_inputs = collect_task_inputs.unwrap_or(false);
199201

200202
let function_start = std::time::Instant::now();
201203

@@ -223,7 +225,12 @@ impl TaskHasher {
223225

224226
// Use separate maps: one for hash details, one for input accumulation with HashSet
225227
let hashes: NapiDashMap<String, HashDetails> = NapiDashMap::new();
226-
let inputs_accum: DashMap<String, HashInputsBuilder> = DashMap::new();
228+
// Only allocate inputs accumulator when someone is listening for inputs
229+
let inputs_accum: Option<DashMap<String, HashInputsBuilder>> = if should_collect_inputs {
230+
Some(DashMap::new())
231+
} else {
232+
None
233+
};
227234
let cwd_path = std::path::Path::new(&cwd);
228235

229236
hash_plans
@@ -246,6 +253,7 @@ impl TaskHasher {
246253
selectively_hash_tsconfig,
247254
task_output_cache: &task_output_cache,
248255
cwd: cwd_path,
256+
collect_inputs: should_collect_inputs,
249257
},
250258
)?;
251259

@@ -259,11 +267,10 @@ impl TaskHasher {
259267
});
260268
entry.details.insert(instruction_key, hash_value);
261269

262-
// Accumulate inputs using HashSet for O(1) deduplication
263-
inputs_accum
264-
.entry(task_id.to_string())
265-
.or_default()
266-
.extend(inputs);
270+
// Accumulate inputs using HashSet for O(1) deduplication (only when collecting)
271+
if let Some(ref accum) = inputs_accum {
272+
accum.entry(task_id.to_string()).or_default().extend(inputs);
273+
}
267274

268275
Ok::<(), anyhow::Error>(())
269276
})?;
@@ -285,8 +292,10 @@ impl TaskHasher {
285292
hash_details.value = hash;
286293
});
287294
// Convert accumulated HashInputsBuilder to HashInputs (sorted Vecs)
288-
if let Some((_, builder)) = inputs_accum.remove(hash_id) {
289-
hash_details.inputs = builder.into();
295+
if let Some(ref accum) = inputs_accum {
296+
if let Some((_, builder)) = accum.remove(hash_id) {
297+
hash_details.inputs = builder.into();
298+
}
290299
}
291300
});
292301

@@ -318,24 +327,29 @@ impl TaskHasher {
318327
selectively_hash_tsconfig,
319328
task_output_cache,
320329
cwd,
330+
collect_inputs,
321331
}: HashInstructionArgs,
322332
) -> anyhow::Result<(String, String, HashInputsBuilder)> {
323333
let now = std::time::Instant::now();
324334
let span = trace_span!("hashing", task_id).entered();
335+
let empty = HashInputsBuilder::default();
325336
let (hash, inputs) = match instruction {
326337
HashInstruction::WorkspaceFileSet(workspace_file_set) => {
327338
let result = hash_workspace_files_with_inputs(
328339
workspace_file_set,
329340
&self.all_workspace_files,
330341
)?;
331342
trace!(parent: &span, "hash_workspace_files: {:?}", now.elapsed());
332-
(
333-
result.hash,
343+
let inputs = if collect_inputs {
334344
HashInputsBuilder {
335345
files: result.files.into_iter().collect(),
336346
..Default::default()
337-
},
338-
)
347+
}
348+
} else {
349+
drop(result.files);
350+
empty
351+
};
352+
(result.hash, inputs)
339353
}
340354
HashInstruction::Runtime(runtime) => {
341355
let hashed_runtime = hash_runtime(
@@ -345,18 +359,28 @@ impl TaskHasher {
345359
Arc::clone(&self.runtime_cache),
346360
)?;
347361
trace!(parent: &span, "hash_runtime: {:?}", now.elapsed());
348-
(hashed_runtime, instruction.into())
362+
let inputs = if collect_inputs {
363+
instruction.into()
364+
} else {
365+
empty
366+
};
367+
(hashed_runtime, inputs)
349368
}
350369
HashInstruction::Environment(env) => {
351370
let hashed_env = hash_env(env, js_env);
352371
trace!(parent: &span, "hash_env: {:?}", now.elapsed());
353-
(hashed_env, instruction.into())
372+
let inputs = if collect_inputs {
373+
instruction.into()
374+
} else {
375+
empty
376+
};
377+
(hashed_env, inputs)
354378
}
355379
HashInstruction::Cwd(mode) => {
356380
let workspace_root = std::path::Path::new(&self.workspace_root);
357381
let hashed_cwd = hash_cwd(workspace_root, cwd, mode.clone());
358382
trace!(parent: &span, "hash_cwd: {:?}", now.elapsed());
359-
(hashed_cwd, instruction.into())
383+
(hashed_cwd, empty)
360384
}
361385
HashInstruction::ProjectFileSet(project_name, file_sets) => {
362386
let result = hash_project_files_with_inputs(
@@ -365,19 +389,22 @@ impl TaskHasher {
365389
&self.project_file_map,
366390
)?;
367391
trace!(parent: &span, "hash_project_files: {:?}", now.elapsed());
368-
(
369-
result.hash,
392+
let inputs = if collect_inputs {
370393
HashInputsBuilder {
371394
files: result.files.into_iter().collect(),
372395
..Default::default()
373-
},
374-
)
396+
}
397+
} else {
398+
drop(result.files);
399+
empty
400+
};
401+
(result.hash, inputs)
375402
}
376403
HashInstruction::ProjectConfiguration(project_name) => {
377404
let hashed_project_config =
378405
hash_project_config(project_name, &self.project_graph.nodes)?;
379406
trace!(parent: &span, "hash_project_config: {:?}", now.elapsed());
380-
(hashed_project_config, instruction.into())
407+
(hashed_project_config, empty)
381408
}
382409
HashInstruction::TsConfiguration(project_name) => {
383410
let ts_config_hash = if !selectively_hash_tsconfig {
@@ -402,45 +429,50 @@ impl TaskHasher {
402429
// the unwrap_or is for the case where typescript is not installed
403430
.unwrap_or(ts_config_hash);
404431

405-
let relative_ts_path = if let Some(root_path) = &self.root_tsconfig_path {
406-
Some(
407-
Path::new(root_path)
408-
.strip_prefix(&self.workspace_root)
409-
.unwrap_or(Path::new(root_path))
410-
.to_string_lossy()
411-
.to_string(),
412-
)
413-
} else {
414-
None
415-
};
432+
let inputs = if collect_inputs {
433+
let relative_ts_path = if let Some(root_path) = &self.root_tsconfig_path {
434+
Some(
435+
Path::new(root_path)
436+
.strip_prefix(&self.workspace_root)
437+
.unwrap_or(Path::new(root_path))
438+
.to_string_lossy()
439+
.to_string(),
440+
)
441+
} else {
442+
None
443+
};
444+
445+
let files = if let Some(rel_path) = relative_ts_path {
446+
HashSet::from([rel_path])
447+
} else {
448+
HashSet::new()
449+
};
416450

417-
// Convert absolute tsconfig path to relative path
418-
let files = if let Some(rel_path) = relative_ts_path {
419-
HashSet::from([rel_path])
451+
HashInputsBuilder {
452+
files,
453+
..Default::default()
454+
}
420455
} else {
421-
HashSet::new()
456+
empty
422457
};
423458

424459
trace!(parent: &span, "hash_tsconfig: {:?}", now.elapsed());
425-
(
426-
ts_hash,
427-
HashInputsBuilder {
428-
files,
429-
..Default::default()
430-
},
431-
)
460+
(ts_hash, inputs)
432461
}
433462
HashInstruction::TaskOutput(glob, outputs) => {
434463
let result =
435464
hash_task_output(&self.workspace_root, glob, outputs, task_output_cache)?;
436465
trace!(parent: &span, "hash_task_output: {:?}", now.elapsed());
437-
(
438-
result.hash,
466+
let inputs = if collect_inputs {
439467
HashInputsBuilder {
440468
dep_outputs: result.files.into_iter().collect(),
441469
..Default::default()
442-
},
443-
)
470+
}
471+
} else {
472+
drop(result.files);
473+
empty
474+
};
475+
(result.hash, inputs)
444476
}
445477
HashInstruction::External(external) => {
446478
let hashed_external = hash_external(
@@ -449,7 +481,12 @@ impl TaskHasher {
449481
Arc::clone(&self.external_cache),
450482
)?;
451483
trace!(parent: &span, "hash_external: {:?}", now.elapsed());
452-
(hashed_external, instruction.into())
484+
let inputs = if collect_inputs {
485+
instruction.into()
486+
} else {
487+
empty
488+
};
489+
(hashed_external, inputs)
453490
}
454491
HashInstruction::AllExternalDependencies => {
455492
let hashed_all_externals = hash_all_externals(
@@ -458,7 +495,12 @@ impl TaskHasher {
458495
Arc::clone(&self.external_cache),
459496
)?;
460497
trace!(parent: &span, "hash_all_externals: {:?}", now.elapsed());
461-
(hashed_all_externals, instruction.into())
498+
let inputs = if collect_inputs {
499+
instruction.into()
500+
} else {
501+
empty
502+
};
503+
(hashed_all_externals, inputs)
462504
}
463505
};
464506
Ok((instruction.to_string(), hash, inputs))
@@ -473,4 +515,5 @@ struct HashInstructionArgs<'a> {
473515
selectively_hash_tsconfig: bool,
474516
task_output_cache: &'a DashMap<String, CachedTaskOutput>,
475517
cwd: &'a std::path::Path,
518+
collect_inputs: bool,
476519
}

0 commit comments

Comments
 (0)