Description
Discussed in #40608
Originally posted by plutaniano July 4, 2024
Apache Airflow Provider(s)
common-sql
Versions of Apache Airflow Providers
apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-mysql==5.6.1
apache-airflow-providers-postgres==5.11.1
Apache Airflow version
2.9.2
Operating System
macOS Sonoma 14.5 (docker host)
Deployment
Docker-Compose
Deployment details
I'm using the official Airflow docker-compose.yaml plus a MySQL database; details are in the reproduction steps.
What happened
The database connection is restarted multiple times during a single DbApiHook.insert_rows call.
What you think should happen instead
DbApiHook.insert_rows should create a single database connection and maintain it for the duration of the call.
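As a toy sketch of that expected behavior (plain Python with hypothetical names, not Airflow's actual implementation): the connection is opened once up front, and only the commit happens per chunk.

```python
# Toy sketch of the EXPECTED behavior: one connection is opened up front
# and reused for every chunk; only the commit happens per chunk.
# ToyHook/ToyConnection are hypothetical stand-ins, not Airflow's DbApiHook.
class ToyConnection:
    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1


class ToyHook:
    def __init__(self):
        self.connections_opened = 0

    def get_conn(self):
        self.connections_opened += 1
        return ToyConnection()

    def insert_rows(self, rows, commit_every=1000):
        conn = self.get_conn()  # opened ONCE for the whole call
        for start in range(0, len(rows), commit_every):
            chunk = rows[start:start + commit_every]
            # ... execute the INSERTs for `chunk` here ...
            conn.commit()       # commit per chunk, on the SAME connection
        return conn


hook = ToyHook()
conn = hook.insert_rows([[i] for i in range(10_000)])
print(hook.connections_opened, conn.commits)  # 1 connection, 10 commits
```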
How to reproduce
Create a temporary test project:
mkdir /tmp/airflow/
cd /tmp/airflow/
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'
Add the following MySQL service to the docker-compose file:
mysql:
  image: mysql:latest
  environment:
    MYSQL_DATABASE: 'db'
    MYSQL_ROOT_PASSWORD: 'airflow'
  ports:
    - '3306:3306'
Run the docker compose:
docker compose up -d
Add the following connections to Airflow using docker exec -it airflow-airflow-triggerer-1 bash:
airflow connections add postgres_default --conn-uri postgresql://airflow:airflow@postgres
airflow connections add mysql_default --conn-uri mysql://root:airflow@mysql/db
Then open a Python shell and execute the following scripts:
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg = PostgresHook()
pg.run("CREATE TABLE IF NOT EXISTS t (a int)")
pg.insert_rows(
table="t",
rows=[[i] for i in range(10_000)],
target_fields="a",
)
And for MySQL:
from airflow.providers.mysql.hooks.mysql import MySqlHook
mysql = MySqlHook()
mysql.run("CREATE TABLE IF NOT EXISTS t (a int)")
mysql.insert_rows(
table="t",
rows=[[i] for i in range(100)],
target_fields="a",
)
Both scripts open multiple connections to the database while inserting, instead of maintaining a single one. Postgres appears to recreate the connection every 1000 rows (matching insert_rows' default commit_every=1000), while MySQL recreates it after every single insert.
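The Postgres log pattern below (a fresh "Using connection ID" line before each 1000-row chunk) is consistent with the connection being re-resolved once per commit chunk. A toy model of that loop (plain Python with a hypothetical BuggyHook, not Airflow source) reproduces the count:

```python
# Toy model of the OBSERVED behavior: the connection is re-created inside
# the commit loop, so a 10_000-row insert with commit_every=1000 opens
# 10 connections instead of 1. BuggyHook is hypothetical, not Airflow code.
class BuggyHook:
    def __init__(self):
        self.connections_opened = 0

    def get_conn(self):
        self.connections_opened += 1
        return object()  # stand-in for a DB-API connection

    def insert_rows(self, rows, commit_every=1000):
        inserted = 0
        for start in range(0, len(rows), commit_every):
            conn = self.get_conn()  # re-opened per chunk -> the reported bug
            inserted += len(rows[start:start + commit_every])
        return inserted


hook = BuggyHook()
total = hook.insert_rows([[i] for i in range(10_000)])
print(hook.connections_opened)  # 10, matching the repeated log lines
```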
Postgres:
>>> pg.insert_rows(
... table="t",
... rows=[[i] for i in range(10_000)],
... target_fields="a",
... )
[2024-07-04T15:08:13.940+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.942+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.996+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:13.997+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.043+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.044+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.090+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.091+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.145+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.146+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.200+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.201+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.245+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.246+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.290+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.291+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.341+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.342+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.393+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.394+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.441+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.441+0000] {sql.py:611} INFO - Done loading. Loaded a total of 10000 rows into t
MySQL:
>>> mysql.insert_rows(
... table="t",
... rows=[[i] for i in range(100)],
... target_fields="a",
... )
[2024-07-04T15:08:54.551+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.554+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.555+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.556+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.558+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
...
[2024-07-04T15:08:54.616+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.618+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.619+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.620+0000] {sql.py:611} INFO - Done loading. Loaded a total of 100 rows into t
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct