Skip to content
This repository was archived by the owner on May 22, 2025. It is now read-only.

Commit ba4e6bd

Browse files
authored
Unpin SQLAlchemy & sqlalchemy-bigquery (#367)
This commit unpins `SQLAlchemy` and `sqlalchemy-bigquery` dependency so we can use it with Airflow 2.3. This PR also fixes the Sqlite issues that we were hacking around with quotes. I have also created a companion PR in Airflow: apache/airflow#23790 . Once this PR is merged and release, we can bump the Sqlite Provider and remove the logic of `get_uri` from this repo. closes #351
1 parent 153b886 commit ba4e6bd

File tree

10 files changed

+49
-31
lines changed

10 files changed

+49
-31
lines changed

.github/ci-test-connections.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ connections:
3939
schema: null
4040
- conn_id: sqlite_conn
4141
conn_type: sqlite
42-
host: ////tmp/sqlite.db
42+
host: /tmp/sqlite.db
4343
schema:
4444
login:
4545
password:

noxfile.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ def test(session: nox.Session) -> None:
2525
"""Run unit tests."""
2626
session.install("-e", ".[all]")
2727
session.install("-e", ".[tests]")
28+
# Log all the installed dependencies
29+
session.log("Installed Dependencies:")
30+
session.run("pip3", "freeze")
2831
session.run("airflow", "db", "init")
2932
session.run("pytest", *session.posargs)
3033

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ dependencies = [
2020
"pyarrow",
2121
"python-frontmatter",
2222
"smart-open",
23-
"SQLAlchemy>=1.3.18,<=1.3.24"
23+
"SQLAlchemy>=1.3.18"
2424
]
2525

2626
keywords = ["airflow", "provider", "astronomer", "sql", "decorator", "task flow", "elt", "etl", "dag"]
@@ -49,7 +49,7 @@ tests = [
4949
]
5050
google = [
5151
"apache-airflow-providers-google",
52-
"sqlalchemy-bigquery==1.3.0",
52+
"sqlalchemy-bigquery>=1.3.0",
5353
"smart-open[gcs]>=5.2.1",
5454
]
5555
snowflake = [
@@ -73,7 +73,7 @@ all = [
7373
"smart-open[all]>=5.2.1",
7474
"snowflake-connector-python[pandas]",
7575
"snowflake-sqlalchemy>=1.2.0,<=1.2.4",
76-
"sqlalchemy-bigquery==1.3.0",
76+
"sqlalchemy-bigquery>=1.3.0",
7777
"s3fs"
7878
]
7979
doc = [

src/astro/databases/sqlite.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@ def __init__(self, conn_id: str = DEFAULT_CONN_ID):
1818
super().__init__(conn_id)
1919

2020
@property
21-
def hook(self):
21+
def hook(self) -> SqliteHook:
2222
"""Retrieve Airflow hook to interface with the Sqlite database."""
2323
return SqliteHook(sqlite_conn_id=self.conn_id)
2424

2525
@property
2626
def sqlalchemy_engine(self) -> Engine:
2727
"""Return SQAlchemy engine."""
28-
uri = self.hook.get_uri()
29-
if "////" not in uri:
30-
uri = uri.replace("///", "////")
31-
return create_engine(uri)
28+
# Airflow uses sqlite3 library and not SqlAlchemy for SqliteHook
29+
# and it only uses the hostname directly.
30+
airflow_conn = self.hook.get_connection(self.conn_id)
31+
return create_engine(f"sqlite:///{airflow_conn.host}")
3232

3333
@property
3434
def default_metadata(self) -> Metadata:

src/astro/sql/operators/agnostic_save_file.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from astro.databases import create_database
1010
from astro.files import File
1111
from astro.sql.table import Table
12+
from astro.sqlite_utils import create_sqlalchemy_engine_with_sqlite
1213
from astro.utils.database import create_database_from_conn_id
1314
from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook
1415
from astro.utils.task_id_helper import get_task_id
@@ -111,9 +112,15 @@ def convert_sql_table_to_dataframe(
111112

112113
db = create_database(input_table.conn_id)
113114
table_name = db.get_table_qualified_name(input_table)
115+
116+
if database == Database.SQLITE:
117+
con_engine = create_sqlalchemy_engine_with_sqlite(input_hook)
118+
else:
119+
con_engine = input_hook.get_sqlalchemy_engine()
120+
114121
return pd.read_sql(
115122
f"SELECT * FROM {table_name}",
116-
con=input_hook.get_sqlalchemy_engine(),
123+
con=con_engine,
117124
)
118125

119126

src/astro/sql/operators/sql_dataframe.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from astro.databases import create_database
1010
from astro.settings import SCHEMA
1111
from astro.sql.table import Table
12+
from astro.sqlite_utils import create_sqlalchemy_engine_with_sqlite
1213
from astro.utils import get_hook
1314
from astro.utils.database import create_database_from_conn_id
1415
from astro.utils.dependencies import (
@@ -147,7 +148,7 @@ def _get_dataframe(self, table: Table):
147148
)
148149
elif database == Database.SQLITE:
149150
hook = SqliteHook(sqlite_conn_id=table.conn_id)
150-
engine = hook.get_sqlalchemy_engine()
151+
engine = create_sqlalchemy_engine_with_sqlite(hook)
151152
df = pd.read_sql_table(table.name, engine)
152153
elif database == Database.BIGQUERY:
153154
db = create_database(table.conn_id)

src/astro/sqlite_utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import sqlalchemy
2+
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
3+
4+
5+
# TODO: This function should be removed after the refactor as this is handled in the Database
6+
def create_sqlalchemy_engine_with_sqlite(hook: SqliteHook) -> sqlalchemy.engine.Engine:
7+
# Airflow uses sqlite3 library and not SqlAlchemy for SqliteHook
8+
# and it only uses the hostname directly.
9+
airflow_conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
10+
engine = sqlalchemy.create_engine(f"sqlite:///{airflow_conn.host}")
11+
return engine

src/astro/utils/database.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
from airflow.hooks.base import BaseHook
44
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
5-
from sqlalchemy import create_engine, text
6-
from sqlalchemy.engine import Engine
7-
from sqlalchemy.engine.result import ResultProxy
5+
from sqlalchemy import text
6+
from sqlalchemy.engine import Engine, ResultProxy
87

98
from astro.constants import CONN_TYPE_TO_DATABASE, Database
9+
from astro.sqlite_utils import create_sqlalchemy_engine_with_sqlite
1010
from astro.utils.dependencies import BigQueryHook, PostgresHook, SnowflakeHook
1111

1212

@@ -64,10 +64,7 @@ def get_sqlalchemy_engine(hook: Union[BaseHook, SqliteHook]) -> Engine:
6464
database = get_database_name(hook)
6565
engine = None
6666
if database == Database.SQLITE:
67-
uri = hook.get_uri()
68-
if "////" not in uri:
69-
uri = hook.get_uri().replace("///", "////")
70-
engine = create_engine(uri)
67+
engine = create_sqlalchemy_engine_with_sqlite(hook)
7168
if engine is None:
7269
engine = hook.get_sqlalchemy_engine()
7370
return engine
@@ -88,7 +85,7 @@ def run_sql(
8885
:param parameters: (optional) Parameters to be passed to the SQL statement
8986
:type parameters: dict
9087
:return: Result of running the statement
91-
:rtype: sqlalchemy.engine.result.ResultProxy
88+
:rtype: sqlalchemy.engine.ResultProxy
9289
"""
9390
if parameters is None:
9491
parameters = {}

tests/databases/test_sqlite.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
"""Tests specific to the Sqlite Database implementation."""
22
import os
33
import pathlib
4-
from urllib.parse import urlparse
54

65
import pandas as pd
76
import pytest
87
import sqlalchemy
8+
from airflow.hooks.base import BaseHook
99

1010
from astro.constants import Database
1111
from astro.databases import create_database
@@ -33,20 +33,19 @@ def test_create_database(conn_id):
3333

3434

3535
@pytest.mark.parametrize(
36-
"conn_id,expected_uri",
36+
"conn_id,expected_db_path",
3737
[
38-
(DEFAULT_CONN_ID, "//tmp/sqlite_default.db"),
39-
(CUSTOM_CONN_ID, "////tmp/sqlite.db"),
38+
(DEFAULT_CONN_ID, BaseHook.get_connection(DEFAULT_CONN_ID).host),
39+
(CUSTOM_CONN_ID, "/tmp/sqlite.db"),
4040
],
4141
ids=SUPPORTED_CONN_IDS,
4242
)
43-
def test_sqlite_sqlalchemy_engine(conn_id, expected_uri):
44-
"""Confirm that the SQLALchemy is created successfully."""
43+
def test_sqlite_sqlalchemy_engine(conn_id, expected_db_path):
44+
"""Confirm that the SQLAlchemy is created successfully and verify DB path."""
4545
database = SqliteDatabase(conn_id)
4646
engine = database.sqlalchemy_engine
4747
assert isinstance(engine, sqlalchemy.engine.base.Engine)
48-
url = urlparse(str(engine.url))
49-
assert url.path == expected_uri
48+
assert engine.url.database == expected_db_path
5049

5150

5251
@pytest.mark.integration

tests/utils/test_database.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sqlalchemy.engine.base import Engine
88

99
from astro.constants import Database
10+
from astro.sqlite_utils import create_sqlalchemy_engine_with_sqlite
1011
from astro.utils.database import (
1112
create_database_from_conn_id,
1213
get_database_name,
@@ -59,7 +60,7 @@ def with_sqlite_hook():
5960
hook = SqliteHook()
6061
db = get_database_name(hook)
6162
assert db == Database.SQLITE
62-
engine = hook.get_sqlalchemy_engine()
63+
engine = create_sqlalchemy_engine_with_sqlite(hook)
6364
db = get_database_name(engine)
6465
assert db == Database.SQLITE
6566

@@ -74,10 +75,9 @@ def with_unsupported_hook():
7475
def describe_get_sqlalchemy_engine():
7576
def with_sqlite():
7677
hook = SqliteHook(sqlite_conn_id="sqlite_conn")
77-
engine = get_sqlalchemy_engine(hook)
78+
engine = create_sqlalchemy_engine_with_sqlite(hook)
7879
assert isinstance(engine, Engine)
79-
url = urlparse(str(engine.url))
80-
assert url.path == "////tmp/sqlite.db"
80+
assert engine.url.database == BaseHook.get_connection("sqlite_conn").host
8181

8282
def with_sqlite_default_conn():
8383
hook = SqliteHook()

0 commit comments

Comments
 (0)