Skip to content

feat: add Apache Airflow integration with DAG/task evidence and tests#570

Merged
yashksaini-coder merged 19 commits intoTracer-Cloud:mainfrom
cerencamkiran:feature/airflow-integration
Apr 29, 2026
Merged

feat: add Apache Airflow integration with DAG/task evidence and tests#570
yashksaini-coder merged 19 commits intoTracer-Cloud:mainfrom
cerencamkiran:feature/airflow-integration

Conversation

@cerencamkiran
Copy link
Copy Markdown
Collaborator

@cerencamkiran cerencamkiran commented Apr 14, 2026

Fixes #330

Summary

Adds a robust Apache Airflow integration with end-to-end investigation support.

This PR wires Airflow evidence directly into the investigation flow. This includes:

  • detecting Airflow as a source during planning
  • resolving the integration via config/env
  • fetching DAG/task failure evidence via tools
  • surfacing that evidence in the RCA output

What's included

  • Airflow config, authentication, and connectivity validation
  • DAG runs and task instance retrieval helpers
  • Recent failure evidence extraction (resilient to partial API failures)
  • Integration into investigation planning and RCA reasoning
  • Unit tests for integration and node logic
  • E2E Airflow investigation tests

End-to-end validation

Validated against a live Airflow instance:

  • Created a deliberately failing DAG (test_fail_dag)
  • Triggered a failed run
  • Verified failure via Airflow UI and CLI
  • Ran E2E investigation tests locally against the live instance
airflow dags list-runs -d test_fail_dag
# state = failed
python -m pytest tests/e2e/airflow/test_orchestrator.py -v
# 4 passed, 1 skipped
# test_airflow_investigation_e2e PASSED

Screen recording

A short screen recording is attached showing:

  • the live Airflow instance
  • the failing DAG/run
  • the investigation flow
  • the successful end-to-end test execution

Notes

  • get_recent_airflow_failures now handles per-run API failures without dropping previously collected evidence
  • Airflow E2E tests are marked with @pytest.mark.e2e to align with CI conventions

Addressed all review feedback:

  • resolved merge conflicts
  • validated against a real failing DAG
  • ensured end-to-end investigation flow works with live data
  • added e2e test coverage
  • attached a short screen recording demonstrating the full flow

Happy to iterate further if anything is missing.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 14, 2026

Greptile Summary

This PR adds an initial Apache Airflow integration: a Pydantic config model, connectivity validation, DAG run/task instance retrieval, failure evidence extraction, and corresponding unit + E2E test scaffolding. The catalog.py changes are well-structured and correctly wire airflow_config_from_env() into load_env_integrations(), addressing the prior dead-code concern.

The remaining new findings are all P2: get_airflow_dags is defined but never called, _request_json doesn't guard against JSONDecodeError from non-JSON error pages, and the model_dump(exclude={"integration_id"}) call in load_env_integrations is a no-op since AirflowConfig has no such field. The node.py issues flagged in prior review threads (wrong if-block placement, missing integration_id in resolved output) are not addressed here and will cause tests/nodes/resolve_integrations/test_airflow.py to fail.

Confidence Score: 4/5

Safe to merge for catalog.py and airflow.py; node.py bugs flagged in prior threads remain unaddressed and will cause the resolve_integrations tests to fail.

All new findings in this PR are P2 (style/cleanup). The blocking concerns (wrong if-block in node.py, missing integration_id in node.py resolved output) were flagged in prior review threads and are not fixed here, meaning tests/nodes/resolve_integrations/test_airflow.py is still expected to fail. That prevents a score of 5.

tests/nodes/resolve_integrations/test_airflow.py — relies on node.py fixes that haven't landed yet

Important Files Changed

Filename Overview
app/integrations/airflow.py New Airflow integration helper: config model, connectivity validation, DAG run/task instance retrieval, and failure evidence extraction. Minor: get_airflow_dags is unused; _request_json does not catch JSONDecodeError.
app/integrations/catalog.py Adds Airflow to _classify_service_instance and load_env_integrations. The model_dump(exclude={"integration_id"}) exclude is a no-op since AirflowConfig has no such field.
tests/integrations/test_airflow.py Unit tests for Airflow integration helpers using monkeypatched httpx.request; mock URL ordering is correct (specific check before general).
tests/nodes/resolve_integrations/test_airflow.py Tests resolution and env-loading via node.py functions; catalog.py is now complete, but node.py bugs flagged in prior review threads (wrong if-block, missing integration_id) mean these tests are likely still failing.
tests/e2e/airflow/test_orchestrator.py E2E scaffold that skips when env vars are absent; correctly uses pytest.skip for missing credentials and DAG runs.
tests/e2e/airflow/fixtures/airflow_task_failure_alert.json Minimal alert fixture for the E2E investigation test; structure is correct.
tests/e2e/airflow/init.py Empty package init file.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant catalog as catalog.py
    participant airflow as airflow.py
    participant Airflow as Airflow API

    Caller->>catalog: load_env_integrations()
    catalog->>airflow: airflow_config_from_env()
    airflow-->>catalog: AirflowConfig | None
    catalog-->>Caller: [{id:"env-airflow", service:"airflow", credentials:{...}}]

    Caller->>catalog: _classify_service_instance("airflow", creds, record_id)
    catalog->>airflow: build_airflow_config(raw)
    airflow-->>catalog: AirflowConfig
    catalog-->>Caller: {**model_dump(), integration_id: record_id}

    Caller->>airflow: validate_airflow_config(config)
    airflow->>Airflow: GET /dags?limit=1
    Airflow-->>airflow: {dags:[...], total_entries: N}
    airflow-->>Caller: AirflowValidationResult(ok=True)

    Caller->>airflow: get_recent_airflow_failures(config, dag_id)
    airflow->>Airflow: GET /dags/{dag_id}/dagRuns
    Airflow-->>airflow: {dag_runs:[...]}
    loop for each dag_run
        airflow->>Airflow: GET /dags/{dag_id}/dagRuns/{run_id}/taskInstances
        Airflow-->>airflow: {task_instances:[...]}
    end
    airflow-->>Caller: [failure_evidence, ...]
Loading
Prompt To Fix All With AI
This is a comment left during a code review.
Path: app/integrations/catalog.py
Line: 1128

Comment:
**`exclude={"integration_id"}` is a no-op on `AirflowConfig`**

`AirflowConfig` has no `integration_id` field, so `model_dump(exclude={"integration_id"})` does nothing — it's equivalent to a plain `model_dump()`. The `exclude` argument silently ignores unknown field names in Pydantic v2, so this passes without error but gives a false sense that the field is being stripped. Consider removing the exclude argument or adding a comment explaining the intent.

```suggestion
                "credentials": airflow_config.model_dump(),
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: app/integrations/airflow.py
Line: 166-189

Comment:
**`get_airflow_dags` is unused**

`get_airflow_dags` is never called by any code in this PR or by the rest of the integration helpers — `validate_airflow_connection` issues its own `/dags?limit=1` request directly. If this function is intended for future use, a brief comment noting that would help; otherwise it is dead code alongside the `validate_airflow_connection` duplicate path.

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: app/integrations/airflow.py
Line: 103-124

Comment:
**`response.json()` can raise `JSONDecodeError` on non-JSON error pages**

If Airflow returns an HTML error page (e.g. a 502 from an upstream proxy, or a 200 with an HTML body under misconfiguration), `response.json()` raises `json.JSONDecodeError`. `validate_airflow_config` catches this via the broad `except Exception`, but callers like `get_airflow_dag_runs` and `get_airflow_task_instances` propagate it uncaught, potentially crashing investigation workflows with a confusing error. Consider wrapping `response.json()` in a try/except or returning the response text as a fallback.

How can I resolve this? If you propose a fix, please make it concise.

Reviews (9): Last reviewed commit: "fix(airflow): add catalog classification..." | Re-trigger Greptile

Comment thread app/nodes/resolve_integrations/node.py Outdated
Comment on lines +159 to +176
elif key == "airflow":
try:
airflow_config = build_airflow_config(
{
"base_url": credentials.get("base_url", DEFAULT_AIRFLOW_BASE_URL),
"username": credentials.get("username", ""),
"password": credentials.get("password", ""),
"auth_token": credentials.get("auth_token", ""),
"timeout_seconds": credentials.get("timeout_seconds", 15.0),
"verify_ssl": credentials.get("verify_ssl", True),
"max_results": credentials.get("max_results", 50),
}
)
except Exception:
continue

if airflow_config.is_configured:
resolved["airflow"] = airflow_config.model_dump()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 integration_id missing from resolved Airflow entry

Every other integration stored in resolved includes "integration_id" (see grafana, aws, datadog, github, sentry). resolved["airflow"] = airflow_config.model_dump() produces a dict with only the model fields (base_url, username, password, …), so resolved["airflow"].get("integration_id", "") will silently return "". Any downstream code that relies on this field (e.g. audit logs, de-duplication) will be broken for Airflow.

Suggested change
elif key == "airflow":
try:
airflow_config = build_airflow_config(
{
"base_url": credentials.get("base_url", DEFAULT_AIRFLOW_BASE_URL),
"username": credentials.get("username", ""),
"password": credentials.get("password", ""),
"auth_token": credentials.get("auth_token", ""),
"timeout_seconds": credentials.get("timeout_seconds", 15.0),
"verify_ssl": credentials.get("verify_ssl", True),
"max_results": credentials.get("max_results", 50),
}
)
except Exception:
continue
if airflow_config.is_configured:
resolved["airflow"] = airflow_config.model_dump()
elif key == "airflow":
try:
airflow_config = build_airflow_config(
{
"base_url": credentials.get("base_url", DEFAULT_AIRFLOW_BASE_URL),
"username": credentials.get("username", ""),
"password": credentials.get("password", ""),
"auth_token": credentials.get("auth_token", ""),
"timeout_seconds": credentials.get("timeout_seconds", 15.0),
"verify_ssl": credentials.get("verify_ssl", True),
"max_results": credentials.get("max_results", 50),
}
)
except Exception:
continue
if airflow_config.is_configured:
resolved["airflow"] = {
**airflow_config.model_dump(),
"integration_id": integration.get("id", ""),
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/nodes/resolve_integrations/node.py
Line: 159-176

Comment:
**`integration_id` missing from resolved Airflow entry**

Every other integration stored in `resolved` includes `"integration_id"` (see grafana, aws, datadog, github, sentry). `resolved["airflow"] = airflow_config.model_dump()` produces a dict with only the model fields (`base_url`, `username`, `password`, …), so `resolved["airflow"].get("integration_id", "")` will silently return `""`. Any downstream code that relies on this field (e.g. audit logs, de-duplication) will be broken for Airflow.

```suggestion
        elif key == "airflow":
            try:
                airflow_config = build_airflow_config(
                    {
                        "base_url": credentials.get("base_url", DEFAULT_AIRFLOW_BASE_URL),
                        "username": credentials.get("username", ""),
                        "password": credentials.get("password", ""),
                        "auth_token": credentials.get("auth_token", ""),
                        "timeout_seconds": credentials.get("timeout_seconds", 15.0),
                        "verify_ssl": credentials.get("verify_ssl", True),
                        "max_results": credentials.get("max_results", 50),
                    }
                )
            except Exception:
                continue

            if airflow_config.is_configured:
                resolved["airflow"] = {
                    **airflow_config.model_dump(),
                    "integration_id": integration.get("id", ""),
                }
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +299 to +305
"""Fetch recent failed or retrying task evidence for a DAG.

Strategy:
- fetch recent DAG runs
- keep failed DAG runs first, but also inspect recent runs generally
- fetch task instances for each run
- return failed/up_for_retry/upstream_failed task evidence
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Misleading docstring strategy comment

The docstring says "keep failed DAG runs first" but the implementation fetches the most recent runs regardless of state (ordered by -start_date). No pre-filtering by state=failed is applied before fetching task instances. Recent successful runs will consume the limit slots, potentially hiding older failures. Either update the comment to match the actual behaviour, or apply state="failed" to the get_airflow_dag_runs call if the "failed first" ordering is actually intended.

Prompt To Fix With AI
This is a comment left during a code review.
Path: app/integrations/airflow.py
Line: 299-305

Comment:
**Misleading docstring strategy comment**

The docstring says "keep failed DAG runs first" but the implementation fetches the most recent runs regardless of state (ordered by `-start_date`). No pre-filtering by `state=failed` is applied before fetching task instances. Recent successful runs will consume the `limit` slots, potentially hiding older failures. Either update the comment to match the actual behaviour, or apply `state="failed"` to the `get_airflow_dag_runs` call if the "failed first" ordering is actually intended.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +75 to +100
def airflow_config_from_env() -> AirflowConfig | None:
"""Load an Airflow config from env vars."""
username = os.getenv("AIRFLOW_USERNAME", "").strip()
auth_token = os.getenv("AIRFLOW_AUTH_TOKEN", "").strip()

if not username and not auth_token:
return None

return build_airflow_config(
{
"base_url": os.getenv("AIRFLOW_BASE_URL", DEFAULT_AIRFLOW_BASE_URL).strip()
or DEFAULT_AIRFLOW_BASE_URL,
"username": username,
"password": os.getenv("AIRFLOW_PASSWORD", "").strip(),
"auth_token": auth_token,
"timeout_seconds": os.getenv(
"AIRFLOW_TIMEOUT_SECONDS",
str(DEFAULT_AIRFLOW_TIMEOUT_SECONDS),
),
"verify_ssl": os.getenv("AIRFLOW_VERIFY_SSL", "true").strip().lower()
in ("true", "1", "yes"),
"max_results": os.getenv(
"AIRFLOW_MAX_RESULTS", str(DEFAULT_AIRFLOW_MAX_RESULTS)
).strip(),
}
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 airflow_config_from_env is dead code

This function is defined but never called. _load_env_integrations in node.py (lines 304–327) re-implements the exact same env-loading logic inline rather than delegating here. The duplication means future changes to Airflow env-var handling need to be made in two places, violating the DRY principle. Consider either calling airflow_config_from_env() from _load_env_integrations, or removing the function.

Prompt To Fix With AI
This is a comment left during a code review.
Path: app/integrations/airflow.py
Line: 75-100

Comment:
**`airflow_config_from_env` is dead code**

This function is defined but never called. `_load_env_integrations` in `node.py` (lines 304–327) re-implements the exact same env-loading logic inline rather than delegating here. The duplication means future changes to Airflow env-var handling need to be made in two places, violating the DRY principle. Consider either calling `airflow_config_from_env()` from `_load_env_integrations`, or removing the function.

How can I resolve this? If you propose a fix, please make it concise.

@Devesh36
Copy link
Copy Markdown
Collaborator

Hey @cerencamkiran could you please resolve the merge conflicts and also fix the greptile issues . That would be great for us . Great Work btw 👍

Comment thread app/nodes/resolve_integrations/node.py Fixed
Comment thread tests/e2e/airflow/test_orchestrator.py Fixed
Comment thread app/nodes/resolve_integrations/node.py Outdated
Comment on lines +780 to +787
integrations.append(
{
"id": "env-airflow",
"service": "airflow",
"status": "active",
"credentials": airflow_config.model_dump(),
}
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Airflow append inside wrong if block — NameError + silent drop

integrations.append({"id": "env-airflow", ...}) (lines 780–787) is nested inside if atlas_pub and atlas_priv and atlas_project: instead of if airflow_username or airflow_auth_token:. Two concrete failures result:

  1. Atlas configured, Airflow not configured: airflow_config is never assigned → NameError: name 'airflow_config' is not defined at line 785 every time Atlas env vars are present.
  2. Airflow configured, Atlas not configured: the append is never reached — Airflow integration is silently skipped.

The integrations.append block needs to move inside the Airflow if block, and the closing ) for build_airflow_config( at line 765 should be indented at 8 spaces (inside the if) rather than 4.

Prompt To Fix With AI
This is a comment left during a code review.
Path: app/nodes/resolve_integrations/node.py
Line: 780-787

Comment:
**Airflow append inside wrong `if` block — NameError + silent drop**

`integrations.append({"id": "env-airflow", ...})` (lines 780–787) is nested inside `if atlas_pub and atlas_priv and atlas_project:` instead of `if airflow_username or airflow_auth_token:`. Two concrete failures result:

1. **Atlas configured, Airflow not configured**: `airflow_config` is never assigned → `NameError: name 'airflow_config' is not defined` at line 785 every time Atlas env vars are present.
2. **Airflow configured, Atlas not configured**: the `append` is never reached — Airflow integration is silently skipped.

The `integrations.append` block needs to move inside the Airflow `if` block, and the closing `)` for `build_airflow_config(` at line 765 should be indented at 8 spaces (inside the `if`) rather than 4.

How can I resolve this? If you propose a fix, please make it concise.

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Hi @Devesh36, thanks! I've resolved the conflicts.

@davincios
Copy link
Copy Markdown
Contributor

I reviewed the still-open Airflow comments in a separate worktree and validated the branch logic locally.

What I fixed locally (not pushed yet):

  • guarded AirflowConfig.auth so bearer-token auth is not silently overridden by basic auth when both are configured
  • switched _load_env_integrations() to reuse airflow_config_from_env() instead of duplicating the env parsing logic
  • updated the get_recent_airflow_failures() strategy docstring so it matches the current implementation
  • added targeted tests for auth precedence, shared env loading, env-based integration loading, and integration_id preservation

Verification I ran:

  • python -m pytest tests/integrations/test_airflow.py tests/nodes/resolve_integrations/test_airflow.py -> 10 passed
  • python -m ruff check app/integrations/airflow.py app/nodes/resolve_integrations/node.py tests/integrations/test_airflow.py tests/nodes/resolve_integrations/test_airflow.py -> passed

Live integration check:

  • no real Airflow credentials were available in the local shell env, so I started a disposable Airflow 2.9.1 instance and tested the helpers against the real REST API from the separate worktree
  • validate_airflow_config() succeeded against http://127.0.0.1:18080/api/v1
  • get_airflow_dags() returned 59 visible DAGs
  • I triggered example_bash_operator via the Airflow API, then verified get_airflow_dag_runs() could see the queued run and get_airflow_task_instances() returned 7 task instances for it
  • get_recent_airflow_failures() returned [] in that stock standalone instance because there were no failing example DAG runs to harvest evidence from

So the remaining open Airflow review comments look valid and addressable, and the integration paths for connectivity, DAG listing, DAG-run lookup, and task-instance lookup do work against a live Airflow API. The one thing I could not prove from the stock demo instance was failure-evidence extraction with an actually failing run.

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Thanks @davincios super helpful validation. I’ll address the remaining Airflow review comments, resolve the node.py conflict, and push the fixes shortly.

- guard AirflowConfig.auth so a configured bearer token is not silently
  overridden by basic-auth credentials when both are populated
- have _load_env_integrations() reuse airflow_config_from_env() instead
  of duplicating the env-var parsing logic inline
- align get_recent_airflow_failures() docstring with the actual fetch
  strategy (no failed-first ordering is applied)
- add tests for auth precedence, shared env loading, env-based
  integration loading, and integration_id preservation through
  _classify_integrations()

Made-with: Cursor
@davincios
Copy link
Copy Markdown
Contributor

Thanks @davincios super helpful validation. I’ll address the remaining Airflow review comments, resolve the node.py conflict, and push the fixes shortly.

Ok ok thanks for circling back this quickly, just to note that I did push the fixes already so you can work from there!

@davincios
Copy link
Copy Markdown
Contributor

This is part of task:
#330

Do not merge before acceptance criteria are met:

Including that the PR includes a screen video showing setup plus a successful investigation flow using Apache Airflow
PR without working end-to-end coverage and screen-video proof should be rejected

Comment on lines +10 to +53
def test_classify_airflow_from_store_includes_integration_id() -> None:
integrations = [
{
"id": "store-airflow",
"service": "airflow",
"status": "active",
"credentials": {
"base_url": "https://airflow.example.com/api/v1",
"auth_token": "store-token",
"timeout_seconds": 20,
"verify_ssl": True,
"max_results": 25,
},
}
]

resolved = _classify_integrations(integrations)

assert resolved["airflow"]["base_url"] == "https://airflow.example.com/api/v1"
assert resolved["airflow"]["auth_token"] == "store-token"
assert resolved["airflow"]["integration_id"] == "store-airflow"


def test_load_env_airflow(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("AIRFLOW_BASE_URL", "https://airflow.example.com/api/v1/")
monkeypatch.setenv("AIRFLOW_AUTH_TOKEN", "env-token")
monkeypatch.setenv("AIRFLOW_VERIFY_SSL", "false")
monkeypatch.setenv("AIRFLOW_TIMEOUT_SECONDS", "30")
monkeypatch.setenv("AIRFLOW_MAX_RESULTS", "10")

integrations = _load_env_integrations()
airflow = [integration for integration in integrations if integration["service"] == "airflow"]

assert len(airflow) == 1
creds = airflow[0]["credentials"]
assert airflow[0]["id"] == "env-airflow"
assert creds["base_url"] == "https://airflow.example.com/api/v1"
assert creds["auth_token"] == "env-token"
assert creds["verify_ssl"] is False
assert creds["timeout_seconds"] == 30.0
assert creds["max_results"] == 10


def test_load_env_airflow_absent_without_auth(monkeypatch: pytest.MonkeyPatch) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Tests assert against missing catalog.py support — both will fail

catalog.py's _classify_service_instance has no Airflow branch; the generic fallback (line 682) returns {"credentials": <nested dict>, "integration_id": record_id}. So resolved["airflow"]["base_url"] will raise KeyErrorbase_url is nested under "credentials", not at the top level.

test_load_env_airflow fails separately: load_env_integrations() in catalog.py reads no AIRFLOW_* env vars, so the airflow filter returns [] and len(airflow) == 1 fails immediately.

Both tests need the Airflow case to be added to _classify_service_instance and load_env_integrations in catalog.py before they can pass.

Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/nodes/resolve_integrations/test_airflow.py
Line: 10-53

Comment:
**Tests assert against missing catalog.py support — both will fail**

`catalog.py`'s `_classify_service_instance` has no Airflow branch; the generic fallback (line 682) returns `{"credentials": <nested dict>, "integration_id": record_id}`. So `resolved["airflow"]["base_url"]` will raise `KeyError``base_url` is nested under `"credentials"`, not at the top level.

`test_load_env_airflow` fails separately: `load_env_integrations()` in `catalog.py` reads no `AIRFLOW_*` env vars, so the airflow filter returns `[]` and `len(airflow) == 1` fails immediately.

Both tests need the Airflow case to be added to `_classify_service_instance` and `load_env_integrations` in `catalog.py` before they can pass.

How can I resolve this? If you propose a fix, please make it concise.

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Ekran görüntüsü 2026-04-19 212405 Ekran görüntüsü 2026-04-19 213509 Ekran görüntüsü 2026-04-19 213602

Copy link
Copy Markdown
Collaborator

@yashksaini-coder yashksaini-coder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good foundation, but a few things need to be sorted before this can merge.

Merge state

  • Branch has a live conflict in app/integrations/catalog.py against current main. Needs a rebase.

Issue #330 acceptance criteria

  • The issue explicitly asks for a screen video of setup + a successful investigation flow. Three screenshots don't satisfy that, please record a short video.
  • PR description says Airflow evidence isn't wired into investigation planning / RCA yet ("further work will integrate..."). That's the exact end-to-end path #330 requires, so either land that wiring here or scope this PR down and clearly split the remainder into a tracked follow-up.

Outstanding review feedback not yet on the branch

  • @davincios pushed fixes for auth precedence, shared env loading, and extra tests on a separate worktree. Those haven't landed on this PR branch yet. Please pull them in.

Code issues worth fixing while you're in here

  • catalog.py exclude={"integration_id"} on the airflow env block is a no-op, AirflowConfig has no such field. Either add the field to the model or drop the exclude so intent is clear.
  • get_recent_airflow_failures has no try/except around the per-run get_airflow_task_instances call. A single 4xx/5xx mid-loop throws away every piece of evidence already collected. Wrap it and continue, same discipline as validate_airflow_config.
  • E2E tests in tests/e2e/airflow/test_orchestrator.py don't have @pytest.mark.e2e. Other integrations use the marker, please match the convention so CI can gate them.

One gap in the live verification

  • Thanks to @davincios for running against a real Airflow, but get_recent_airflow_failures" returned []` on the demo instance because there were no failing runs. That's the single most important codepath for the RCA use case. Please reproduce with an actually-failing DAG and include that in the screen video.

Once the conflict is resolved, the video is attached, and the items above are addressed, happy to re-review. Nice work getting it this far.

Copy link
Copy Markdown
Member

@VaibhavUpreti VaibhavUpreti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cerencamkiran, could you please update the docs with the instructions as well?
@yashksaini-coder, maybe we can create a checklist for adding a new integration, store it somewhere, and link it in the pull request template?

@yashksaini-coder yashksaini-coder force-pushed the feature/airflow-integration branch from 969165a to a71aa6c Compare April 20, 2026 16:23
@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Thanks for the detailed feedback. I really appreciate it.
I’m currently reviewing another PR, but I’ll get back to this one shortly and address all the points.

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

catalog.py.-.opensre.-.Visual.Studio.Code.2026-04-21.15-10-14.mp4

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

build_prompt.py.-.opensre.-.Visual.Studio.Code.2026-04-21.16-10-51.1.mp4

@muddlebee
Copy link
Copy Markdown
Collaborator

and pls add docs or take it up as separate PR #1020

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Greptile shows stale comments even after fixes I’ve addressed the issues. I’m also preparing the docs and will include them shortly @muddlebee

@muddlebee
Copy link
Copy Markdown
Collaborator

@cerencamkiran ready to merge then?

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Added Airflow integration docs @muddlebee! Happy to iterate further if needed.

seeded.append("list_openclaw_tools")

if "airflow" in available_sources:
seeded.append("get_recent_airflow_failures")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three names are not registered as @tool callables anywhere in app/tools/. The existing TracerAirflowMetricsTool only registers get_airflow_metrics. Seeding names the registry doesn't know about means the LLM planner will be directed toward them (especially via the priority hint in build_prompt.py) but execution will silently skip or error out on them. The helpers in app/integrations/airflow.py need corresponding @tool-decorated wrappers under app/tools/ before these seeds are useful.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cerencamkiran I think this is still missing?

class AirflowConfig(BaseModel):
"""Normalized Airflow connection settings."""

base_url: str = DEFAULT_AIRFLOW_BASE_URL
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AirflowConfig should extend StrictConfigModel (not BaseModel) to match the project convention — see BetterStackConfig, for example. StrictConfigModel already provides the string-stripping behaviour you've manually re-implemented in _normalize_str, so that validator can be removed.

Comment thread app/integrations/airflow.py Outdated
return str(value or "").strip()

@property
def api_base_url(self) -> str:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

api_base_url always equals base_url: _normalize_base_url already strips the trailing slash, so this property is a no-op. Use config.base_url directly in _request_json and remove the property.

Comment thread app/integrations/airflow.py Outdated
return payload if isinstance(payload, dict) else {}


def get_airflow_dags(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_airflow_dags is never imported or called anywhere in this PR. If it's not going to be used via a tool wrapper, remove it. Dead exported functions make the module's surface area misleading.

Comment thread app/integrations/airflow.py Outdated
"pool": task_instance.get("pool", ""),
"queue": task_instance.get("queue", ""),
"priority_weight": task_instance.get("priority_weight"),
"note": "; ".join(note_parts),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The note field concatenates task_state, try_number, max_tries, and dag_run_state — all of which are already present as top-level fields in the same dict. This is redundant and adds noise to the LLM context. Drop it, or keep it only if there's a downstream consumer that specifically reads the note string.

Comment thread app/integrations/airflow.py Outdated
dag_id=dag_id,
dag_run_id=dag_run_id,
)
except Exception:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent swallow hides real problems. Auth failures, connection errors, or malformed responses on any individual run will be swallowed here with no signal. At minimum: log a warning with the dag_run_id and the exception type before continuing. Otherwise a misconfigured integration looks identical to one with zero task failures.

Comment thread app/integrations/catalog.py Outdated
**airflow_config.model_dump(),
"integration_id": record_id,
}, "airflow"
return None, None # ← eksikti
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turkish inline comment ('eksikti' = 'was missing'). Remove before merge.

Comment thread app/integrations/catalog.py Outdated
"integration_id": record_id,
}, "betterstack"
return None, None
return None, None # ← eksikti
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same stray Turkish comment as line 599. Remove.

Comment thread tests/e2e/airflow/test_orchestrator.py Outdated

for alert in raw_alert.get("alerts", []):
annotations = alert.setdefault("annotations", {})
annotations["airflow_base_url"] = base_url
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Airflow integration is resolved from env vars / the integration store, not from alert annotations. Injecting airflow_base_url and airflow_dag_id here has no effect on how the agent picks up the integration — it comes from AIRFLOW_BASE_URL etc. already set in the env. This annotation injection is either a no-op or reflects a planned but not yet implemented annotation-based resolution path. Either way it should be clarified or removed to avoid confusion.

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Thanks a lot for the detailed review @muddlebee.

I’ve addressed all the other points (tool seeding, cleanup, logging, tests, etc.).
I didn’t switch AirflowConfig to StrictConfigModel yet, as it was breaking validation in this branch. I wanted to keep the behavior stable for this PR.

@muddlebee
Copy link
Copy Markdown
Collaborator

@cerencamkiran

image

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Good catch @muddlebee.

  • Added proper @tool wrappers for the Airflow helpers so they are registered in the tool registry
  • Updated the planner seeding to use only valid, callable tools (task instances removed from initial seed since it requires dag_run_id)

Airflow tools are now correctly selected and invoked during the investigation flow.

Note:
In local testing, the Airflow API returns 403 due to permission constraints, but the end-to-end flow (alert → planner → Airflow tool invocation) is working as expected.

Ekran görüntüsü 2026-04-29 144554

Copy link
Copy Markdown
Collaborator

@muddlebee muddlebee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM nice work @cerencamkiran

Copy link
Copy Markdown
Collaborator

@yashksaini-coder yashksaini-coder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thorough local review — LGTM ✅

Checked out cerencamkiran/opensre:feature/airflow-integration, ran the full quality gate locally.

CI / Quality gate

Check Result
make lint (ruff) ✅ All checks passed
make typecheck (mypy) ✅ No new errors (2 pre-existing errors in clickhouse.py/kafka.py exist on main too)
Unit + integration tests (-x -q --ignore=e2e) 2962 passed, 1 skipped
Airflow-specific tests (13 tests) ✅ All pass
GitHub CI checks test, quality, typecheck, CodeQL all green

Code review

app/integrations/airflow.py

  • Clean separation: config model, env loader, _request_json transport, and domain helpers are all well-scoped
  • auth property correctly gates basic auth behind not self.auth_token — bearer token always wins ✅
  • get_recent_airflow_failures wraps per-run get_airflow_task_instances in try/except with continue — previously raised review concern addressed ✅
  • _to_failure_evidence is thorough and investigation-friendly

app/tools/TracerAirflowDAGTool/__init__.py

  • Proper @tool wrappers for all three functions — tools are registered in the registry correctly
  • extract_params builds config + dag_id from sources — get_airflow_task_instances excluded from auto-seed (requires dag_run_id) — correct decision
  • is_available checks "airflow" in sources — clean gating

app/integrations/catalog.py

  • classify_integrations correctly populates the Airflow block
  • airflow_config_from_env() reused in load_env_integrations() — no duplication ✅
  • "airflow" present in direct_services tuple — integration wired end-to-end

app/nodes/plan_actions/

  • detect_sources.py correctly detects Airflow from alert context
  • build_prompt.py adds Airflow-specific context to the investigation prompt
  • plan_actions.py seeds get_recent_airflow_failures and get_airflow_dag_runs when Airflow is an available source

Tests

  • 13 unit tests across integration helpers, routing, and catalog resolution — all well-structured, use monkeypatch on httpx.request cleanly
  • E2E tests marked @pytest.mark.e2e — CI-safe ✅
  • Evidence from real failing DAG (test_fail_dag) included via screen recording ✅

All previously raised review concerns (merge conflicts, missing try/except in failure loop, auth precedence, airflow_config_from_env deduplication, e2e marker, screen video) have been addressed.

Approving. Ready to merge. 🚀

@yashksaini-coder yashksaini-coder merged commit 3474371 into Tracer-Cloud:main Apr 29, 2026
8 checks passed
@github-actions
Copy link
Copy Markdown
Contributor

💜 One more reason the project grows. Thanks @cerencamkiran — your contribution just landed!


👋 Join us on Discord - OpenSRE : hang out, contribute, or hunt for features and issues. Everyone's welcome.

@hamzzaaamalik
Copy link
Copy Markdown
Collaborator

Looked at this carefully. Found one issue worth fixing before merge:

P1: tools have source="tracer_web" instead of source="airflow". That's the SaaS webapp source. The prioritizer at prioritization.py:58 gives a +2 score bump when action.source in sources — for an Airflow incident, sources contains "airflow", so the bump never fires for these tools. The seeding mechanism keeps them in the pool, but they score 0 when they should score 2. Looks like the workaround was because EvidenceSource (in app/types/evidence.py) is a closed Literal and doesn't list "airflow". Easy fix: add "airflow" to the Literal, then update the three @tool(source=...) calls.

P2: AirflowConfig extends BaseModel, not StrictConfigModel — @yashksaini-coder asked about this earlier; doesn't look like it got addressed. Most other integrations use StrictConfigModel to forbid extra fields.

P2: no test for the exception-recovery path. airflow.py:282-294 carefully handles a single failing DAG-run-task-instances call so prior evidence isn't lost — but there's no test that exercises that path. Worth a "one fails, one succeeds, evidence preserved" test.

P3, non-blocking: packaging/sync_release_version.py has an unrelated ruff-format reflow swept in.

Nit: other tools set surfaces=("investigation", "chat"); these don't — intentional?

Otherwise: catalog/env loading/resolve_effective_integrations follow the existing pattern, auth precedence is correct with a regression test, DAG IDs are URL-encoded, lint/format/mypy clean, 13 new tests + no regressions in the broad sweep (1916 passed locally).

LGTM after the source fix.

@hamzzaaamalik
Copy link
Copy Markdown
Collaborator

Bug: Wrong source on the tools. All three Airflow tools are tagged source="tracer_web" (which is the SaaS webapp), not source="airflow". This matters because the planner gives tools a +2 score boost when their source matches the incident — so on an Airflow incident, these tools get 0 boost when they should get 2. The seeding logic still gets them into the pool, but they could lose out to other tools when the budget is tight.

Looks like the author hit this because EvidenceSource in app/types/evidence.py is a closed list and doesn't include "airflow". The fix is small:

Add "airflow" to the EvidenceSource list
Change source="tracer_web" to source="airflow" in the three @tool calls
Other things worth fixing:

AirflowConfig uses BaseModel instead of StrictConfigModel — @yashksaini-coder asked about this earlier and it doesn't look addressed. Most other integrations use the strict version.
No test for the failure-recovery path. The code carefully handles the case where one DAG run's API call fails so other evidence isn't lost — but there's no test that actually checks this.
Small things, not blocking:

packaging/sync_release_version.py has an unrelated formatting tweak swept in.
Other tools set surfaces=("investigation", "chat"), these don't — intentional?

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Thanks for the detailed review @hamzzaaamalik.
I used tracer_web as a temporary workaround because EvidenceSource is currently a closed Literal and did not include airflow, so source="airflow" failed validation.

On StrictConfigModel: I tried switching AirflowConfig to it earlier, but it broke validation in this branch, so I kept BaseModel to avoid changing runtime behaviour in the same PR.

@hamzzaaamalik
Copy link
Copy Markdown
Collaborator

@cerencamkiran : yes but on the source, I'd still push for the fix here, it's small (one line in EvidenceSource + three source= swaps). The tracer_web workaround leaves the planner's audit trail wrong on Airflow incidents.

StrictConfigModel: fine to defer. Mind opening a follow-up issue so it doesn't get lost?

Other stuff (recovery test, surfaces=, the unrelated reflow) is non-blocking.

@cerencamkiran
Copy link
Copy Markdown
Collaborator Author

Got it @hamzzaaamalik makes sense. I’ll add "airflow" to EvidenceSource and update the tool sources.

@hamzzaaamalik
Copy link
Copy Markdown
Collaborator

Thanks!

yashksaini-coder added a commit that referenced this pull request Apr 29, 2026
…tion-recovery test (#1077)

* fix(airflow): correct tool source, use StrictConfigModel, add surfaces, test recovery path

- Fix P1: change source='tracer_web' → source='airflow' on all 3 Airflow
  @tool decorators so the prioritizer's +2 score bump fires correctly when
  'airflow' appears in detected sources (prioritization.py:58)
- Add 'airflow' to EvidenceSource Literal in app/types/evidence.py
- Fix P2: switch AirflowConfig from BaseModel → StrictConfigModel
  (extra='forbid') to match the rest of the integration config models
- Add surfaces=('investigation', 'chat') to all 3 tools, matching the
  convention used by other integrations (PostgreSQL, MySQL, GitHub, etc.)
- Add P2 test: test_get_recent_airflow_failures_partial_run_error_preserves_evidence
  covers the exception-recovery loop in get_recent_airflow_failures —
  verifies evidence from a successful run is preserved when another run's
  task-instance fetch raises HTTPStatusError

Follows up on #570.

Co-authored-by: Copilot <[email protected]>

* fix(tests): correct search_logs assertions in test_datadog_client

search_logs() returns {success, logs, total} directly — not a nested
{logs: {success, logs}, monitors, events} shape like fetch_all() does.

Three tests were written against the wrong shape:
- test_search_logs_success: removed monitors/events assertions, fixed
  result['logs'][0]['message'] (was result['logs']['logs'][0]['message'])
- test_search_logs_empty_data: fixed result['logs'] == [] (was
  result['logs']['logs']), removed incorrect monitors/events checks
- test_search_logs_http_error: fixed result['error'] (was
  result['logs']['error']), removed unnecessary mock_instance.get stub

All 19 tests in test_datadog_client.py now pass.

Co-authored-by: Copilot <[email protected]>

---------

Co-authored-by: Copilot <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Add Apache Airflow integration for DAG and task-run RCA

9 participants