Skip to content

[AIP-31] Create @task decorator for functionally defined operators #8057

@casassg

Description

@casassg

Description

Add simple way to wrap a function into a PythonFunctionalOperator.

  • Should be used without args or with args/kwargs for the underlying operator:
@task
def simple_task(...):

@task(dag=dag)
def simple_task(...):
  • Task ID should be the function name by default.
  • Should wrap the function so that autocomplete can still suggest parameters correctly.
  • Decorator should return an instance of PythonFunctionalOperator. This can be used to set task dependencies. Ex:
@task 
def simple_task(...)
  pass

simple_task >> another_task

Example implemetation: https://github.com/casassg/corrent/blob/master/corrent/decorators.py

Ways to use it:

  • @airflow.task: Lazy imported from main Airflow module (real location airflow.decorators.task). Makes function an operator, but does not automatically assign it to a DAG (unless declared inside a DAG context)
  • @dag.task: As a partial function from DAG class. Task automatically assigned to DAG.

Use case / motivation

  • Enable easy transformation of python functions into PythonFunctionalOperators by making a decorator that takes a function and converts it into a PythonFunctionalOperator.

Stretch goals

  • Modify signature type hints (if any) to include XComArg for MyPy to not give issues. Example:
@task 
def simple_task(text:str):
   pass

inspect.signature(simple_task) -> (text:Union[str, XComArg])

Related Issues

Blocked by: #8056

Metadata

Metadata

Assignees

Labels

AIP-31Task Flow API for nicer DAG definitionkind:featureFeature Requests

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions