Airflow sensors
INTRODUCTION TO AIRFLOW IN PYTHON
Mike Metzger
Data Engineer
Sensors
What is a sensor?
An operator that waits for a certain condition to be true
Creation of a file
Upload of a database record
Certain response from a web request
Can define how often to check for the condition to be true
Are assigned to tasks
Sensor details
Derived from airflow.sensors.base_sensor_operator
Sensor arguments:
mode - How to check for the condition
mode='poke' - The default, run repeatedly
mode='reschedule' - Give up task slot and try again later
poke_interval - How long to wait between checks
timeout - How long to wait before failing task
Also includes normal operator attributes
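How poke_interval and timeout interact can be sketched in plain Python. This is a simplified illustration under stated assumptions, not Airflow's implementation: wait_for, FakeClock, and the clock/sleep parameters are hypothetical names invented for the example.

```python
import time


def wait_for(condition, poke_interval=60, timeout=3600,
             clock=time.monotonic, sleep=time.sleep):
    """Call condition() until it returns True or `timeout` seconds elapse.

    Hypothetical helper for illustration only. `clock` and `sleep` are
    injectable so the loop can be exercised without real waiting.
    """
    started = clock()
    while True:
        if condition():
            return True
        if clock() - started >= timeout:
            raise TimeoutError("condition not met within timeout")
        sleep(poke_interval)


class FakeClock:
    """Stand-in clock that advances only when sleep() is called."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
check_times = []


def ready():
    # Record when each check happened; succeed on the third check.
    check_times.append(fake.now)
    return len(check_times) >= 3


result = wait_for(ready, poke_interval=300, timeout=1200,
                  clock=fake.time, sleep=fake.sleep)
```

With a 300-second interval the condition is checked at 0, 300, and 600 seconds on the fake clock. A condition that never becomes true raises TimeoutError once the timeout has elapsed.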
File sensor
Is part of the airflow.contrib.sensors library
Checks for the existence of a file at a certain location
Can also check if any files exist within a directory
from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_sense',
                              filepath='salesdata.csv',
                              poke_interval=300,
                              dag=sales_report_dag)

init_sales_cleanup >> file_sensor_task >> generate_report
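The two checks described for the file sensor (a file exists at a location, or a directory contains at least one file) can be approximated with the standard library. A rough sketch only: path_is_ready is a hypothetical helper, not the FileSensor source, and the temporary directory stands in for a real data location.

```python
import os
import tempfile


def path_is_ready(path):
    """Return True if `path` exists as a non-directory, or is a directory
    that holds at least one file somewhere beneath it."""
    if os.path.isdir(path):
        # Walk the tree and succeed as soon as any file is found.
        for _root, _dirs, files in os.walk(path):
            if files:
                return True
        return False
    return os.path.exists(path)


with tempfile.TemporaryDirectory() as workdir:
    empty_dir_ready = path_is_ready(workdir)

    target = os.path.join(workdir, "salesdata.csv")
    missing_file_ready = path_is_ready(target)

    with open(target, "w") as handle:
        handle.write("id,amount\n")

    file_ready = path_is_ready(target)
    dir_ready = path_is_ready(workdir)
```

A sensor would repeat a check like this every poke_interval seconds until it succeeds or the timeout is reached.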
Other sensors
ExternalTaskSensor - Wait for a task in another DAG to complete
HttpSensor - Request a web URL and check for content
SqlSensor - Runs a SQL query to check for content
Many others in airflow.sensors and airflow.contrib.sensors
Why sensors?
Use a sensor...
When it is uncertain when a condition will be true
When immediate failure is not desired
To add task repetition without loops
Let's practice!
Airflow executors
What is an executor?
Executors run tasks
Different executors handle running the tasks differently
Example executors:
SequentialExecutor
LocalExecutor
CeleryExecutor
SequentialExecutor
The default Airflow executor
Runs one task at a time
Useful for debugging
While functional, not really recommended for production
LocalExecutor
Runs on a single system
Treats tasks as processes
Parallelism defined by the user
Can utilize all resources of a given host system
CeleryExecutor
Uses a Celery backend as task manager
Multiple worker systems can be defined
Is significantly more difficult to set up & configure
Extremely powerful method for organizations with extensive workflows
Determine your executor
Via the airflow.cfg file
Look for the executor= line
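Looking up the executor= line can also be done programmatically. A minimal sketch, assuming the setting sits in the [core] section of airflow.cfg; read_executor and the SAMPLE_CFG contents are made up for illustration, and a real file would be loaded with parser.read(path) instead of read_string.

```python
import configparser

# Made-up excerpt standing in for a real airflow.cfg file.
SAMPLE_CFG = """
[core]
dags_folder = /home/airflow/dags
executor = SequentialExecutor
"""


def read_executor(cfg_text):
    """Return the executor name configured in the [core] section."""
    # Interpolation is disabled so '%' characters in other settings
    # (for example log formats) do not trip up the parser.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("core", "executor")


executor_name = read_executor(SAMPLE_CFG)
print(executor_name)
```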
Determine your executor #2
Via the first line of airflow list_dags
INFO - Using SequentialExecutor
Let's practice!
Debugging and troubleshooting in Airflow
Typical issues...
DAG won't run on schedule
DAG won't load
Syntax errors
DAG won't run on schedule
Check if scheduler is running
Fix by running airflow scheduler from the command line.
DAG won't run on schedule
At least one schedule_interval hasn't passed.
Modify the attributes to meet your requirements.
Not enough tasks free within the executor to run.
Change executor type
Add system resources
Add more systems
Change DAG scheduling
DAG won't load
DAG not in web UI
DAG not in airflow list_dags
Possible solutions
Verify DAG file is in correct folder
Determine the DAGs folder via airflow.cfg
Note, the folder must be an absolute path
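The absolute-path requirement can be checked with a few lines of Python. A sketch under assumptions: the dags_folder setting is read from the [core] section, dags_folder_problem is a hypothetical helper name, and the two configuration snippets are invented examples.

```python
import configparser
import os


def dags_folder_problem(cfg_text):
    """Return a description of what is wrong with dags_folder, or None."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    folder = parser.get("core", "dags_folder", fallback=None)
    if folder is None:
        return "no dags_folder setting found"
    if not os.path.isabs(folder):
        return "dags_folder is not an absolute path: " + folder
    return None


# Invented examples: one absolute path, one relative path.
good_cfg = "[core]\ndags_folder = /home/airflow/dags\n"
bad_cfg = "[core]\ndags_folder = airflow/dags\n"

good_result = dags_folder_problem(good_cfg)
bad_result = dags_folder_problem(bad_cfg)
```

The helper only validates the setting itself. Whether the DAG file actually sits in that folder still has to be verified separately.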
Syntax errors
The most common reason a DAG file won't appear
Sometimes difficult to find errors in DAG
Two quick methods:
Run airflow list_dags
Run python3 <dagfile.py>
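A syntax-only check can also be scripted with the standard ast module. This is a sketch, not a replacement for the two methods above: find_syntax_error is a hypothetical helper, the two source strings are invented, and unlike running python3 on the file it does not catch import or runtime errors.

```python
import ast


def find_syntax_error(source, filename="<dagfile>"):
    """Return (line_number, message) for the first syntax error, or None."""
    try:
        ast.parse(source, filename=filename)
    except SyntaxError as err:
        return err.lineno, err.msg
    return None


# Invented examples: one valid snippet, one with a malformed signature.
GOOD_SOURCE = "x = 1\nprint(x)\n"
BAD_SOURCE = "def build_report(:\n    pass\n"

good_check = find_syntax_error(GOOD_SOURCE)
bad_check = find_syntax_error(BAD_SOURCE)
```

For a real DAG file, the source would be read from disk first, for example with open(path).read().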
airflow list_dags
Running the Python interpreter
python3 dagfile.py:
With errors
Without errors
Let's practice!
SLAs and reporting in Airflow
SLAs
What is an SLA?
SLA stands for Service Level Agreement
Within Airflow, the amount of time a task or a DAG should require to run
An SLA Miss is any time the task / DAG does not meet the expected timing
If an SLA is missed, an email is sent out and a log is stored.
You can view SLA misses in the web UI.
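Following the description above (an SLA as the amount of time a task should require), the idea of a miss can be illustrated with a toy comparison. This is only a sketch: is_sla_miss and the timestamps are invented for the example, and Airflow's scheduler does its own SLA accounting, which this does not reproduce.

```python
from datetime import datetime, timedelta


def is_sla_miss(started, finished, sla):
    """Return True when the elapsed time exceeds the allowed duration."""
    return (finished - started) > sla


# Invented timestamps for a task with a 30-second SLA.
sla = timedelta(seconds=30)
started = datetime(2020, 2, 20, 0, 0, 0)

fast_run_missed = is_sla_miss(started, started + timedelta(seconds=20), sla)
slow_run_missed = is_sla_miss(started, started + timedelta(seconds=45), sla)
```

Here the 20-second run stays within the SLA and the 45-second run counts as a miss.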
SLA Misses
Found under Browse: SLA Misses
Defining SLAs
Using the 'sla' argument on the task
task1 = BashOperator(task_id='sla_task',
                     bash_command='runcode.sh',
                     sla=timedelta(seconds=30),
                     dag=dag)
On the default_args dictionary
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'sla': timedelta(minutes=20),
    'start_date': datetime(2020, 2, 20)
}
dag = DAG('sla_dag', default_args=default_args)
timedelta object
In the datetime library
Accessed via from datetime import timedelta
Takes arguments of days, seconds, minutes, hours, and weeks
timedelta(seconds=30)
timedelta(weeks=2)
timedelta(days=4, hours=10, minutes=20, seconds=30)
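The three timedelta values above can be inspected and combined with ordinary arithmetic. A short sketch using only the standard library; the variable names and the start date are chosen for illustration.

```python
from datetime import datetime, timedelta

short = timedelta(seconds=30)
fortnight = timedelta(weeks=2)
mixed = timedelta(days=4, hours=10, minutes=20, seconds=30)

# Weeks are normalized into days internally.
fortnight_days = fortnight.days

# total_seconds() flattens the whole duration into seconds.
mixed_seconds = mixed.total_seconds()

# Adding a timedelta to a datetime yields a later datetime.
deadline = datetime(2020, 2, 20) + mixed

# timedelta objects compare by duration.
short_is_smaller = short < fortnight
```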
General reporting
Options for success / failure / error
Keys in the default_args dictionary
default_args = {
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'email_on_success': True,
    ...
}
Within DAGs from the EmailOperator
Let's practice!