Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ env:
IMAGE_TAG: "${{ github.event.pull_request.head.sha || github.sha }}"
USE_SUDO: "true"
INCLUDE_SUCCESS_OUTPUTS: "true"
AIRFLOW_ENABLE_AIP_44: "true"

concurrency:
group: ci-${{ github.event.pull_request.number || github.ref }}
Expand Down Expand Up @@ -937,7 +938,54 @@ jobs:
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
UPGRADE_BOTO: "true"
JOB_ID: >
postgres-${{needs.build-info.outputs.default-python-version}}-
postgres-boto-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
if: needs.build-info.outputs.run-tests == 'true' && needs.build-info.outputs.run-amazon-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v3
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Tests: ${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types}}
run: breeze testing tests --run-in-parallel
- name: >
Post Tests: ${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types}}
uses: ./.github/actions/post_tests

tests-postgres-in-progress-features-disabled:
timeout-minutes: 130
name: >
InProgressDisabledPostgres${{needs.build-info.outputs.default-postgres-version}},
Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types}}
runs-on: "${{needs.build-info.outputs.runs-on}}"
needs: [build-info, test-pytest-collection]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types}}"
SUSPENDED_PROVIDERS_FOLDERS: "${{ needs.build-info.outputs.suspended-providers-folders }}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "postgres"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
PYTHON_VERSION: "${{needs.build-info.outputs.default-python-version}}"
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
AIRFLOW_ENABLE_AIP_44: "false"
AIRFLOW_ENABLE_AIP_52: "false"
JOB_ID: >
postgres-in-progress-disabled-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
if: needs.build-info.outputs.run-tests == 'true' && needs.build-info.outputs.run-amazon-tests == 'true'
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ repos:
files: config\.yml$|default_airflow\.cfg$|default\.cfg$
pass_filenames: false
require_serial: true
additional_dependencies: ['pyyaml']
additional_dependencies: ['pyyaml', 'packaging']
- id: check-boring-cyborg-configuration
name: Checks for Boring Cyborg configuration consistency
language: python
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import ParamSpec

PS = ParamSpec("PS")
Expand Down Expand Up @@ -63,7 +64,9 @@ def get_internal_api_endpoint():

@staticmethod
def _init_values():
use_internal_api = conf.getboolean("core", "database_access_isolation")
use_internal_api = conf.getboolean("core", "database_access_isolation", fallback=False)
if use_internal_api and not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
internal_api_endpoint = ""
if use_internal_api:
internal_api_url = conf.get("core", "internal_api_url")
Expand Down
51 changes: 28 additions & 23 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.configuration import conf
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.timezone import parse as parsedate
Expand Down Expand Up @@ -2068,29 +2069,6 @@ class GroupCommand(NamedTuple):
ARG_DEBUG,
),
),
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
),
ActionCommand(
name="scheduler",
help="Start a scheduler instance",
Expand Down Expand Up @@ -2231,6 +2209,33 @@ class GroupCommand(NamedTuple):
),
]

if _ENABLE_AIP_44:
core_commands.append(
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
),
)


def _remove_dag_id_opt(command: ActionCommand):
cmd = command._asdict()
Expand Down
8 changes: 4 additions & 4 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,14 @@ core:
example: '{"some_param": "some_value"}'
database_access_isolation:
description: (experimental) Whether components should use Airflow Internal API for DB connectivity.
version_added: 2.6.0
version_added: 2.7.0
type: boolean
example: ~
default: "False"
internal_api_url:
description: |
(experimental)Airflow Internal API url. Only used if [core] database_access_isolation is True.
version_added: 2.6.0
(experimental) Airflow Internal API url. Only used if [core] database_access_isolation is True.
version_added: 2.7.0
type: string
default: ~
example: 'http://localhost:8080'
Expand Down Expand Up @@ -1674,7 +1674,7 @@ webserver:
run_internal_api:
description: |
Boolean for running Internal API in the webserver.
version_added: 2.6.0
version_added: 2.7.0
type: boolean
example: ~
default: "False"
Expand Down
10 changes: 0 additions & 10 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,6 @@ daemon_umask = 0o077
# Example: dataset_manager_kwargs = {{"some_param": "some_value"}}
# dataset_manager_kwargs =

# (experimental) Whether components should use Airflow Internal API for DB connectivity.
database_access_isolation = False

# (experimental)Airflow Internal API url. Only used if [core] database_access_isolation is True.
# Example: internal_api_url = http://localhost:8080
# internal_api_url =

[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
Expand Down Expand Up @@ -857,9 +850,6 @@ audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,g
# Boolean for running SwaggerUI in the webserver.
enable_swagger_ui = True

# Boolean for running Internal API in the webserver.
run_internal_api = False

# Boolean for enabling rate limiting on authentication endpoints.
auth_rate_limited = True

Expand Down
8 changes: 8 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,11 @@ def initialize():
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")


# AIP-52: setup/teardown (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_52 = os.environ.get("AIRFLOW_ENABLE_AIP_52", "false").lower() in {"true", "t", "yes", "y", "1"}
# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in ("true", "t", "yes", "y", "1")
3 changes: 3 additions & 0 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.logging_config import configure_logging
from airflow.models import import_all_models
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.json import AirflowJsonProvider
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links
Expand Down Expand Up @@ -161,6 +162,8 @@ def create_app(config=None, testing=False):
init_error_handlers(flask_app)
init_api_connexion(flask_app)
if conf.getboolean("webserver", "run_internal_api", fallback=False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
init_api_internal(flask_app)
init_api_experimental(flask_app)

Expand Down
15 changes: 14 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
import json
import os
import pathlib
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any

import yaml
from packaging.version import parse as parse_version

import airflow
from airflow.configuration import AirflowConfigParser, default_config_yaml
Expand Down Expand Up @@ -386,6 +388,13 @@ def _get_rst_filepath_from_path(filepath: pathlib.Path):
# -- Options for sphinx_jinja ------------------------------------------
# See: https://github.com/tardyp/sphinx-jinja

airflow_version = parse_version(
re.search( # type: ignore[union-attr,arg-type]
r"__version__ = \"([0-9\.]*)(\.dev[0-9]*)?\"",
(Path(__file__).parents[1] / "airflow" / "__init__.py").read_text(),
).groups(0)[0]
)

# Jinja context
if PACKAGE_NAME == "apache-airflow":
deprecated_options: dict[str, dict[str, tuple[str, str, str]]] = defaultdict(dict)
Expand All @@ -401,10 +410,14 @@ def _get_rst_filepath_from_path(filepath: pathlib.Path):
# e.g. {{dag_id}} in default_config.cfg -> {dag_id} in airflow.cfg, and what we want in docs
keys_to_format = ["default", "example"]
for conf_name, conf_section in configs.items():
for option_name, option in conf_section["options"].items():
for option_name, option in list(conf_section["options"].items()):
for key in keys_to_format:
if option[key] and "{{" in option[key]:
option[key] = option[key].replace("{{", "{").replace("}}", "}")
version_added = option["version_added"]
if version_added is not None and parse_version(version_added) > airflow_version:
del conf_section["options"][option_name]

# Sort options, config and deprecated options for JINJA variables to display
for section_name, config in configs.items():
config["options"] = {k: v for k, v in sorted(config["options"].items())}
Expand Down
14 changes: 14 additions & 0 deletions scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
from __future__ import annotations

import os
import re
from pathlib import Path

import yaml
from packaging.version import parse as parse_version

FILE_HEADER = """#
# Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -105,6 +108,10 @@ def _write_section(configfile, section_name, section):

def _write_option(configfile, idx, option_name, option):
option_description = None
version_added = option["version_added"]
if version_added is not None and parse_version(version_added) > airflow_version:
# skip if option is going to be added in the future version
return
if option["description"] is not None:
option_description = list(filter(lambda x: x is not None, option["description"].splitlines()))

Expand Down Expand Up @@ -145,6 +152,13 @@ def _write_option(configfile, idx, option_name, option):
airflow_default_config_path = os.path.join(airflow_config_dir, "default_airflow.cfg")
airflow_config_yaml_file_path = os.path.join(airflow_config_dir, "config.yml")

airflow_version = parse_version(
re.search( # type: ignore[union-attr,arg-type]
r"__version__ = \"([0-9\.]*)(\.dev[0-9]*)?\"",
(Path(__file__).parents[3] / "airflow" / "__init__.py").read_text(),
).groups(0)[0]
)

write_config(
yaml_config_file_path=airflow_config_yaml_file_path, default_cfg_file_path=airflow_default_config_path
)
Expand Down
2 changes: 2 additions & 0 deletions tests/api_internal/endpoints/test_rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from airflow.operators.empty import EmptyOperator
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.state import State
from airflow.www import app
from tests.test_utils.config import conf_vars
Expand All @@ -53,6 +54,7 @@ def factory() -> Flask:
return factory()


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestRpcApiEndpoint:
@pytest.fixture(autouse=True)
def setup_attrs(self, minimal_app_for_internal_api: Flask) -> Generator:
Expand Down
3 changes: 3 additions & 0 deletions tests/api_internal/test_internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.operators.empty import EmptyOperator
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.state import State
from tests.test_utils.config import conf_vars

Expand All @@ -38,6 +39,7 @@ def reset_init_api_config():
InternalApiConfig._initialized = False


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestInternalApiConfig:
@conf_vars(
{
Expand Down Expand Up @@ -69,6 +71,7 @@ def test_force_database_direct_access(self):
assert InternalApiConfig.get_use_internal_api() is False


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestInternalApiCall:
@staticmethod
@internal_api_call
Expand Down
2 changes: 2 additions & 0 deletions tests/cli/commands/test_internal_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.cli import cli_parser
from airflow.cli.commands import internal_api_command
from airflow.cli.commands.internal_api_command import GunicornMonitor
from airflow.settings import _ENABLE_AIP_44
from tests.cli.commands._common_cli_classes import _ComonCLIGunicornTestClass

console = Console(width=400, color_system="standard")
Expand Down Expand Up @@ -82,6 +83,7 @@ def test_ready_prefix_on_cmdline_dead_process(self):
assert self.monitor._get_num_ready_workers_running() == 0


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestCliInternalAPI(_ComonCLIGunicornTestClass):

main_process_regexp = r"airflow internal-api"
Expand Down