
Discrepancy in behavior of DoFn.process() when yield is combined with return statement, or vice versa #22969

@toransahu

Description


What happened?

Apache Beam SDK Version: 2.40.0
SDK Language: Python
Runner: All (DirectRunner, DataflowRunner, PortableRunner etc.)

TL;DR: If DoFn.process contains both yield <some value> and return <some iterable> statements, the elements of the iterable passed to return <some iterable> are not emitted.

The Apache Beam Programming Guide, section 4.2.1.2 (Creating a DoFn), states:

Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.

That statement holds when DoFn.process emits elements using either:

  1. only yield <some value>, or
  2. only return <some iterable>.

However, if yield and return are combined within the DoFn.process() definition, the behavior does not match the statement in the documentation.
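One relevant piece of plain Python behavior (independent of Beam): a single yield anywhere in a function body makes the whole function a generator function, even on code paths that never reach that yield. The minimal sketch below uses hypothetical module-level copies (process1 to process4) of the four process() bodies from the example pipeline that follows, and checks each with the standard-library inspect.isgeneratorfunction.

```python
import inspect


# Hypothetical plain-Python copies of the four process() bodies (no Beam needed).
def process1(element):
    if not element.get("key"):
        return [{"key": -9999}]  # in a generator, this only stops iteration
    yield element


def process2(element):
    if not element.get("key"):
        yield {"key": -9999}
        return
    return [element]  # likewise: this list is never yielded


def process3(element):
    if not element.get("key"):
        return [{"key": -9999}]
    return [element]


def process4(element):
    if not element.get("key"):
        yield {"key": -9999}
        return
    yield element


# Any `yield` in the body turns the function into a generator function.
for fn in (process1, process2, process3, process4):
    print(fn.__name__, inspect.isgeneratorfunction(fn))
```

Running it prints True for process1, process2 and process4, and False for process3: only case 3 is an ordinary function that hands a list back to its caller.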

See this example pipeline:

# dofn_issue.py

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


logger = logging.getLogger(__name__)
logger.setLevel("INFO")


class Pipeline:
    def run(self, args=None):
        parser = argparse.ArgumentParser()
        _, extra_args = parser.parse_known_args(args)
        pipeline_options = PipelineOptions(extra_args, save_main_session=True)
        with beam.Pipeline(options=pipeline_options) as pipeline:
            data_to_process = pipeline | beam.Create(
                [
                    {"key": 1},
                    {"key": 2},
                    {"key": None},
                    {"key": 4},
                ],
            )
            data_to_process | beam.ParDo(SetDefaultValFn1()) | "1" >> beam.ParDo(LogElementsFn(), 1)
            data_to_process | beam.ParDo(SetDefaultValFn2()) | "2" >> beam.ParDo(LogElementsFn(), 2)
            data_to_process | beam.ParDo(SetDefaultValFn3()) | "3" >> beam.ParDo(LogElementsFn(), 3)
            data_to_process | beam.ParDo(SetDefaultValFn4()) | "4" >> beam.ParDo(LogElementsFn(), 4)


# NOT EXPECTED - `return` statement doesn't emit data
class SetDefaultValFn1(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


# NOT EXPECTED - only the `yield` statement emits data
class SetDefaultValFn2(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


# EXPECTED
class SetDefaultValFn3(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


# EXPECTED
class SetDefaultValFn4(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


class LogElementsFn(beam.DoFn):
    def process(self, element, where):
        logger.info(f"From {where} found {element}")
        yield element


if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run()

Actual Output:

$ python dofn_issue.py \
--runner DirectRunner

INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
# NOTE: SetDefaultValFn1 silently skipped element {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 1}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}

PS: Output is manually ordered for ease of interpretation.

Expected Output:

$ python dofn_issue.py \
--runner DirectRunner

INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': 1}
INFO:__main__:From 2 found {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 2 found {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}

PS: Output is manually ordered for ease of interpretation.

If DoFn.process behaves like a Python generator, then equivalent code written purely with plain Python generators should behave the same way:

# generator_eg.py

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SetDefaultVal1:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


class SetDefaultVal2:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


class SetDefaultVal3:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


class SetDefaultVal4:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


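def run_and_log(Generator, element, where):
    # Pull only the first item with next(). For SetDefaultVal3, process()
    # returns a list, and next() on a list raises TypeError ("'list' object
    # is not an iterator"); that error, like StopIteration, is silently
    # swallowed by the except clause below.
    generator = Generator.process(element)
    try:
        processed_element = next(generator)
        logger.info(f"From {where} found {processed_element}")
    except Exception:
        pass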


if __name__ == "__main__":
    for element in [
        {"key": 1},
        {"key": 2},
        {"key": None},
        {"key": 4},
    ]:
        run_and_log(SetDefaultVal1, element, 1)
        run_and_log(SetDefaultVal2, element, 2)
        run_and_log(SetDefaultVal3, element, 3)
        run_and_log(SetDefaultVal4, element, 4)

Output:

$ python generator_eg.py
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}

PS: Output is manually ordered for ease of interpretation.
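The two scripts disagree only on case 3. The sketch below compares two ways of consuming process(): a single next() call, as run_and_log() does, and iterating over whatever process() returns. That the runner uses the second style is an assumption here (it is consistent with the DirectRunner output above, not something this issue establishes). The helper names consume_with_next and consume_by_iterating, and the copies v1 to v4, are hypothetical.

```python
from typing import Any, Callable, Dict, List


# Hypothetical plain-Python copies of the four process() bodies (no Beam needed).
def v1(e):
    if not e.get("key"):
        return [{"key": -9999}]
    yield e


def v2(e):
    if not e.get("key"):
        yield {"key": -9999}
        return
    return [e]


def v3(e):
    if not e.get("key"):
        return [{"key": -9999}]
    return [e]


def v4(e):
    if not e.get("key"):
        yield {"key": -9999}
        return
    yield e


def consume_with_next(process: Callable[[Dict], Any], e: Dict) -> List[Any]:
    """Like run_and_log(): take at most one item via a single next() call."""
    try:
        return [next(process(e))]
    except (StopIteration, TypeError):
        # StopIteration: the generator yielded nothing.
        # TypeError: process() returned a list, and next() needs an iterator.
        return []


def consume_by_iterating(process: Callable[[Dict], Any], e: Dict) -> List[Any]:
    """Assumed runner-like consumption: iterate whatever process() returns."""
    result = process(e)
    return [] if result is None else list(result)


INPUTS = [{"key": 1}, {"key": 2}, {"key": None}, {"key": 4}]

for case, fn in [(1, v1), (2, v2), (3, v3), (4, v4)]:
    by_next = [x for e in INPUTS for x in consume_with_next(fn, e)]
    by_iter = [x for e in INPUTS for x in consume_by_iterating(fn, e)]
    print(f"case {case}: next() -> {by_next}; iterate -> {by_iter}")
```

The next() column reproduces generator_eg.py (case 3 empty), while the iterate column reproduces dofn_issue.py (case 3 complete, cases 1 and 2 still missing the returned list). Under this assumption, the case 3 difference comes from how the result is consumed, not from the DoFn itself.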

Comparing the behavior of DoFn.process with that of plain Python generators across all four cases:

Case 4.

  1. Output is expected for dofn_issue.py & generator_eg.py.
  2. Output is same for dofn_issue.py & generator_eg.py.

Case 3.

  1. Output is expected for dofn_issue.py & generator_eg.py.
  2. Output is NOT same for dofn_issue.py & generator_eg.py.

Case 2.

  1. a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
  2. Output is same for dofn_issue.py & generator_eg.py.

Case 1.

  1. a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
  2. Output is same for dofn_issue.py & generator_eg.py.

Discrepancy

If dofn_issue.py emits the iterable from return <some iterable> in case 3 (which generator_eg.py does NOT), then why does it NOT emit the iterable from the return <some iterable> statements in cases 1 and 2?
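If the behavior follows from Python's generator rules, one workaround consistent with the Programming Guide is to use a single style per process() body: all yield (with a bare return to exit early, or yield from to emit every item of an iterable), or all return. The functions below are hypothetical plain-Python rewrites of cases 1 and 2; the same bodies should work as process(self, element) in a beam.DoFn subclass, mirroring case 4, which already behaves as expected.

```python
def set_default_val_1(element):
    """Hypothetical rewrite of SetDefaultValFn1.process: yield only."""
    if not element.get("key"):
        yield {"key": -9999}
        return  # bare return just ends the generator early
    yield element


def set_default_val_2(element):
    """Hypothetical rewrite of SetDefaultValFn2.process: `yield from`
    emits every item of an iterable from inside a generator."""
    if not element.get("key"):
        yield {"key": -9999}
        return
    yield from [element]


INPUTS = [{"key": 1}, {"key": 2}, {"key": None}, {"key": 4}]
for fn in (set_default_val_1, set_default_val_2):
    print(fn.__name__, [x for e in INPUTS for x in fn(e)])
```

Both rewrites emit all four elements, including {'key': -9999}.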

Issue Priority

Priority: 1

Issue Component

Component: sdk-py-core


Labels: P1, bug, core, done & done, python
