#![warn(
missing_copy_implementations,
missing_debug_implementations,
// missing_docs,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_results,
// clippy::pedantic
)] #![allow(unreachable_code, unused_braces, clippy::type_complexity)]
#[cfg(feature = "constellation")]
use constellation::*;
use std::time::{Duration, SystemTime};
use amadeus::{data::Webpage, dist::prelude::*};
fn main() {
if cfg!(miri) {
return;
}
#[cfg(feature = "constellation")]
init(Resources::default());
tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool).await
};
#[cfg(not(feature = "constellation"))]
let process_pool_time = "-";
println!("in {:?} {:?}", thread_pool_time, process_pool_time);
})
}
async fn run<P: amadeus_core::pool::ProcessPool>(pool: &P) -> Duration {
let start = SystemTime::now();
let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();
let _ = webpages
.dist_stream()
.all(
pool,
FnMut!(move |x: Result<Webpage<'static>, _>| -> bool {
let _x = x.unwrap();
start.elapsed().unwrap() < Duration::new(10, 0)
}),
)
.await;
return start.elapsed().unwrap();
let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();
let top: (
((
),),
(
u32,
u32,
std::collections::HashSet<u32>,
amadeus_streaming::Top<u32,usize>,
amadeus_streaming::Top<usize,amadeus_streaming::HyperLogLogMagnitude<Vec<u8>>>,
amadeus_streaming::SampleUnstable<u32>,
),
) = webpages.dist_stream().map(FnMut!(|webpage:Result<_,_>|webpage.unwrap()))
.fork(
&pool,
((
),),
(
Identity
.map(FnMut!(|x: &Webpage<'static>| -> usize { x.contents.len() }))
.map(FnMut!(|x: usize| -> u32 { x as u32 }))
.fold(
FnMut!(|| 0_u32),
FnMut!(|a: u32, b: either::Either<u32, u32>| a + b.into_inner()),
),
Identity
.map(FnMut!(|x: &Webpage<'static>| -> usize { x.contents.len() }))
.map(FnMut!(|x: usize| -> u32 { x as u32 }))
.sum(),
Identity
.map(FnMut!(|x: &Webpage<'static>| -> usize { x.contents.len() }))
.map(FnMut!(|x: usize| -> u32 { x as u32 }))
.collect(),
Identity
.map(FnMut!(|x: &Webpage<'static>| -> usize { x.contents.len() }))
.map(FnMut!(|x: usize| -> u32 { x as u32 }))
.most_frequent(100, 0.99, 2.0/1000.0),
Identity
.map(FnMut!(|x: &Webpage<'static>| { (x.contents.len(),x.contents[..5].to_owned()) }))
.most_distinct(100, 0.99, 2.0/1000.0, 0.0808),
Identity
.cloned()
.map(FnMut!(|x: Webpage<'static>| -> usize { x.contents.len() }))
.map(FnMut!(|x: usize| -> u32 { x as u32 }))
.sample_unstable(100),
),
).await;
println!("{:?}", top);
start.elapsed().unwrap()
}