Closed
Labels
AIP-31 (Task Flow API for nicer DAG definition), kind:feature (Feature Requests)
Description
Add a simple way to wrap a function into a PythonFunctionalOperator.
- It should be usable either bare or with args/kwargs that are passed to the underlying operator:
    @task
    def simple_task(...):
        pass

    @task(dag=dag)
    def simple_task(...):
        pass
- Task ID should be the function name by default.
- Should wrap the function so that autocomplete can still suggest parameters correctly.
- The decorator should return an instance of PythonFunctionalOperator, which can then be used to set task dependencies. Example:
    @task
    def simple_task(...):
        pass

    simple_task >> another_task
Example implementation: https://github.com/casassg/corrent/blob/master/corrent/decorators.py
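The requirements above can be sketched without Airflow at all. This is only an illustration, not the proposed implementation: `SketchOperator` is a hypothetical stand-in for PythonFunctionalOperator, and the `retries` keyword is just an example of an argument forwarded to the operator.

```python
import functools
import inspect


class SketchOperator:
    """Hypothetical stand-in for PythonFunctionalOperator (not Airflow's class)."""

    def __init__(self, python_callable, task_id=None, **operator_kwargs):
        self.python_callable = python_callable
        # The task ID defaults to the function name.
        self.task_id = task_id or python_callable.__name__
        self.operator_kwargs = operator_kwargs
        self.downstream = []
        # Copy __name__, __doc__ and set __wrapped__ so that tools relying on
        # inspect.signature can still see the original parameters.
        functools.update_wrapper(self, python_callable)

    def __call__(self, *args, **kwargs):
        return self.python_callable(*args, **kwargs)

    def __rshift__(self, other):
        # `a >> b` records b as downstream of a and returns b for chaining.
        self.downstream.append(other)
        return other


def task(python_callable=None, **operator_kwargs):
    """Usable as bare `@task` or as `@task(**operator_kwargs)`."""

    def wrap(func):
        return SketchOperator(func, **operator_kwargs)

    if python_callable is not None:
        # Bare `@task`: the function itself was passed in.
        return wrap(python_callable)
    # `@task(...)`: return the real decorator.
    return wrap


@task
def extract(source):
    return f"data from {source}"


@task(task_id="custom_load", retries=2)
def load(payload):
    return len(payload)


extract >> load
print(extract.task_id, load.task_id, inspect.signature(extract))
```

Whether the real operator should stay directly callable, as this stand-in is, is a design question the sketch does not settle.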
Ways to use it:
- @airflow.task: Lazily imported from the main Airflow module (real location: airflow.decorators.task). Makes the function an operator, but does not automatically assign it to a DAG (unless declared inside a DAG context).
- @dag.task: A partial function from the DAG class. The task is automatically assigned to the DAG.
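The difference between the two entry points can be illustrated with a small, self-contained sketch. `SketchDAG` and `DagAwareOperator` are hypothetical stand-ins, and the class-level `_current` attribute is only an assumed way of tracking the active DAG context; Airflow's real mechanism may differ.

```python
import functools


class SketchDAG:
    """Hypothetical stand-in for a DAG with context-manager support."""

    _current = None  # DAG active in the enclosing `with` block, if any

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = {}

    def __enter__(self):
        self._previous = SketchDAG._current
        SketchDAG._current = self
        return self

    def __exit__(self, *exc_info):
        SketchDAG._current = self._previous
        return False

    def task(self, func=None, **kwargs):
        # Same decorator as the module-level `task`, with `dag` pre-bound.
        return task(func, dag=self, **kwargs)


class DagAwareOperator:
    """Hypothetical operator that registers itself with a DAG when it has one."""

    def __init__(self, python_callable, dag=None, task_id=None):
        self.python_callable = python_callable
        self.task_id = task_id or python_callable.__name__
        # An explicit DAG wins; otherwise fall back to the active context.
        self.dag = dag if dag is not None else SketchDAG._current
        if self.dag is not None:
            self.dag.tasks[self.task_id] = self


def task(func=None, **kwargs):
    if func is None:
        # Called as `@task(...)`: wait for the function.
        return functools.partial(task, **kwargs)
    return DagAwareOperator(func, **kwargs)


dag = SketchDAG("example")


@task
def unassigned():
    pass


@dag.task
def bound():
    pass


with SketchDAG("ctx") as ctx_dag:

    @task
    def in_context():
        pass


print(unassigned.dag, bound.dag.dag_id, in_context.dag.dag_id)
```

Here the bare decorator outside any context leaves the task without a DAG, while `@dag.task` and the `with` block both attach it.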
Use case / motivation
- Make it easy to turn Python functions into PythonFunctionalOperators by providing a decorator that takes a function and converts it into one.
Stretch goals
- Modify the signature's type hints (if any) to include XComArg so that MyPy does not report errors. Example:
    @task
    def simple_task(text: str):
        pass

    inspect.signature(simple_task) -> (text: Union[str, XComArg])
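One possible way to get the runtime behaviour shown in the example is to rewrite the function's `__signature__`, which `inspect.signature` honours. The sketch below assumes a placeholder `XComArg` class and a hypothetical helper name, `widen_signature`. Because MyPy analyses code statically, a runtime change like this would not by itself satisfy MyPy; it only demonstrates the `inspect.signature` side of the stretch goal.

```python
import inspect
from typing import Union


class XComArg:
    """Placeholder for the XComArg class referenced in this issue."""


def widen_signature(func):
    """Hypothetical helper: allow XComArg wherever a parameter is annotated."""
    sig = inspect.signature(func)
    params = [
        # Leave unannotated parameters untouched ("type hints, if any").
        p
        if p.annotation is inspect.Parameter.empty
        else p.replace(annotation=Union[p.annotation, XComArg])
        for p in sig.parameters.values()
    ]
    # inspect.signature() returns __signature__ when it is set.
    func.__signature__ = sig.replace(parameters=params)
    return func


@widen_signature
def simple_task(text: str, untyped=None):
    pass


print(inspect.signature(simple_task))
```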
Related Issues
Blocked by: #8056