|
1 | 1 | use std::collections::HashMap; |
2 | | -use std::iter::Iterator; |
3 | | -use std::marker::PhantomData; |
4 | 2 |
|
5 | | -use std::any::Any; |
6 | 3 | use std::boxed::Box; |
7 | | -use std::sync::Mutex; |
| 4 | +use std::sync::{Arc, Mutex}; |
8 | 5 |
|
9 | 6 | use once_cell::sync::Lazy; |
10 | 7 |
|
11 | 8 | use crate::elem_types::kv::KV; |
| 9 | +use crate::elem_types::ElemType; |
| 10 | +use crate::transforms::group_by_key::KeyExtractor; |
| 11 | +use crate::transforms::pardo::DoFn; |
| 12 | +use crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue}; |
| 13 | +use crate::worker::Receiver; |
12 | 14 |
|
13 | | -static SERIALIZED_FNS: Lazy<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>> = |
| 15 | +static DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> = |
14 | 16 | Lazy::new(|| Mutex::new(HashMap::new())); |
15 | 17 |
|
16 | | -pub fn serialize_fn<T: Any + Sync + Send>(obj: Box<T>) -> String { |
17 | | - let mut serialized_fns = SERIALIZED_FNS.lock().unwrap(); |
18 | | - let name = format!("object{}", serialized_fns.len()); |
19 | | - serialized_fns.insert(name.to_string(), obj); |
| 18 | +static KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> = |
| 19 | + Lazy::new(|| Mutex::new(HashMap::new())); |
| 20 | + |
| 21 | +pub fn store_do_fn(do_fn: impl DoFn + 'static) -> String { |
| 22 | + let mut do_fns = DO_FNS.lock().unwrap(); |
| 23 | + let name = format!("object{}", do_fns.len()); |
| 24 | + do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn))); |
20 | 25 | name |
21 | 26 | } |
22 | 27 |
|
23 | | -pub fn deserialize_fn<T: Any + Sync + Send>(name: &String) -> Option<&'static T> { |
24 | | - let binding = SERIALIZED_FNS.lock().unwrap(); |
25 | | - let untyped = binding.get(name); |
26 | | - let typed = match untyped { |
27 | | - Some(x) => x.downcast_ref::<T>(), |
28 | | - None => None, |
29 | | - }; |
30 | | - |
31 | | - unsafe { std::mem::transmute::<Option<&T>, Option<&'static T>>(typed) } |
| 28 | +pub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> { |
| 29 | + let binding = DO_FNS.lock().unwrap(); |
| 30 | + binding.get(name).copied() |
32 | 31 | } |
33 | 32 |
|
34 | | -// ******* DoFn Wrappers, perhaps move elsewhere? ******* |
35 | | - |
36 | | -// TODO: Give these start/finish_bundles, etc. |
37 | | -pub type GenericDoFn = |
38 | | - Box<dyn Fn(&dyn Any) -> Box<dyn Iterator<Item = Box<dyn Any>>> + Send + Sync>; |
39 | | - |
40 | | -struct GenericDoFnWrapper { |
41 | | - _func: GenericDoFn, |
| 33 | +pub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String { |
| 34 | + let mut kes = KEY_EXTRACTORS.lock().unwrap(); |
| 35 | + let name = format!("object{}", kes.len()); |
| 36 | + kes.insert(name.to_string(), Box::leak(Box::new(ke))); |
| 37 | + name |
42 | 38 | } |
43 | 39 |
|
44 | | -unsafe impl std::marker::Send for GenericDoFnWrapper {} |
45 | | - |
46 | | -struct BoxedIter<O: Any, I: IntoIterator<Item = O>> { |
47 | | - typed_iter: I::IntoIter, |
| 40 | +pub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> { |
| 41 | + KEY_EXTRACTORS.lock().unwrap().get(name).copied() |
48 | 42 | } |
49 | 43 |
|
50 | | -impl<O: Any, I: IntoIterator<Item = O>> Iterator for BoxedIter<O, I> { |
51 | | - type Item = Box<dyn Any>; |
| 44 | +pub trait DynamicDoFn: Send + Sync { |
| 45 | + fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]); |
| 46 | + fn start_bundle_dyn(&self); |
| 47 | + fn finish_bundle_dyn(&self); |
| 48 | +} |
52 | 49 |
|
53 | | - fn next(&mut self) -> Option<Box<dyn Any>> { |
54 | | - if let Some(x) = self.typed_iter.next() { |
55 | | - Some(Box::new(x)) |
56 | | - } else { |
57 | | - None |
| 50 | +impl<D: DoFn> DynamicDoFn for D { |
| 51 | + fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) { |
| 52 | + let typed_elem = elem.downcast_ref::<D::In>(); |
| 53 | + for value in self.process(&typed_elem.value) { |
| 54 | + let windowed_value = typed_elem.with_value(value); |
| 55 | + for receiver in receivers { |
| 56 | + receiver.receive(DynamicWindowedValue::new(&windowed_value)) |
| 57 | + } |
58 | 58 | } |
59 | 59 | } |
60 | | -} |
61 | 60 |
|
62 | | -pub fn to_generic_dofn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>( |
63 | | - func: fn(&T) -> I, |
64 | | -) -> GenericDoFn { |
65 | | - Box::new( |
66 | | - move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> { |
67 | | - let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap(); |
68 | | - Box::new(BoxedIter::<O, I> { |
69 | | - typed_iter: func(typed_input).into_iter(), |
70 | | - }) |
71 | | - }, |
72 | | - ) |
73 | | -} |
74 | | - |
75 | | -pub fn to_generic_dofn_dyn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>( |
76 | | - func: Box<dyn Fn(&T) -> I + Send + Sync>, |
77 | | -) -> GenericDoFn { |
78 | | - Box::new( |
79 | | - move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> { |
80 | | - let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap(); |
81 | | - Box::new(BoxedIter::<O, I> { |
82 | | - typed_iter: func(typed_input).into_iter(), |
83 | | - }) |
84 | | - }, |
85 | | - ) |
86 | | -} |
| 61 | + fn start_bundle_dyn(&self) { |
| 62 | + self.start_bundle() |
| 63 | + } |
87 | 64 |
|
88 | | -pub trait KeyExtractor: Sync + Send { |
89 | | - fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>>; |
90 | | - fn recombine( |
91 | | - &self, |
92 | | - key: &str, |
93 | | - values: &Box<Vec<Box<dyn Any + Sync + Send>>>, |
94 | | - ) -> Box<dyn Any + Sync + Send>; |
| 65 | + fn finish_bundle_dyn(&self) { |
| 66 | + self.finish_bundle() |
| 67 | + } |
95 | 68 | } |
96 | 69 |
|
97 | | -pub struct TypedKeyExtractor<V: Clone + Sync + Send + 'static> { |
98 | | - phantom_data: PhantomData<V>, |
| 70 | +pub trait DynamicKeyExtractor: Sync + Send { |
| 71 | + fn new_grouped_values(&self) -> DynamicGroupedValues; |
| 72 | + fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues); |
| 73 | + fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues); |
| 74 | + fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]); |
99 | 75 | } |
100 | 76 |
|
101 | | -impl<V: Clone + Sync + Send + 'static> Default for TypedKeyExtractor<V> { |
102 | | - fn default() -> Self { |
103 | | - Self { |
104 | | - phantom_data: PhantomData, |
105 | | - } |
| 77 | +impl<V: ElemType> DynamicKeyExtractor for KeyExtractor<V> { |
| 78 | + fn new_grouped_values(&self) -> DynamicGroupedValues { |
| 79 | + DynamicGroupedValues::new::<V>() |
106 | 80 | } |
107 | | -} |
| 81 | + fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) { |
| 82 | + grouped_values.downcast_mut::<V>().clear() |
| 83 | + } |
| 84 | + fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) { |
| 85 | + let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value; |
| 86 | + let grouped_values = grouped_values.downcast_mut::<V>(); |
108 | 87 |
|
109 | | -impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> { |
110 | | - fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>> { |
111 | | - let typed_kv = kv.downcast_ref::<KV<String, V>>().unwrap(); |
112 | | - KV { |
113 | | - k: typed_kv.k.clone(), |
114 | | - v: Box::new(typed_kv.v.clone()), |
| 88 | + if !grouped_values.contains_key(k) { |
| 89 | + grouped_values.insert(k.clone(), Vec::new()); |
115 | 90 | } |
| 91 | + grouped_values.get_mut(k).unwrap().push(v.clone()); |
116 | 92 | } |
117 | | - fn recombine( |
118 | | - &self, |
119 | | - key: &str, |
120 | | - values: &Box<Vec<Box<dyn Any + Sync + Send>>>, |
121 | | - ) -> Box<dyn Any + Sync + Send> { |
122 | | - let mut typed_values: Vec<V> = Vec::new(); |
123 | | - for untyped_value in values.iter() { |
124 | | - typed_values.push(untyped_value.downcast_ref::<V>().unwrap().clone()); |
| 93 | + |
| 94 | + fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) { |
| 95 | + let typed_grouped_values = grouped_values.downcast_ref::<V>(); |
| 96 | + for (key, values) in typed_grouped_values.iter() { |
| 97 | + // TODO: timestamp and pane info are wrong |
| 98 | + for receiver in receivers.iter() { |
| 99 | + // TODO: End-of-window timestamp, only firing pane. |
| 100 | + let mut typed_values: Vec<V> = Vec::new(); |
| 101 | + for value in values.iter() { |
| 102 | + typed_values.push(value.clone()); |
| 103 | + } |
| 104 | + let res = KV { |
| 105 | + k: key.to_string(), |
| 106 | + v: typed_values, |
| 107 | + }; |
| 108 | + receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window( |
| 109 | + res, |
| 110 | + ))); |
| 111 | + } |
125 | 112 | } |
126 | | - Box::new(KV { |
127 | | - k: key.to_string(), |
128 | | - v: typed_values, |
129 | | - }) |
130 | 113 | } |
131 | 114 | } |
0 commit comments