Skip to content

Commit 45ea758

Browse files
authored
[yaml] yaml_transform.py unit tests python - PR 3/3 (#27356)
* SafeLineLoader unit tests * LightweightScope unit tests * Scope Unit Tests * yaml_transform.py - chain_as_composite() unit tests * yaml_transform.py - normalize_source_sink() unit tests * yaml_transform.py - preprocess_source_sink() unit tests * yaml_transform.py - normalize_inputs_outputs() unit tests * yaml_transform.py - identify_object() and extract_name() unit tests * yaml_transform.py - push_windowing_to_roots() unit tests * yaml_transform.py - preprocess_windowing() unit tests * yaml_transform.py - preprocess_flattened_inputs() unit tests * yaml_transform.py - ensure_transforms_have_types() unit tests * yaml_transform.py - expand_pipeline() unit tests * yaml_transform.py - YamlTransform() unit tests
1 parent 505f942 commit 45ea758

File tree

2 files changed

+845
-23
lines changed

2 files changed

+845
-23
lines changed

sdks/python/apache_beam/yaml/yaml_transform.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,12 @@ def expand_chain_transform(spec, scope):
397397

398398

399399
def chain_as_composite(spec):
400+
def is_not_output_of_last_transform(new_transforms, value):
401+
return (
402+
('name' in new_transforms[-1] and
403+
value != new_transforms[-1]['name']) or
404+
('type' in new_transforms[-1] and value != new_transforms[-1]['type']))
405+
400406
# A chain is simply a composite transform where all inputs and outputs
401407
# are implicit.
402408
spec = normalize_source_sink(spec)
@@ -420,6 +426,12 @@ def chain_as_composite(spec):
420426

421427
last_transform = new_transforms[-1]['__uuid__']
422428
if has_explicit_outputs:
429+
for (key, value) in composite_spec['output'].items():
430+
if is_not_output_of_last_transform(new_transforms, value):
431+
raise ValueError(
432+
f"Explicit output {identify_object(value)} of the chain transform"
433+
f" is not an output of the last transform.")
434+
423435
composite_spec['output'] = {
424436
key: f'{last_transform}.{value}'
425437
for (key, value) in composite_spec['output'].items()
@@ -547,7 +559,7 @@ def preprocess_windowing(spec):
547559

548560
windowing = spec.pop('windowing')
549561
if spec['input']:
550-
# Apply the windowing to all inputs by wrapping it in a trasnform that
562+
# Apply the windowing to all inputs by wrapping it in a transform that
551563
# first applies windowing and then applies the original transform.
552564
original_inputs = spec['input']
553565
windowing_transforms = [{

0 commit comments

Comments
 (0)