Skip to content

Commit e5caac3

Browse files
authored
Merge pull request apache#14 from dahlbaek/remove-any
Reduce use of Any
2 parents f7df217 + 903c0c2 commit e5caac3

7 files changed

Lines changed: 237 additions & 241 deletions

File tree

Lines changed: 79 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,131 +1,114 @@
11
use std::collections::HashMap;
2-
use std::iter::Iterator;
3-
use std::marker::PhantomData;
42

5-
use std::any::Any;
63
use std::boxed::Box;
7-
use std::sync::Mutex;
4+
use std::sync::{Arc, Mutex};
85

96
use once_cell::sync::Lazy;
107

118
use crate::elem_types::kv::KV;
9+
use crate::elem_types::ElemType;
10+
use crate::transforms::group_by_key::KeyExtractor;
11+
use crate::transforms::pardo::DoFn;
12+
use crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue};
13+
use crate::worker::Receiver;
1214

13-
static SERIALIZED_FNS: Lazy<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>> =
15+
static DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> =
1416
Lazy::new(|| Mutex::new(HashMap::new()));
1517

16-
pub fn serialize_fn<T: Any + Sync + Send>(obj: Box<T>) -> String {
17-
let mut serialized_fns = SERIALIZED_FNS.lock().unwrap();
18-
let name = format!("object{}", serialized_fns.len());
19-
serialized_fns.insert(name.to_string(), obj);
18+
static KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> =
19+
Lazy::new(|| Mutex::new(HashMap::new()));
20+
21+
pub fn store_do_fn(do_fn: impl DoFn + 'static) -> String {
22+
let mut do_fns = DO_FNS.lock().unwrap();
23+
let name = format!("object{}", do_fns.len());
24+
do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn)));
2025
name
2126
}
2227

23-
pub fn deserialize_fn<T: Any + Sync + Send>(name: &String) -> Option<&'static T> {
24-
let binding = SERIALIZED_FNS.lock().unwrap();
25-
let untyped = binding.get(name);
26-
let typed = match untyped {
27-
Some(x) => x.downcast_ref::<T>(),
28-
None => None,
29-
};
30-
31-
unsafe { std::mem::transmute::<Option<&T>, Option<&'static T>>(typed) }
28+
pub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> {
29+
let binding = DO_FNS.lock().unwrap();
30+
binding.get(name).copied()
3231
}
3332

34-
// ******* DoFn Wrappers, perhaps move elsewhere? *******
35-
36-
// TODO: Give these start/finish_bundles, etc.
37-
pub type GenericDoFn =
38-
Box<dyn Fn(&dyn Any) -> Box<dyn Iterator<Item = Box<dyn Any>>> + Send + Sync>;
39-
40-
struct GenericDoFnWrapper {
41-
_func: GenericDoFn,
33+
pub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String {
34+
let mut kes = KEY_EXTRACTORS.lock().unwrap();
35+
let name = format!("object{}", kes.len());
36+
kes.insert(name.to_string(), Box::leak(Box::new(ke)));
37+
name
4238
}
4339

44-
unsafe impl std::marker::Send for GenericDoFnWrapper {}
45-
46-
struct BoxedIter<O: Any, I: IntoIterator<Item = O>> {
47-
typed_iter: I::IntoIter,
40+
pub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> {
41+
KEY_EXTRACTORS.lock().unwrap().get(name).copied()
4842
}
4943

50-
impl<O: Any, I: IntoIterator<Item = O>> Iterator for BoxedIter<O, I> {
51-
type Item = Box<dyn Any>;
44+
pub trait DynamicDoFn: Send + Sync {
45+
fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]);
46+
fn start_bundle_dyn(&self);
47+
fn finish_bundle_dyn(&self);
48+
}
5249

53-
fn next(&mut self) -> Option<Box<dyn Any>> {
54-
if let Some(x) = self.typed_iter.next() {
55-
Some(Box::new(x))
56-
} else {
57-
None
50+
impl<D: DoFn> DynamicDoFn for D {
51+
fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) {
52+
let typed_elem = elem.downcast_ref::<D::In>();
53+
for value in self.process(&typed_elem.value) {
54+
let windowed_value = typed_elem.with_value(value);
55+
for receiver in receivers {
56+
receiver.receive(DynamicWindowedValue::new(&windowed_value))
57+
}
5858
}
5959
}
60-
}
6160

62-
pub fn to_generic_dofn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
63-
func: fn(&T) -> I,
64-
) -> GenericDoFn {
65-
Box::new(
66-
move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> {
67-
let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap();
68-
Box::new(BoxedIter::<O, I> {
69-
typed_iter: func(typed_input).into_iter(),
70-
})
71-
},
72-
)
73-
}
74-
75-
pub fn to_generic_dofn_dyn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
76-
func: Box<dyn Fn(&T) -> I + Send + Sync>,
77-
) -> GenericDoFn {
78-
Box::new(
79-
move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> {
80-
let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap();
81-
Box::new(BoxedIter::<O, I> {
82-
typed_iter: func(typed_input).into_iter(),
83-
})
84-
},
85-
)
86-
}
61+
fn start_bundle_dyn(&self) {
62+
self.start_bundle()
63+
}
8764

88-
pub trait KeyExtractor: Sync + Send {
89-
fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>>;
90-
fn recombine(
91-
&self,
92-
key: &str,
93-
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
94-
) -> Box<dyn Any + Sync + Send>;
65+
fn finish_bundle_dyn(&self) {
66+
self.finish_bundle()
67+
}
9568
}
9669

97-
pub struct TypedKeyExtractor<V: Clone + Sync + Send + 'static> {
98-
phantom_data: PhantomData<V>,
70+
pub trait DynamicKeyExtractor: Sync + Send {
71+
fn new_grouped_values(&self) -> DynamicGroupedValues;
72+
fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues);
73+
fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues);
74+
fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]);
9975
}
10076

101-
impl<V: Clone + Sync + Send + 'static> Default for TypedKeyExtractor<V> {
102-
fn default() -> Self {
103-
Self {
104-
phantom_data: PhantomData,
105-
}
77+
impl<V: ElemType> DynamicKeyExtractor for KeyExtractor<V> {
78+
fn new_grouped_values(&self) -> DynamicGroupedValues {
79+
DynamicGroupedValues::new::<V>()
10680
}
107-
}
81+
fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) {
82+
grouped_values.downcast_mut::<V>().clear()
83+
}
84+
fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) {
85+
let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value;
86+
let grouped_values = grouped_values.downcast_mut::<V>();
10887

109-
impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> {
110-
fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>> {
111-
let typed_kv = kv.downcast_ref::<KV<String, V>>().unwrap();
112-
KV {
113-
k: typed_kv.k.clone(),
114-
v: Box::new(typed_kv.v.clone()),
88+
if !grouped_values.contains_key(k) {
89+
grouped_values.insert(k.clone(), Vec::new());
11590
}
91+
grouped_values.get_mut(k).unwrap().push(v.clone());
11692
}
117-
fn recombine(
118-
&self,
119-
key: &str,
120-
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
121-
) -> Box<dyn Any + Sync + Send> {
122-
let mut typed_values: Vec<V> = Vec::new();
123-
for untyped_value in values.iter() {
124-
typed_values.push(untyped_value.downcast_ref::<V>().unwrap().clone());
93+
94+
fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) {
95+
let typed_grouped_values = grouped_values.downcast_ref::<V>();
96+
for (key, values) in typed_grouped_values.iter() {
97+
// TODO: timestamp and pane info are wrong
98+
for receiver in receivers.iter() {
99+
// TODO: End-of-window timestamp, only firing pane.
100+
let mut typed_values: Vec<V> = Vec::new();
101+
for value in values.iter() {
102+
typed_values.push(value.clone());
103+
}
104+
let res = KV {
105+
k: key.to_string(),
106+
v: typed_values,
107+
};
108+
receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window(
109+
res,
110+
)));
111+
}
125112
}
126-
Box::new(KV {
127-
k: key.to_string(),
128-
v: typed_values,
129-
})
130113
}
131114
}

sdks/rust/src/transforms/create.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ use crate::{
2323
internals::pvalue::{PTransform, PValue},
2424
};
2525

26-
pub struct Create<T> {
27-
elements: Vec<T>,
26+
pub struct Create<Out> {
27+
elements: Vec<Out>,
2828
}
2929

30-
impl<T: Clone> Create<T> {
31-
pub fn new(elements: &[T]) -> Self {
30+
impl<Out: ElemType> Create<Out> {
31+
pub fn new(elements: &[Out]) -> Self {
3232
Self {
3333
elements: elements.to_vec(),
3434
}
@@ -39,15 +39,15 @@ impl<T: Clone> Create<T> {
3939
// https://github.com/rust-lang/rust/issues/35121
4040
pub type Never = ();
4141

42-
impl<E: ElemType> PTransform<Never, E> for Create<E> {
43-
fn expand(&self, input: &PValue<Never>) -> PValue<E> {
42+
impl<Out: ElemType> PTransform<Never, Out> for Create<Out> {
43+
fn expand(&self, input: &PValue<Never>) -> PValue<Out> {
4444
let elements = self.elements.to_vec();
4545
// TODO: Consider reshuffling.
4646
input
4747
.clone()
4848
.apply(Impulse::new())
49-
.apply(ParDo::from_flatmap_with_context(Box::new(
50-
move |_x| -> Vec<E> { elements.to_vec() },
51-
)))
49+
.apply(ParDo::from_flat_map(move |_x| -> Vec<Out> {
50+
elements.to_vec()
51+
}))
5252
}
5353
}

sdks/rust/src/transforms/group_by_key.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,17 @@ use crate::proto::beam_api::pipeline as proto_pipeline;
3030

3131
pub struct GroupByKey<K, V> {
3232
payload: String,
33-
phantom_k: PhantomData<K>,
34-
phantom_v: PhantomData<V>,
33+
phantom: PhantomData<(K, V)>,
3534
}
3635

36+
pub struct KeyExtractor<V: ElemType>(PhantomData<V>);
37+
3738
// TODO: Use coders to allow arbitrary keys.
3839
impl<V: ElemType> Default for GroupByKey<String, V> {
3940
fn default() -> Self {
4041
Self {
41-
payload: serialize::serialize_fn::<Box<dyn serialize::KeyExtractor>>(Box::new(
42-
Box::new(serialize::TypedKeyExtractor::<V>::default()),
43-
)),
44-
phantom_k: PhantomData,
45-
phantom_v: PhantomData,
42+
payload: serialize::store_key_extractor(KeyExtractor::<V>(PhantomData)),
43+
phantom: PhantomData,
4644
}
4745
}
4846
}

0 commit comments

Comments
 (0)