Description
Apache Airflow version
2.2.5
What happened
When many deferrable operators (e.g. TimeDeltaSensorAsync) are created, the triggerer component dies because of a DB deadlock.
[2022-05-11 02:45:08,420] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5397) starting
[2022-05-11 02:45:08,421] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5398) starting
[2022-05-11 02:45:09,459] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5400) starting
[2022-05-11 02:45:09,461] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5399) starting
[2022-05-11 02:45:10,503] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5401) starting
[2022-05-11 02:45:10,504] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5402) starting
[2022-05-11 02:45:11,113] {triggerer_job.py:108} ERROR - Exception when executing TriggererJob._run_trigger_loop
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
self._run_trigger_loop()
File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 127, in _run_trigger_loop
Trigger.clean_unused()
File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py", line 91, in clean_unused
session.query(TaskInstance).filter(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 4063, in update
update_op.exec_()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
self._execute_stmt(update_stmt)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s AND task_instance.trigger_id IS NOT NULL]
[parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-05-11 02:45:11,118] {triggerer_job.py:111} INFO - Waiting for triggers to clean up
[2022-05-11 02:45:11,592] {triggerer_job.py:117} INFO - Exited trigger loop
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
args.func(args)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/triggerer_command.py", line 56, in triggerer
job.run()
File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 246, in run
self._execute()
File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
self._run_trigger_loop()
File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 127, in _run_trigger_loop
Trigger.clean_unused()
File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py", line 91, in clean_unused
session.query(TaskInstance).filter(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 4063, in update
update_op.exec_()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
self._execute_stmt(update_stmt)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s AND task_instance.trigger_id IS NOT NULL]
[parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
What you think should happen instead
The triggerer process should keep running and should not die with a deadlock error.
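The MySQL error text itself suggests "try restarting transaction", so one possible way to get this behaviour is to retry the failing statement a bounded number of times. The following is a minimal sketch under that assumption, not Airflow's implementation: `run_with_deadlock_retry`, `is_deadlock`, and `FakeOperationalError` are hypothetical names used only for illustration, and the check relies on the error code 1213 appearing as the first argument of the driver exception, as it does in the traceback above.

```python
import time

MYSQL_DEADLOCK_CODE = 1213  # error code reported in the traceback above


def is_deadlock(exc):
    """Return True if exc (or the driver error it wraps) carries code 1213."""
    # SQLAlchemy exposes the underlying DBAPI exception as `.orig`;
    # fall back to the exception itself when that attribute is absent.
    original = getattr(exc, "orig", None) or exc
    args = getattr(original, "args", ())
    return bool(args) and args[0] == MYSQL_DEADLOCK_CODE


def run_with_deadlock_retry(func, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call func(); on a deadlock, back off and retry, up to `attempts` calls."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Re-raise anything that is not a deadlock, or the final failure.
            if not is_deadlock(exc) or attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


class FakeOperationalError(Exception):
    """Hypothetical stand-in for the driver's OperationalError (demo only)."""


attempts_seen = []


def flaky_update():
    # Simulates an UPDATE that deadlocks twice and then succeeds.
    attempts_seen.append(1)
    if len(attempts_seen) < 3:
        raise FakeOperationalError(1213, "Deadlock found when trying to get lock")
    return "updated"


result = run_with_deadlock_retry(flaky_update, attempts=3, sleep=lambda _: None)
print(result, len(attempts_seen))
```

In a real session-based caller, each retry would presumably also need to roll back the failed transaction before running the statement again; that part is left out of the sketch.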
How to reproduce
Create the "test_timedelta" DAG below and run it.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.time_delta import TimeDeltaSensorAsync

default_args = {
    "owner": "user",
    "start_date": datetime(2021, 2, 8),
    "retries": 2,
    "retry_delay": timedelta(minutes=20),
    "depends_on_past": False,
}

with DAG(
    dag_id="test_timedelta",
    default_args=default_args,
    schedule_interval="10 11 * * *",
    max_active_runs=1,
    max_active_tasks=2,
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    for idx in range(800):
        tx = TimeDeltaSensorAsync(
            task_id=f"sleep_{idx}",
            delta=timedelta(days=3),
        )
        start >> tx >> end
Operating System
uname_result(system='Linux', node='d2845d6331fd', release='5.10.104-linuxkit', version='#1 SMP Thu Mar 17 17:08:06 UTC 2022', machine='x86_64', processor='')
Versions of Apache Airflow Providers
apache-airflow-providers-apache-druid | 2.3.3
apache-airflow-providers-apache-hive | 2.3.2
apache-airflow-providers-apache-spark | 2.1.3
apache-airflow-providers-celery | 2.1.3
apache-airflow-providers-ftp | 2.1.2
apache-airflow-providers-http | 2.1.2
apache-airflow-providers-imap | 2.2.3
apache-airflow-providers-jdbc | 2.1.3
apache-airflow-providers-mysql | 2.2.3
apache-airflow-providers-postgres | 4.1.0
apache-airflow-providers-redis | 2.0.4
apache-airflow-providers-sqlite | 2.1.3
apache-airflow-providers-ssh | 2.4.3
Deployment
Other Docker-based deployment
Deployment details
webserver: 1 instance
scheduler: 1 instance
worker: 1 instance (Celery)
triggerer: 1 instance
redis: 1 instance
Database: 1 instance (mysql)
Anything else
webserver: 172.19.0.9
scheduler: 172.19.0.7
triggerer: 172.19.0.5
worker: 172.19.0.8
MYSQL (SHOW ENGINE INNODB STATUS;)
------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-05-11 07:47:49 139953955817216
*** (1) TRANSACTION:
TRANSACTION 544772, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 20, OS thread handle 139953861383936, query id 228318 172.19.0.5 airflow_user updating
UPDATE task_instance SET trigger_id=NULL WHERE task_instance.state != 'deferred' AND task_instance.trigger_id IS NOT NULL
*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table `airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap
Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info bits 0
0: len 6; hex 717565756564; asc queued;;
1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
2: len 9; hex 736c6565705f323436; asc sleep_246;;
3: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 125 page no 47 n bits 128 index PRIMARY of table `airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap waiting
Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info bits 0
0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
1: len 9; hex 736c6565705f323436; asc sleep_246;;
2: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
3: len 6; hex 000000085001; asc P ;;
4: len 7; hex 01000001411e2f; asc A /;;
5: len 7; hex 627b6a250b612d; asc b{j% a-;;
6: SQL NULL;
7: SQL NULL;
8: len 7; hex 72756e6e696e67; asc running;;
9: len 4; hex 80000001; asc ;;
10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
11: len 4; hex 726f6f74; asc root;;
12: len 4; hex 8000245e; asc $^;;
13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
14: len 7; hex 64656661756c74; asc default;;
15: len 4; hex 80000002; asc ;;
16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc TimeDeltaSensorAsync;;
17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
18: SQL NULL;
19: len 4; hex 80000002; asc ;;
20: len 5; hex 80057d942e; asc } .;;
21: len 4; hex 80000001; asc ;;
22: len 4; hex 800021c7; asc ! ;;
23: len 30; hex 36353061663737642d363762372d343166382d383439342d636637333061; asc 650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
24: SQL NULL;
25: SQL NULL;
26: SQL NULL;
27: len 2; hex 0400; asc ;;
*** (2) TRANSACTION:
TRANSACTION 544769, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1128, 4 row lock(s), undo log entries 2
MySQL thread id 12010, OS thread handle 139953323235072, query id 228319 172.19.0.8 airflow_user updating
UPDATE task_instance SET start_date='2022-05-11 07:47:49.745773', state='running', try_number=1, hostname='d2845d6331fd', job_id=9310 WHERE task_instance.task_id = 'sleep_246' AND task_instance.dag_id = 'test_timedelta' AND task_instance.run_id = 'scheduled__2022-05-09T11:10:00+00:00'
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 125 page no 47 n bits 120 index PRIMARY of table `airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap
Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info bits 0
0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
1: len 9; hex 736c6565705f323436; asc sleep_246;;
2: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
3: len 6; hex 000000085001; asc P ;;
4: len 7; hex 01000001411e2f; asc A /;;
5: len 7; hex 627b6a250b612d; asc b{j% a-;;
6: SQL NULL;
7: SQL NULL;
8: len 7; hex 72756e6e696e67; asc running;;
9: len 4; hex 80000001; asc ;;
10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
11: len 4; hex 726f6f74; asc root;;
12: len 4; hex 8000245e; asc $^;;
13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
14: len 7; hex 64656661756c74; asc default;;
15: len 4; hex 80000002; asc ;;
16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc TimeDeltaSensorAsync;;
17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
18: SQL NULL;
19: len 4; hex 80000002; asc ;;
20: len 5; hex 80057d942e; asc } .;;
21: len 4; hex 80000001; asc ;;
22: len 4; hex 800021c7; asc ! ;;
23: len 30; hex 36353061663737642d363762372d343166382d383439342d636637333061; asc 650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
24: SQL NULL;
25: SQL NULL;
26: SQL NULL;
27: len 2; hex 0400; asc ;;
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table `airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap waiting
Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info bits 0
0: len 6; hex 717565756564; asc queued;;
1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
2: len 9; hex 736c6565705f323436; asc sleep_246;;
3: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
*** WE ROLL BACK TRANSACTION (1)
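The record fields in the dump above are printed as hex, which makes the two lock targets hard to compare by eye. A small sketch for decoding them follows; `decode_innodb_field` is a hypothetical helper name, and the hex strings are copied from the `ti_state` index record shown above.

```python
def decode_innodb_field(hex_string):
    """Decode the hex payload of an InnoDB status field into text."""
    return bytes.fromhex(hex_string).decode("ascii", errors="replace")


# First three fields of the ti_state index record (heap no 180) in the dump.
ti_state_record = [
    "717565756564",
    "746573745f74696d6564656c7461",
    "736c6565705f323436",
]

decoded = [decode_innodb_field(h) for h in ti_state_record]
print(decoded)  # ['queued', 'test_timedelta', 'sleep_246']
```

Read this way, the dump shows the triggerer (172.19.0.5) holding the `ti_state` index entry for `sleep_246` while waiting for the PRIMARY row, and the worker (172.19.0.8) holding that PRIMARY row while waiting for the same `ti_state` entry, so the two transactions take the same pair of locks in opposite order.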
Airflow env
AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow_user:airflow_pass@mysql/airflow_db
AIRFLOW__CORE__DEFAULT_TIMEZONE=KST
AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=KST
AIRFLOW_HOME=/home/deploy/airflow
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__WEBSERVER__SECRET_KEY=aoiuwernholo
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow_user:airflow_pass@mysql/airflow_db
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct