Description
What happened?
Apache Beam SDK Version: 2.40.0
SDK Language: Python
Runner: All (DirectRunner, DataflowRunner, PortableRunner etc.)
TL;DR: If a DoFn.process contains both yield <some value> and return <some iterable> statements, the elements passed to return <some iterable> are never emitted.
The Apache Beam Programming Guide, section 4.2.1.2 (Creating a DoFn), states that:
Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.
That statement is correct when a DoFn.process:
- either uses yield <some value> only
- or uses return <some iterable> only
to emit elements within its definition.
If yield and return <some iterable> are combined in the same DoFn.process() definition, the behavior no longer matches the statement made in the documentation.
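For context, here is a minimal plain-Python sketch (no Beam involved) of why mixing the two may behave this way. Any function whose body contains yield is a generator function, and a return <value> inside it ends iteration and attaches the value to StopIteration instead of yielding it. The names mixed and drain are hypothetical helpers used only for illustration.

```python
def mixed(element):
    """Generator function: the presence of `yield` changes what `return` means."""
    if not element.get("key"):
        # Inside a generator this does NOT emit the list; it ends iteration
        # and stores the list on the StopIteration exception.
        return [{"key": -9999}]
    yield element


def drain(gen):
    """Collect everything a generator yields, plus the value it returned."""
    emitted = []
    while True:
        try:
            emitted.append(next(gen))
        except StopIteration as stop:
            return emitted, stop.value


if __name__ == "__main__":
    print(drain(mixed({"key": None})))  # ([], [{'key': -9999}])
    print(drain(mixed({"key": 1})))     # ([{'key': 1}], None)
```

A consumer that only iterates over the generator (a for loop, list(), and so on) never sees the returned list, which would be consistent with the missing {'key': -9999} element reported here.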
See this example pipeline:
# dofn_issue.py
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

logger = logging.getLogger(__name__)
logger.setLevel("INFO")


class Pipeline:
    def run(self, args=None):
        parser = argparse.ArgumentParser()
        _, extra_args = parser.parse_known_args(args)
        pipeline_options = PipelineOptions(extra_args, save_main_session=True)
        with beam.Pipeline(options=pipeline_options) as pipeline:
            data_to_process = pipeline | beam.Create(
                [
                    {"key": 1},
                    {"key": 2},
                    {"key": None},
                    {"key": 4},
                ],
            )
            data_to_process | beam.ParDo(SetDefaultValFn1()) | "1" >> beam.ParDo(LogElementsFn(), 1)
            data_to_process | beam.ParDo(SetDefaultValFn2()) | "2" >> beam.ParDo(LogElementsFn(), 2)
            data_to_process | beam.ParDo(SetDefaultValFn3()) | "3" >> beam.ParDo(LogElementsFn(), 3)
            data_to_process | beam.ParDo(SetDefaultValFn4()) | "4" >> beam.ParDo(LogElementsFn(), 4)


# NOT EXPECTED - `return` statement doesn't emit data
class SetDefaultValFn1(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


# NOT EXPECTED - `yield` statement only emits data
class SetDefaultValFn2(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


# EXPECTED
class SetDefaultValFn3(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


# EXPECTED
class SetDefaultValFn4(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


class LogElementsFn(beam.DoFn):
    def process(self, element, where):
        logger.info(f"From {where} found {element}")
        yield element


if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run()
Actual Output:
$ python dofn_issue.py \
--runner DirectRunner
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
# NOTE: SetDefaultValFn1 silently skipped element {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 1}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}
PS: Output is manually ordered for ease of interpretation.
Expected Output:
$ python dofn_issue.py \
--runner DirectRunner
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': 1}
INFO:__main__:From 2 found {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 2 found {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}
PS: Output is manually ordered for ease of interpretation.
If DoFn.process is meant to behave like a Python generator, consider similar code written purely with plain Python generators:
# generator_eg.py
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SetDefaultVal1:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


class SetDefaultVal2:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


class SetDefaultVal3:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


class SetDefaultVal4:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


def run_and_log(Generator, element, where):
    generator = Generator.process(element)
    try:
        processed_element = next(generator)
        logger.info(f"From {where} found {processed_element}")
    except Exception:
        pass


if __name__ == "__main__":
    for element in [
        {"key": 1},
        {"key": 2},
        {"key": None},
        {"key": 4},
    ]:
        run_and_log(SetDefaultVal1, element, 1)
        run_and_log(SetDefaultVal2, element, 2)
        run_and_log(SetDefaultVal3, element, 3)
        run_and_log(SetDefaultVal4, element, 4)
Outputs:
$ python generator_eg.py
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}
# PS: Output is manually ordered for ease of interpretation.
Comparing the behavior of DoFn.process with that of a Python generator for all 4 cases:
Case 4.
- Output is expected for dofn_issue.py & generator_eg.py.
- Output is same for dofn_issue.py & generator_eg.py.
Case 3.
- Output is expected for dofn_issue.py & generator_eg.py.
- Output is NOT same for dofn_issue.py & generator_eg.py.
Case 2.
- a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
- Output is same for dofn_issue.py & generator_eg.py.
Case 1.
- a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
- Output is same for dofn_issue.py & generator_eg.py.
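The case 3 difference in generator_eg.py appears to come from how run_and_log consumes the result, not from the process function itself. A small sketch, where the hypothetical helper returns_list stands in for SetDefaultVal3.process: calling next() directly on a list raises TypeError, which the except Exception: pass clause hides, whereas iterating over the same list works.

```python
def returns_list(element):
    """Plain function (no `yield`): the caller receives an ordinary list."""
    return [element]


result = returns_list({"key": 1})

# A list is iterable but is not an iterator, so next() rejects it.
try:
    next(result)
    error_name = None
except TypeError as exc:
    error_name = type(exc).__name__

# Wrapping the list in iter() (or using a for loop) retrieves the element.
first = next(iter(result))

if __name__ == "__main__":
    print(error_name)  # TypeError
    print(first)       # {'key': 1}
```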
Discrepancy
If dofn_issue.py emits elements for return <some iterable> in case 3 (which generator_eg.py does NOT), then why does it NOT emit them for return <some iterable> in case 1 and case 2?
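One possible explanation, sketched in plain Python under the assumption that the runner consumes whatever process returns simply by iterating over it (a simplified model, not Beam's actual implementation): in case 3 process is an ordinary function that returns a list, while in cases 1 and 2 the presence of yield makes it a generator function, so the returned list is never part of the iteration. The names case1, case3, case1_fixed and emit_all are hypothetical and exist only for this illustration.

```python
import inspect


def case1(element):
    # Mixed style: `yield` makes this a generator function,
    # so the returned list is not iterated by the consumer.
    if not element.get("key"):
        return [{"key": -9999}]
    yield element


def case3(element):
    # Return-only style: an ordinary function returning a list.
    if not element.get("key"):
        return [{"key": -9999}]
    return [element]


def case1_fixed(element):
    # Possible workaround: stay in generator style and delegate with `yield from`.
    if not element.get("key"):
        yield from [{"key": -9999}]
        return
    yield element


def emit_all(process, inputs):
    """Simplified stand-in for a runner: iterate over each process() result."""
    return [out for element in inputs for out in process(element)]


if __name__ == "__main__":
    inputs = [{"key": 1}, {"key": None}]
    print(inspect.isgeneratorfunction(case1))  # True
    print(inspect.isgeneratorfunction(case3))  # False
    print(emit_all(case1, inputs))
    print(emit_all(case3, inputs))
    print(emit_all(case1_fixed, inputs))
```

Under this model, keeping each process method in a single style (only yield / yield from, or only return <some iterable>) avoids the silent drop.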
Issue Priority
Priority: 1
Issue Component
Component: sdk-py-core