Skip to content

Commit 0f920ce

Browse files
committed
feat(rust/dev): add DevEngine#close (#6376)
1 parent 5c94508 commit 0f920ce

File tree

4 files changed

+70
-11
lines changed

4 files changed

+70
-11
lines changed

crates/rolldown/examples/dev.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ async fn main() {
2828
)
2929
.unwrap();
3030
dev_engine.run().await.unwrap();
31-
dev_engine.wait_for_close().await;
31+
dev_engine.wait_for_build_driver_service_close().await.unwrap();
3232
}

crates/rolldown/src/dev/build_driver_service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::dev::{
1010
pub enum BuildMessage {
1111
WatchEvent(FileChangeResult),
1212
BuildFinish,
13+
Close,
1314
}
1415

1516
pub type BuildChannelTx = UnboundedSender<BuildMessage>;
@@ -74,6 +75,9 @@ impl BuildDriverService {
7475
.await
7576
.expect("FIXME: should handle this error");
7677
}
78+
BuildMessage::Close => {
79+
break;
80+
}
7781
}
7882
}
7983
}

crates/rolldown/src/dev/dev_engine.rs

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use std::{collections::VecDeque, ops::Deref, path::PathBuf, sync::Arc};
1+
use std::{
2+
collections::VecDeque,
3+
ops::Deref,
4+
path::PathBuf,
5+
sync::{Arc, atomic::AtomicBool},
6+
};
27

38
use arcstr::ArcStr;
49
use futures::{FutureExt, future::Shared};
@@ -32,9 +37,10 @@ pub struct DevEngine {
3237
build_driver: SharedBuildDriver,
3338
watcher: Mutex<DynWatcher>,
3439
watched_files: FxDashSet<ArcStr>,
35-
watch_service_state: Mutex<BuildDriverServiceState>,
40+
build_driver_service_state: Mutex<BuildDriverServiceState>,
3641
ctx: SharedDevContext,
3742
pub clients: SharedClients,
43+
is_closed: AtomicBool,
3844
}
3945

4046
impl DevEngine {
@@ -128,17 +134,18 @@ impl DevEngine {
128134
build_driver,
129135
watcher: Mutex::new(watcher),
130136
watched_files: FxDashSet::default(),
131-
watch_service_state: Mutex::new(BuildDriverServiceState {
137+
build_driver_service_state: Mutex::new(BuildDriverServiceState {
132138
service: Some(build_driver_service),
133139
handle: None,
134140
}),
135141
ctx,
136142
clients,
143+
is_closed: AtomicBool::new(false),
137144
})
138145
}
139146

140147
pub async fn run(&self) -> BuildResult<()> {
141-
let mut build_service_state = self.watch_service_state.lock().await;
148+
let mut build_service_state = self.build_driver_service_state.lock().await;
142149

143150
if build_service_state.service.is_none() {
144151
// The watcher service is already running.
@@ -174,25 +181,58 @@ impl DevEngine {
174181
Ok(())
175182
}
176183

177-
pub async fn wait_for_close(&self) {
178-
let watch_service_state = self.watch_service_state.lock().await;
179-
if let Some(watcher_service_handle) = watch_service_state.handle.clone() {
180-
watcher_service_handle.await;
184+
pub async fn wait_for_build_driver_service_close(&self) -> BuildResult<()> {
185+
self.create_error_if_closed()?;
186+
let service_state = self.build_driver_service_state.lock().await;
187+
if let Some(service_handle) = service_state.handle.clone() {
188+
service_handle.await;
181189
}
190+
Ok(())
182191
}
183192

184-
pub async fn ensure_current_build_finish(&self) {
193+
pub async fn ensure_current_build_finish(&self) -> BuildResult<()> {
194+
self.create_error_if_closed()?;
185195
self.ctx.ensure_current_build_finish().await;
196+
Ok(())
186197
}
187198

188199
pub async fn invalidate(
189200
&self,
190201
caller: String,
191202
first_invalidated_by: Option<String>,
192203
) -> BuildResult<Vec<ClientHmrUpdate>> {
204+
self.create_error_if_closed()?;
193205
self.build_driver.invalidate(caller, first_invalidated_by).await
194206
}
195207

208+
pub async fn close(&self) -> BuildResult<()> {
209+
if self.is_closed.swap(true, std::sync::atomic::Ordering::SeqCst) {
210+
return Ok(());
211+
}
212+
213+
// Send close message to build driver service
214+
if let Err(_e) = self.ctx.build_channel_tx.send(BuildMessage::Close) {
215+
// If `BuildMessage::Close` is not sent, it means the channel is closed due to error or what.
216+
// It's ok to ignore the send error.
217+
}
218+
219+
// Clean up watcher
220+
let watcher =
221+
std::mem::replace(&mut *self.watcher.lock().await, NoopWatcher.into_dyn_watcher());
222+
std::mem::drop(watcher);
223+
224+
// Close the bundler
225+
let mut bundler = self.build_driver.bundler.lock().await;
226+
bundler.close().await?;
227+
228+
// Wait for build driver service to close
229+
let service_state = self.build_driver_service_state.lock().await;
230+
if let Some(service_handle) = service_state.handle.clone() {
231+
service_handle.await;
232+
}
233+
Ok(())
234+
}
235+
196236
/// For testing purpose.
197237
pub async fn ensure_task_with_changed_files(&self, changed_files: FxIndexSet<PathBuf>) {
198238
self.build_driver.handle_file_changes(changed_files).await;
@@ -211,6 +251,17 @@ impl DevEngine {
211251
}
212252
self.clients.insert("test".to_string(), client_session);
213253
}
254+
255+
pub fn is_closed(&self) -> bool {
256+
self.is_closed.load(std::sync::atomic::Ordering::SeqCst)
257+
}
258+
259+
fn create_error_if_closed(&self) -> BuildResult<()> {
260+
if self.is_closed.load(std::sync::atomic::Ordering::SeqCst) {
261+
Err(anyhow::anyhow!("Dev engine is closed"))?;
262+
}
263+
Ok(())
264+
}
214265
}
215266

216267
impl Deref for DevEngine {

crates/rolldown_binding/src/binding_dev_engine.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ impl BindingDevEngine {
9696

9797
#[napi]
9898
pub async fn ensure_current_build_finish(&self) -> napi::Result<()> {
99-
self.inner.ensure_current_build_finish().await;
99+
self
100+
.inner
101+
.ensure_current_build_finish()
102+
.await
103+
.map_err(|_e| napi::Error::from_reason("Failed to ensure current build finish"))?;
100104
Ok(())
101105
}
102106

0 commit comments

Comments
 (0)