Skip to content

Commit 3565f68

Browse files
committed
melib: convert MailBackend::watch into a stream
Make the watch method of the MailBackend trait return a Stream that returns a melib::BackendEvent or an error. This refactoring will allow for easier testing of the watch implementations. Signed-off-by: Manos Pitsidianakis <[email protected]>
1 parent 2a7a800 commit 3565f68

File tree

14 files changed

+770
-704
lines changed

14 files changed

+770
-704
lines changed

meli/src/accounts.rs

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ impl Account {
833833
if let Some(wait) = wait {
834834
sleep(wait).await;
835835
}
836-
fut.await
836+
fut.into_future().await
837837
};
838838
let handle = self.main_loop_handler.job_executor.spawn(
839839
"watch".into(),
@@ -1732,30 +1732,59 @@ impl Account {
17321732
}
17331733
}
17341734
JobRequest::Watch { ref mut handle } => {
1735-
log::trace!("JobRequest::Watch finished??? ");
1736-
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
1737-
if err.kind.is_timeout()
1738-
|| matches!(
1739-
err.kind,
1740-
ErrorKind::Network(NetworkErrorKind::HostLookupFailed)
1741-
)
1742-
{
1743-
self.watch(Some(Duration::from_secs(3)));
1744-
} else {
1745-
self.main_loop_handler
1746-
.job_executor
1747-
.set_job_success(job_id, false);
1748-
// [ref:TODO]: relaunch watch job with ratelimit for failure
1749-
self.main_loop_handler.send(ThreadEvent::UIEvent(
1750-
UIEvent::Notification {
1751-
title: Some(
1752-
format!("{}: watch thread failed", &self.name).into(),
1753-
),
1754-
source: None,
1755-
body: err.to_string().into(),
1756-
kind: Some(NotificationType::Error(err.kind)),
1757-
},
1758-
));
1735+
log::trace!("JobRequest::Watch event");
1736+
is_canceled! { handle };
1737+
match handle.chan.try_recv() {
1738+
Err(_) => { /* canceled */ }
1739+
Ok(Some((None, _))) => {
1740+
log::trace!("JobRequest::Watch stream returned None");
1741+
self.watch(None);
1742+
}
1743+
Ok(None) => {
1744+
self.watch(None);
1745+
}
1746+
Ok(Some((Some(ev), rest))) => {
1747+
let handle = self.main_loop_handler.job_executor.spawn(
1748+
"watch".into(),
1749+
rest.into_future(),
1750+
self.is_async(),
1751+
);
1752+
self.active_jobs
1753+
.insert(handle.job_id, JobRequest::Watch { handle });
1754+
match ev {
1755+
Err(err) => {
1756+
log::trace!("JobRequest::Watch error {}", err);
1757+
if err.kind.is_timeout()
1758+
|| matches!(
1759+
err.kind,
1760+
ErrorKind::Network(NetworkErrorKind::HostLookupFailed)
1761+
)
1762+
{
1763+
self.watch(Some(Duration::from_secs(3)));
1764+
} else {
1765+
self.main_loop_handler
1766+
.job_executor
1767+
.set_job_success(job_id, false);
1768+
// [ref:TODO]: relaunch watch job with ratelimit for failure
1769+
self.main_loop_handler.send(ThreadEvent::UIEvent(
1770+
UIEvent::Notification {
1771+
title: Some(
1772+
format!("{}: watch thread failed", &self.name)
1773+
.into(),
1774+
),
1775+
source: None,
1776+
body: err.to_string().into(),
1777+
kind: Some(NotificationType::Error(err.kind)),
1778+
},
1779+
));
1780+
}
1781+
}
1782+
Ok(ev) => {
1783+
self.main_loop_handler.send(ThreadEvent::UIEvent(
1784+
UIEvent::BackendEvent(self.hash, ev),
1785+
));
1786+
}
1787+
}
17591788
}
17601789
}
17611790
}

meli/src/accounts/jobs.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020
//
2121
// SPDX-License-Identifier: EUPL-1.2 OR GPL-3.0-or-later
2222

23-
use std::{borrow::Cow, collections::HashMap, pin::Pin};
23+
use std::{borrow::Cow, collections::HashMap};
2424

25-
use futures::stream::Stream;
26-
use melib::{backends::*, email::*, error::Result, LogLevel};
25+
use melib::{backends::prelude::*, error::Result, LogLevel};
2726
use smallvec::SmallVec;
2827

2928
use crate::{is_variant, jobs::JoinHandle, StatusEvent};
@@ -113,7 +112,7 @@ pub enum JobRequest {
113112
#[allow(clippy::type_complexity)]
114113
handle: JoinHandle<(
115114
Option<Result<Vec<Envelope>>>,
116-
Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>,
115+
BoxStream<'static, Result<Vec<Envelope>>>,
117116
)>,
118117
},
119118
Generic {
@@ -149,7 +148,11 @@ pub enum JobRequest {
149148
handle: JoinHandle<Result<()>>,
150149
},
151150
Watch {
152-
handle: JoinHandle<Result<()>>,
151+
#[allow(clippy::type_complexity)]
152+
handle: JoinHandle<(
153+
Option<Result<BackendEvent>>,
154+
BoxStream<'static, Result<BackendEvent>>,
155+
)>,
153156
},
154157
Mailbox(MailboxJobRequest),
155158
}

melib/src/backends.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub mod prelude {
3838
borrow::Cow,
3939
cell::RefCell,
4040
collections::{BTreeSet, HashMap},
41+
convert::TryFrom,
4142
ops::Deref,
4243
pin::Pin,
4344
sync::Arc,
@@ -447,7 +448,7 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
447448
}
448449
fn fetch(&mut self, mailbox_hash: MailboxHash) -> ResultStream<Vec<Envelope>>;
449450
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
450-
fn watch(&self) -> ResultFuture<()>;
451+
fn watch(&self) -> ResultStream<BackendEvent>;
451452
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>>;
452453
fn envelope_bytes_by_hash(&self, hash: EnvelopeHash) -> ResultFuture<Vec<u8>>;
453454

melib/src/imap/connection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1619,7 +1619,9 @@ impl ImapConnection {
16191619
.get(&mailbox_hash)
16201620
.cloned()
16211621
.unwrap();
1622-
crate::imap::watch::examine_updates(mailbox, self).await?;
1622+
if let Some(ev) = crate::imap::watch::examine_updates(mailbox, self).await? {
1623+
self.add_backend_event(ev);
1624+
}
16231625
Ok(())
16241626
}
16251627

melib/src/imap/mod.rs

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -494,11 +494,13 @@ impl MailBackend for ImapType {
494494
}))
495495
}
496496

497-
fn watch(&self) -> ResultFuture<()> {
497+
fn watch(&self) -> ResultStream<BackendEvent> {
498498
let server_conf = self.server_conf.clone();
499499
let main_conn = self.connection.clone();
500500
let uid_store = self.uid_store.clone();
501-
Ok(Box::pin(async move {
501+
Ok(Box::pin(try_fn_stream(|emitter| async move {
502+
use futures::stream::StreamExt;
503+
502504
let has_idle: bool = match server_conf.protocol {
503505
ImapProtocol::IMAP {
504506
extension_use: ImapExtensionUse { idle, .. },
@@ -512,8 +514,12 @@ impl MailBackend for ImapType {
512514
}
513515
_ => false,
514516
};
515-
while let Err(err) = if has_idle {
516-
idle(ImapWatchKit {
517+
enum WatchKit {
518+
Idle(BoxStream<'static, Result<BackendEvent>>),
519+
Poll(BoxStream<'static, Result<BackendEvent>>),
520+
}
521+
let mut watch_kit = if has_idle {
522+
WatchKit::Idle(Box::pin(idle(ImapWatchKit {
517523
conn: ImapConnection::new_connection(
518524
&server_conf,
519525
format!(
@@ -526,10 +532,9 @@ impl MailBackend for ImapType {
526532
),
527533
main_conn: main_conn.clone(),
528534
uid_store: uid_store.clone(),
529-
})
530-
.await
535+
})))
531536
} else {
532-
poll_with_examine(ImapWatchKit {
537+
WatchKit::Poll(Box::pin(poll_with_examine(ImapWatchKit {
533538
conn: ImapConnection::new_connection(
534539
&server_conf,
535540
format!(
@@ -542,50 +547,65 @@ impl MailBackend for ImapType {
542547
),
543548
main_conn: main_conn.clone(),
544549
uid_store: uid_store.clone(),
545-
})
546-
.await
547-
} {
548-
let mut main_conn_lck = main_conn.lock().await?;
549-
if err.kind.is_network() {
550-
uid_store.is_online.lock().unwrap().1 = Err(err.clone());
551-
} else {
552-
return Err(err);
550+
})))
551+
};
552+
while let Some(ev) = {
553+
match watch_kit {
554+
WatchKit::Idle(ref mut idle) => idle.next().await,
555+
WatchKit::Poll(ref mut poll) => poll.next().await,
553556
}
554-
log::trace!(
555-
"{} Watch failure: {}",
556-
uid_store.account_name,
557-
err.to_string()
558-
);
559-
match timeout(uid_store.timeout, main_conn_lck.connect())
560-
.await
561-
.and_then(|res| res)
562-
{
563-
Err(err2) => {
557+
} {
558+
match ev {
559+
Ok(ok) => emitter.emit(ok).await,
560+
Err(err) => {
561+
let mut main_conn_lck = main_conn.lock().await?;
562+
if err.kind.is_network() {
563+
uid_store.is_online.lock().unwrap().1 = Err(err.clone());
564+
} else {
565+
return Err(err);
566+
}
564567
log::trace!(
565-
"{} Watch reconnect attempt failed: {}",
568+
"{} Watch failure: {}",
566569
uid_store.account_name,
567-
err2.to_string()
570+
err.to_string()
568571
);
569-
}
570-
Ok(()) => {
571-
log::trace!(
572-
"{} Watch reconnect attempt successful",
573-
uid_store.account_name
574-
);
575-
continue;
572+
match timeout(uid_store.timeout, main_conn_lck.connect())
573+
.await
574+
.and_then(|res| res)
575+
{
576+
Err(err2) => {
577+
log::trace!(
578+
"{} Watch reconnect attempt failed: {}",
579+
uid_store.account_name,
580+
err2.to_string()
581+
);
582+
}
583+
Ok(()) => {
584+
log::trace!(
585+
"{} Watch reconnect attempt successful",
586+
uid_store.account_name
587+
);
588+
continue;
589+
}
590+
}
591+
let account_hash = uid_store.account_hash;
592+
emitter
593+
.emit(
594+
RefreshEvent {
595+
account_hash,
596+
mailbox_hash: MailboxHash::default(),
597+
kind: RefreshEventKind::Failure(err.clone()),
598+
}
599+
.into(),
600+
)
601+
.await;
602+
return Err(err);
576603
}
577604
}
578-
let account_hash = uid_store.account_hash;
579-
main_conn_lck.add_refresh_event(RefreshEvent {
580-
account_hash,
581-
mailbox_hash: MailboxHash::default(),
582-
kind: RefreshEventKind::Failure(err.clone()),
583-
});
584-
return Err(err);
585605
}
586606
log::trace!("{} watch future returning", uid_store.account_name);
587607
Ok(())
588-
}))
608+
})))
589609
}
590610

591611
fn envelope_bytes_by_hash(&self, hash: EnvelopeHash) -> ResultFuture<Vec<u8>> {

0 commit comments

Comments
 (0)