Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ repos:
^airflow/sensors/.*$|
^airflow/providers/.*$|
^airflow/contrib/.*$
- id: provide-create-sessions
language: pygrep
name: Make sure provide_session and create_session are imported from airflow.utils.session
entry: "from airflow\\.utils\\.db import.* (provide_session|create_session)"
files: \.py$
pass_filenames: true
- id: base-operator
language: pygrep
name: Make sure BaseOperator is imported from airflow.models outside of core
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from airflow.models import DagModel, TaskFail
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session


@provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from airflow.models.baseoperator import BaseOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.session import provide_session
from airflow.utils.state import State


Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Pool APIs."""
from airflow.exceptions import AirflowBadRequest, PoolNotFound
from airflow.models import Pool
from airflow.utils.db import provide_session
from airflow.utils.session import provide_session


@provide_session
Expand Down
9 changes: 5 additions & 4 deletions airflow/cli/commands/connection_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
from tabulate import tabulate

from airflow.models import Connection
from airflow.utils import cli as cli_utils, db
from airflow.utils import cli as cli_utils
from airflow.utils.cli import alternative_conn_specs
from airflow.utils.session import create_session


def connections_list(args):
"""Lists all connections at the command line"""
with db.create_session() as session:
with create_session() as session:
conns = session.query(Connection.conn_id, Connection.conn_type,
Connection.host, Connection.port,
Connection.is_encrypted,
Expand Down Expand Up @@ -76,7 +77,7 @@ def connections_add(args):
if args.conn_extra is not None:
new_conn.set_extra(args.conn_extra)

with db.create_session() as session:
with create_session() as session:
if not (session.query(Connection)
.filter(Connection.conn_id == new_conn.conn_id).first()):
session.add(new_conn)
Expand All @@ -100,7 +101,7 @@ def connections_add(args):
@cli_utils.action_logging
def connections_delete(args):
"""Deletes connection from DB"""
with db.create_session() as session:
with create_session() as session:
try:
to_delete = (session
.query(Connection)
Expand Down
5 changes: 3 additions & 2 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
from airflow import DAG, AirflowException, conf, jobs, settings
from airflow.api.client import get_current_api_client
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.utils import cli as cli_utils, db
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_dag, process_subdir, sigint_handler
from airflow.utils.dot_renderer import render_dag
from airflow.utils.session import create_session


def _tabulate_dag_runs(dag_runs: List[DagRun], tablefmt="fancy_grid"):
Expand Down Expand Up @@ -266,7 +267,7 @@ def dag_list_jobs(args, dag=None):
if args.state:
queries.append(jobs.BaseJob.state == args.state)

with db.create_session() as session:
with create_session() as session:
all_jobs = (session
.query(jobs.BaseJob)
.filter(*queries)
Expand Down
5 changes: 3 additions & 2 deletions airflow/cli/commands/rotate_fernet_key_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
# under the License.
"""Rotate Fernet key command"""
from airflow.models import Connection, Variable
from airflow.utils import cli as cli_utils, db
from airflow.utils import cli as cli_utils
from airflow.utils.session import create_session


@cli_utils.action_logging
def rotate_fernet_key(args):
"""Rotates all encrypted connection credentials and variables"""
with db.create_session() as session:
with create_session() as session:
for conn in session.query(Connection).filter(
Connection.is_encrypted | Connection.is_extra_encrypted):
conn.rotate_fernet_key()
Expand Down
5 changes: 3 additions & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import DagPickle, TaskInstance
from airflow.ti_deps.dep_context import SCHEDULER_QUEUED_DEPS, DepContext
from airflow.utils import cli as cli_utils, db
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_dag, get_dag_by_pickle, get_dags
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session


def _run_task_by_selected_method(args, dag, ti):
Expand Down Expand Up @@ -59,7 +60,7 @@ def _run_task_by_executor(args, dag, ti):
if args.ship_dag:
try:
# Running remotely, so pickling the DAG
with db.create_session() as session:
with create_session() as session:
pickle = DagPickle(dag)
session.add(pickle)
pickle_id = pickle.id
Expand Down
7 changes: 4 additions & 3 deletions airflow/cli/commands/variable_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import os

from airflow.models import Variable
from airflow.utils import cli as cli_utils, db
from airflow.utils import cli as cli_utils
from airflow.utils.session import create_session


def variables_list(args):
"""Displays all of the variables"""
with db.create_session() as session:
with create_session() as session:
variables = session.query(Variable)
print("\n".join(var.key for var in variables))

Expand Down Expand Up @@ -95,7 +96,7 @@ def _import_helper(filepath):
def _variable_export_helper(filepath):
"""Helps export all of the variables to the file"""
var_dict = {}
with db.create_session() as session:
with create_session() as session:
qry = session.query(Variable).all()

data = json.JSONDecoder()
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/auth/backends/github_enterprise_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session

log = LoggingMixin().log

Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/auth/backends/google_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

from airflow import models
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session

log = LoggingMixin().log

Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/auth/backends/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.security import utils
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session

# pylint: disable=c-extension-no-member
LOGIN_MANAGER = flask_login.LoginManager()
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/auth/backends/ldap_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session

login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/auth/backends/password_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from wtforms.validators import InputRequired

from airflow import models
from airflow.utils.db import create_session, provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session

LOGIN_MANAGER = flask_login.LoginManager()
LOGIN_MANAGER.login_view = 'airflow.login' # Calls login() below
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
from airflow.kubernetes.worker_configuration import WorkerConfiguration
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
from airflow.models.taskinstance import TaskInstanceKeyType
from airflow.utils.db import create_session, provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State

MAX_POD_ID_LEN = 253
Expand Down
2 changes: 1 addition & 1 deletion airflow/gcp/hooks/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from airflow.hooks.mysql_hook import MySqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Connection
from airflow.utils.db import provide_session
from airflow.utils.session import provide_session

UNIX_PATH_MAX = 108

Expand Down
2 changes: 1 addition & 1 deletion airflow/hooks/base_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session

CONN_ENV_PREFIX = 'AIRFLOW_CONN_'

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from airflow.ti_deps.dep_context import BACKFILL_QUEUED_DEPS, DepContext
from airflow.utils import timezone
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.db import provide_session
from airflow.utils.session import provide_session
from airflow.utils.state import State


Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
from airflow.models.base import ID_LEN, Base
from airflow.stats import Stats
from airflow.utils import helpers, timezone
from airflow.utils.db import create_session, provide_session
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import State

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from airflow.stats import Stats
from airflow.task.task_runner import get_task_runner
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.net import get_hostname
from airflow.utils.session import provide_session
from airflow.utils.state import State


Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
from airflow.utils.dag_processing import (
AbstractDagFileProcessorProcess, DagFileProcessorAgent, SimpleDag, SimpleDagBag,
)
from airflow.utils.db import provide_session
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import list_py_file_paths
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.session import provide_session
from airflow.utils.state import State


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from sqlalchemy import Column, Float, Integer, PickleType, String
from sqlalchemy.ext.declarative import declarative_base

from airflow.utils.db import create_session
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.operator_resources import Resources
from airflow.utils.session import provide_session
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import WeightRule

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, STORE_SERIALIZED_DAGS
from airflow.utils import timezone
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.db import provide_session
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import Interval, UtcDateTime
from airflow.utils.state import State

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
from airflow.exceptions import AirflowDagCycleException
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import pprinttable
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.timeout import timeout


Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import State

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sqlalchemy.orm import Session

from airflow.models.base import Base
from airflow.utils.db import provide_session
from airflow.utils.session import provide_session


class KubeResourceVersion(Base):
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from airflow.models.base import Base
from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
from airflow.utils.db import provide_session
from airflow.utils.session import provide_session
from airflow.utils.state import State


Expand Down
Loading