DbApiHook.insert_rows unnecessarily restarting connections #40609

@potiuk

Description

Discussed in #40608

Originally posted by plutaniano July 4, 2024

Apache Airflow Provider(s)

common-sql

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-mysql==5.6.1
apache-airflow-providers-postgres==5.11.1

Apache Airflow version

2.9.2

Operating System

MacOS Sonoma 14.5 (docker host)

Deployment

Docker-Compose

Deployment details

I'm using the official Airflow docker-compose.yaml + a MySQL database, details in the reproduction steps.

What happened

The database connection is restarted multiple times during a single DbApiHook.insert_rows call.

What you think should happen instead

DbApiHook.insert_rows should create and maintain a single db connection.
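To make the expectation concrete, here is a minimal sketch of the intended pattern, using sqlite3 purely for illustration (this is not the actual DbApiHook code; the function name and signature are hypothetical): the connection is opened once by the caller, commits happen per batch, and the same connection is reused throughout.

```python
import sqlite3

def insert_rows_single_conn(conn, table, rows, target_fields, commit_every=1000):
    """Illustrative sketch: batch inserts with periodic commits on ONE connection.

    `conn` is created once by the caller and never reopened; only
    transactions are restarted at each commit boundary.
    """
    columns = ", ".join(target_fields)
    placeholders = ", ".join("?" for _ in target_fields)
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    cur = conn.cursor()
    for i, row in enumerate(rows, start=1):
        cur.execute(sql, row)
        if i % commit_every == 0:
            conn.commit()  # commit the batch, keep the same connection
    conn.commit()  # flush the final partial batch

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a int)")
insert_rows_single_conn(conn, "t", [(i,) for i in range(2500)], ["a"])
print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])  # → 2500
```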

How to reproduce

Create a temporary test project:

mkdir /tmp/airflow/
cd /tmp/airflow/
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'

Add the following MySQL service to the docker-compose file:

  mysql:
    image: mysql:latest
    environment:
      MYSQL_DATABASE: 'db'
      MYSQL_ROOT_PASSWORD: 'airflow'
    ports:
      - '3306:3306'

Start the containers:

docker compose up -d

Inside a container shell (docker exec -it airflow-airflow-triggerer-1 bash), add the following connections to Airflow:

airflow connections add postgres_default --conn-uri postgresql://airflow:airflow@postgres
airflow connections add mysql_default --conn-uri mysql://root:airflow@mysql/db

Then open a Python shell and run the following scripts:

from airflow.providers.postgres.hooks.postgres import PostgresHook

pg = PostgresHook()
pg.run("CREATE TABLE IF NOT EXISTS t (a int)")

pg.insert_rows(
    table="t",
    rows=[[i] for i in range(10_000)],
    target_fields="a",
)

And for MySQL

from airflow.providers.mysql.hooks.mysql import MySqlHook

mysql = MySqlHook()
mysql.run("CREATE TABLE IF NOT EXISTS t (a int)")
mysql.insert_rows(
    table="t",
    rows=[[i] for i in range(100)],
    target_fields="a",
)

Both scripts open multiple connections to the database while inserting, instead of maintaining just one. Postgres appears to recreate the connection every 1000 inserts; MySQL does it after every insert.
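The 1000-row interval in the Postgres log matches insert_rows' default commit_every=1000, which suggests the hook re-resolves the connection once per commit batch, producing one "Using connection ID" line per batch. The sketch below models that suspected behavior with a fake hook (FakeHook and its methods are illustrative, not Airflow's actual code):

```python
# Hedged model of what the logs suggest: if the connection is looked up
# once per commit batch (every `commit_every` rows), 10,000 rows with the
# default commit_every=1000 would emit 10 "Using connection ID" lines.

class FakeHook:
    def __init__(self):
        self.get_conn_calls = 0

    def get_conn(self):
        # In the real hook, each call here would log "Using connection ID ...".
        self.get_conn_calls += 1
        return object()

    def insert_rows(self, rows, commit_every=1000):
        conn = None
        for i, _ in enumerate(rows, start=1):
            if i % commit_every == 1:
                conn = self.get_conn()  # reconnect per batch: the suspected bug
            # ... execute the insert on conn ...

hook = FakeHook()
hook.insert_rows(range(10_000))
print(hook.get_conn_calls)  # → 10
```

The MySQL log, where the line appears after every row, would correspond to the same lookup happening per insert rather than per batch.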

Postgres:

>>> pg.insert_rows(
...     table="t",
...     rows=[[i] for i in range(10_000)],
...     target_fields="a",
... )
[2024-07-04T15:08:13.940+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.942+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.996+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:13.997+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.043+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.044+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.090+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.091+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.145+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.146+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.200+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.201+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.245+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.246+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.290+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.291+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.341+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.342+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.393+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.394+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.441+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.441+0000] {sql.py:611} INFO - Done loading. Loaded a total of 10000 rows into t

MySQL:

>>> mysql.insert_rows(
...     table="t",
...     rows=[[i] for i in range(100)],
...     target_fields="a",
... )
[2024-07-04T15:08:54.551+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.554+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.555+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.556+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.558+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
...
[2024-07-04T15:08:54.616+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.618+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.619+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.620+0000] {sql.py:611} INFO - Done loading. Loaded a total of 100 rows into t

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
