Skip to content

Commit 3f3a4b5

Browse files
committed
Fix race in Celery tests by pre-creating result tables (apache#8909)
We noticed our Celery tests failing sometimes with > (psycopg2.errors.UniqueViolation) duplicate key value violates unique > constraint "pg_type_typname_nsp_index" > DETAIL: Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists It appears this is a race condition in SQLAlchemy's "create_all()" function, where it first checks which tables exist, builds up a list of `CREATE TABLE` statements, then issues them. Thus if two celery worker processes start at the same time, they will find the the table doesn't yet exist, and both try to create it. This is _probably_ a bug in SQLA, but this should be an easy enough fix here, to just ensure that the table exists before launching any Celery tasks. (cherry picked from commit bae5cc2)
1 parent 229f631 commit 3f3a4b5

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

tests/executors/test_celery_executor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ def _prepare_app(self, broker_url=None, execute=None):
6161
patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
6262
patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
6363

64+
backend = test_app.backend
65+
66+
if hasattr(backend, 'ResultSession'):
67+
# Pre-create the database tables now, otherwise SQLA vis Celery has a
68+
# race condition where it one of the subprocesses can die with "Table
69+
# already exists" error, because SQLA checks for which tables exist,
70+
# then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
71+
# EXISTS
72+
session = backend.ResultSession()
73+
session.close()
74+
6475
with patch_app, patch_execute:
6576
try:
6677
yield test_app
@@ -140,6 +151,7 @@ def fake_execute_command():
140151
self.assertEquals(1, len(executor.queued_tasks))
141152
self.assertEquals(executor.queued_tasks['key'], value_tuple)
142153

154+
@pytest.mark.backend("mysql", "postgres")
143155
def test_exception_propagation(self):
144156
with self._prepare_app() as app:
145157
@app.task

0 commit comments

Comments
 (0)