Skip to content

Commit 8d6406d

Browse files
authored
perf: persistent cache recovery use consumer mode (#9019)
1 parent 2cb67d1 commit 8d6406d

5 files changed

Lines changed: 65 additions & 73 deletions

File tree

crates/rspack_core/src/cache/persistent/occasion/make/module_graph.rs

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
cache::persistent::cacheable_context::CacheableContext, AsyncDependenciesBlock,
1616
AsyncDependenciesBlockIdentifier, BoxDependency, BoxModule, BuildDependency, DependencyParents,
1717
ExportInfoData, ExportsInfoData, ModuleGraph, ModuleGraphConnection, ModuleGraphModule,
18-
ModuleGraphPartial,
18+
ModuleGraphPartial, RayonConsumer,
1919
};
2020

2121
const SCOPE: &str = "occasion_make_module_graph";
@@ -132,45 +132,45 @@ pub async fn recovery_module_graph(
132132
let mut need_check_dep = vec![];
133133
let mut partial = ModuleGraphPartial::default();
134134
let mut mg = ModuleGraph::new(vec![], Some(&mut partial));
135-
let nodes: Vec<_> = storage
135+
storage
136136
.load(SCOPE)
137137
.await?
138138
.into_par_iter()
139139
.map(|(_, v)| {
140140
from_bytes::<Node, CacheableContext>(&v, context)
141141
.expect("unexpected module graph deserialize failed")
142142
})
143-
.collect();
144-
for mut node in nodes {
145-
for (dep, parent_block) in node.dependencies {
146-
mg.set_parents(
147-
*dep.id(),
148-
DependencyParents {
149-
block: parent_block,
150-
module: node.module.identifier(),
151-
},
152-
);
153-
mg.add_dependency(dep);
154-
}
155-
for con in node.connections {
156-
need_check_dep.push((con.dependency_id, *con.module_identifier()));
157-
mg.cache_recovery_connection(con);
158-
}
159-
for block in node.blocks {
160-
mg.add_block(Box::new(block));
161-
}
162-
// recovery exports/export info
163-
let other_exports_info = ExportInfoData::new(None, None);
164-
let side_effects_only_info = ExportInfoData::new(Some("*side effects only*".into()), None);
165-
let exports_info = ExportsInfoData::new(other_exports_info.id(), side_effects_only_info.id());
166-
node.mgm.exports = exports_info.id();
167-
mg.set_exports_info(exports_info.id(), exports_info);
168-
mg.set_export_info(side_effects_only_info.id(), side_effects_only_info);
169-
mg.set_export_info(other_exports_info.id(), other_exports_info);
143+
.with_max_len(1)
144+
.consume(|mut node| {
145+
for (dep, parent_block) in node.dependencies {
146+
mg.set_parents(
147+
*dep.id(),
148+
DependencyParents {
149+
block: parent_block,
150+
module: node.module.identifier(),
151+
},
152+
);
153+
mg.add_dependency(dep);
154+
}
155+
for con in node.connections {
156+
need_check_dep.push((con.dependency_id, *con.module_identifier()));
157+
mg.cache_recovery_connection(con);
158+
}
159+
for block in node.blocks {
160+
mg.add_block(Box::new(block));
161+
}
162+
// recovery exports/export info
163+
let other_exports_info = ExportInfoData::new(None, None);
164+
let side_effects_only_info = ExportInfoData::new(Some("*side effects only*".into()), None);
165+
let exports_info = ExportsInfoData::new(other_exports_info.id(), side_effects_only_info.id());
166+
node.mgm.exports = exports_info.id();
167+
mg.set_exports_info(exports_info.id(), exports_info);
168+
mg.set_export_info(side_effects_only_info.id(), side_effects_only_info);
169+
mg.set_export_info(other_exports_info.id(), other_exports_info);
170170

171-
mg.add_module_graph_module(node.mgm);
172-
mg.add_module(node.module);
173-
}
171+
mg.add_module_graph_module(node.mgm);
172+
mg.add_module(node.module);
173+
});
174174
// recovery incoming connections
175175
for (con_id, module_identifier) in &need_check_dep {
176176
if let Some(mgm) = mg.module_graph_module_by_identifier_mut(module_identifier) {

crates/rspack_core/src/cache/persistent/snapshot/mod.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use rustc_hash::FxHashSet as HashSet;
1313
pub use self::option::{PathMatcher, SnapshotOptions};
1414
use self::strategy::{Strategy, StrategyHelper, ValidateResult};
1515
use super::storage::Storage;
16+
use crate::FutureConsumer;
1617

1718
const SCOPE: &str = "snapshot";
1819

@@ -87,35 +88,36 @@ impl Snapshot {
8788

8889
#[tracing::instrument("Cache::Snapshot::calc_modified_path", skip_all)]
8990
pub async fn calc_modified_paths(&self) -> Result<(HashSet<ArcPath>, HashSet<ArcPath>)> {
90-
let helper = StrategyHelper::new(self.fs.clone());
91+
let mut modified_path = HashSet::default();
92+
let mut deleted_path = HashSet::default();
93+
let helper = Arc::new(StrategyHelper::new(self.fs.clone()));
9194

92-
let results = self
95+
self
9396
.storage
9497
.load(SCOPE)
9598
.await?
96-
.iter()
97-
.map(|(key, value)| async {
98-
let path: ArcPath = Path::new(&*String::from_utf8_lossy(key)).into();
99-
let strategy: Strategy =
100-
from_bytes::<Strategy, ()>(value, &()).expect("should from bytes success");
101-
let validate = helper.validate(&path, &strategy).await;
102-
(path, validate)
99+
.into_iter()
100+
.map(|(key, value)| {
101+
let helper = helper.clone();
102+
async move {
103+
let path: ArcPath = Path::new(&*String::from_utf8_lossy(&key)).into();
104+
let strategy: Strategy =
105+
from_bytes::<Strategy, ()>(&value, &()).expect("should from bytes success");
106+
let validate = helper.validate(&path, &strategy).await;
107+
(path, validate)
108+
}
103109
})
104-
.collect::<FuturesResults<_>>();
105-
106-
let mut modified_path = HashSet::default();
107-
let mut deleted_path = HashSet::default();
108-
for (path, validate) in results.into_inner() {
109-
match validate {
110+
.fut_consume(|(path, validate)| match validate {
110111
ValidateResult::Modified => {
111112
modified_path.insert(path);
112113
}
113114
ValidateResult::Deleted => {
114115
deleted_path.insert(path);
115116
}
116117
ValidateResult::NoChanged => {}
117-
}
118-
}
118+
})
119+
.await;
120+
119121
Ok((modified_path, deleted_path))
120122
}
121123
}

crates/rspack_core/src/utils/iterator_consumer/future.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ pub trait FutureConsumer {
1010
/// Use to immediately consume the data produced by the future in the iterator
1111
/// without waiting for all the data to be processed.
1212
/// The closures runs in the current thread.
13-
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
14-
where
15-
F: Future + Send;
13+
async fn fut_consume(self, func: impl FnMut(Self::Item) + Send);
1614
}
1715

1816
#[async_trait::async_trait]
@@ -23,10 +21,7 @@ where
2321
Fut::Output: Send + 'static,
2422
{
2523
type Item = Fut::Output;
26-
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
27-
where
28-
F: Future + Send,
29-
{
24+
async fn fut_consume(self, mut func: impl FnMut(Self::Item) + Send) {
3025
let mut rx = {
3126
// Create the channel in the closure to ensure all sender are dropped when iterator completes
3227
// This ensures that the receiver does not get stuck in an infinite loop.
@@ -43,7 +38,7 @@ where
4338
};
4439

4540
while let Some(data) = rx.recv().await {
46-
func(data).await;
41+
func(data);
4742
}
4843
}
4944
}
@@ -61,7 +56,7 @@ mod test {
6156
async fn available() {
6257
(0..10)
6358
.map(|item| async move { item * 2 })
64-
.fut_consume(|item| async move { assert_eq!(item % 2, 0) })
59+
.fut_consume(|item| assert_eq!(item % 2, 0))
6560
.await;
6661
}
6762

@@ -74,8 +69,8 @@ mod test {
7469
sleep(Duration::from_millis(item)).await;
7570
item
7671
})
77-
.fut_consume(|_| async move {
78-
sleep(Duration::from_millis(20)).await;
72+
.fut_consume(|_| {
73+
std::thread::sleep(std::time::Duration::from_millis(20));
7974
})
8075
.await;
8176
let time1 = SystemTime::now().duration_since(start).unwrap();

crates/rspack_core/src/utils/iterator_consumer/rayon.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub trait RayonConsumer {
88
/// Use to immediately consume the data produced by the rayon iterator
99
/// without waiting for all the data to be processed.
1010
/// The closures runs in the current thread.
11-
fn consume(self, func: impl Fn(Self::Item));
11+
fn consume(self, func: impl FnMut(Self::Item));
1212
}
1313

1414
impl<P, I> RayonConsumer for P
@@ -17,7 +17,7 @@ where
1717
I: Send,
1818
{
1919
type Item = I;
20-
fn consume(self, func: impl Fn(Self::Item)) {
20+
fn consume(self, mut func: impl FnMut(Self::Item)) {
2121
let (tx, rx) = channel::<Self::Item>();
2222
std::thread::scope(|s| {
2323
// move rx to s.spawn, otherwise rx.into_iter will never stop

crates/rspack_core/src/utils/iterator_consumer/rayon_fut.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ pub trait RayonFutureConsumer {
1313
/// Use to immediately consume the data produced by the future in the rayon iterator
1414
/// without waiting for all the data to be processed.
1515
/// The closures runs in the current thread.
16-
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
17-
where
18-
F: Future + Send;
16+
async fn fut_consume(self, func: impl FnMut(Self::Item) + Send);
1917
}
2018

2119
#[async_trait::async_trait]
@@ -26,10 +24,7 @@ where
2624
Fut::Output: Send + 'static,
2725
{
2826
type Item = Fut::Output;
29-
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
30-
where
31-
F: Future + Send,
32-
{
27+
async fn fut_consume(self, mut func: impl FnMut(Self::Item) + Send) {
3328
let mut rx = {
3429
// Create the channel in the closure to ensure all sender are dropped when iterator completes
3530
// This ensures that the receiver does not get stuck in an infinite loop.
@@ -46,7 +41,7 @@ where
4641
};
4742

4843
while let Some(data) = rx.recv().await {
49-
func(data).await;
44+
func(data);
5045
}
5146
}
5247
}
@@ -66,7 +61,7 @@ mod test {
6661
(0..10)
6762
.into_par_iter()
6863
.map(|item| async move { item * 2 })
69-
.fut_consume(|item| async move { assert_eq!(item % 2, 0) })
64+
.fut_consume(|item| assert_eq!(item % 2, 0))
7065
.await;
7166
}
7267

@@ -79,8 +74,8 @@ mod test {
7974
sleep(Duration::from_millis(item)).await;
8075
item
8176
})
82-
.fut_consume(|_| async move {
83-
sleep(Duration::from_millis(20)).await;
77+
.fut_consume(|_| {
78+
std::thread::sleep(std::time::Duration::from_millis(20));
8479
})
8580
.await;
8681
let time1 = SystemTime::now().duration_since(start).unwrap();

0 commit comments

Comments
 (0)