feat: add Apache Airflow integration with DAG/task evidence and tests #570
Conversation
Greptile Summary

This PR adds an initial Apache Airflow integration: a Pydantic config model, connectivity validation, DAG run/task instance retrieval, failure evidence extraction, and corresponding unit + E2E test scaffolding. The remaining new findings are all P2.

Confidence Score: 4/5. Safe to merge for catalog.py and airflow.py; node.py bugs flagged in prior threads remain unaddressed and will cause the resolve_integrations tests to fail. All new findings in this PR are P2 (style/cleanup). The blocking concerns (wrong if-block in node.py, missing integration_id in node.py resolved output) were flagged in prior review threads and are not fixed here, meaning tests/nodes/resolve_integrations/test_airflow.py is still expected to fail. That prevents a score of 5.

Important Files Changed: tests/nodes/resolve_integrations/test_airflow.py — relies on node.py fixes that haven't landed yet
```python
elif key == "airflow":
    try:
        airflow_config = build_airflow_config(
            {
                "base_url": credentials.get("base_url", DEFAULT_AIRFLOW_BASE_URL),
                "username": credentials.get("username", ""),
                "password": credentials.get("password", ""),
                "auth_token": credentials.get("auth_token", ""),
                "timeout_seconds": credentials.get("timeout_seconds", 15.0),
                "verify_ssl": credentials.get("verify_ssl", True),
                "max_results": credentials.get("max_results", 50),
            }
        )
    except Exception:
        continue

    if airflow_config.is_configured:
        resolved["airflow"] = airflow_config.model_dump()
```
integration_id missing from resolved Airflow entry
Every other integration stored in resolved includes "integration_id" (see grafana, aws, datadog, github, sentry). resolved["airflow"] = airflow_config.model_dump() produces a dict with only the model fields (base_url, username, password, …), so resolved["airflow"].get("integration_id", "") will silently return "". Any downstream code that relies on this field (e.g. audit logs, de-duplication) will be broken for Airflow.
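A tiny runnable sketch of the failure mode described above, with plain dicts standing in for `model_dump()` output (field values are illustrative):

```python
# Minimal stand-in for the bug: "model_fields" simulates what
# airflow_config.model_dump() returns -- only the model's own fields.
model_fields = {"base_url": "https://airflow.example.com/api/v1", "auth_token": "t"}

# What the current code stores: integration_id silently reads as "".
resolved_current = dict(model_fields)
print(resolved_current.get("integration_id", ""))

# The suggested shape: merge the record id in, as other integrations do.
resolved_fixed = {**model_fields, "integration_id": "store-airflow"}
print(resolved_fixed.get("integration_id", ""))
```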
Suggested change (keep the `build_airflow_config` block as-is; only the final assignment changes):

```python
if airflow_config.is_configured:
    resolved["airflow"] = {
        **airflow_config.model_dump(),
        "integration_id": integration.get("id", ""),
    }
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/nodes/resolve_integrations/node.py
Line: 159-176
Comment:
**`integration_id` missing from resolved Airflow entry**
Every other integration stored in `resolved` includes `"integration_id"` (see grafana, aws, datadog, github, sentry). `resolved["airflow"] = airflow_config.model_dump()` produces a dict with only the model fields (`base_url`, `username`, `password`, …), so `resolved["airflow"].get("integration_id", "")` will silently return `""`. Any downstream code that relies on this field (e.g. audit logs, de-duplication) will be broken for Airflow.
```suggestion
elif key == "airflow":
try:
airflow_config = build_airflow_config(
{
"base_url": credentials.get("base_url", DEFAULT_AIRFLOW_BASE_URL),
"username": credentials.get("username", ""),
"password": credentials.get("password", ""),
"auth_token": credentials.get("auth_token", ""),
"timeout_seconds": credentials.get("timeout_seconds", 15.0),
"verify_ssl": credentials.get("verify_ssl", True),
"max_results": credentials.get("max_results", 50),
}
)
except Exception:
continue
if airflow_config.is_configured:
resolved["airflow"] = {
**airflow_config.model_dump(),
"integration_id": integration.get("id", ""),
}
```
How can I resolve this? If you propose a fix, please make it concise.

```python
"""Fetch recent failed or retrying task evidence for a DAG.

Strategy:
- fetch recent DAG runs
- keep failed DAG runs first, but also inspect recent runs generally
- fetch task instances for each run
- return failed/up_for_retry/upstream_failed task evidence
"""
```
Misleading docstring strategy comment
The docstring says "keep failed DAG runs first" but the implementation fetches the most recent runs regardless of state (ordered by -start_date). No pre-filtering by state=failed is applied before fetching task instances. Recent successful runs will consume the limit slots, potentially hiding older failures. Either update the comment to match the actual behaviour, or apply state="failed" to the get_airflow_dag_runs call if the "failed first" ordering is actually intended.
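If the "failed first" ordering is actually intended, it could be sketched like this; the run dicts with `state`/`start_date` keys are illustrative stand-ins, not the project's actual API:

```python
def select_runs_failed_first(runs, limit):
    """Put failed DAG runs ahead of everything else, newest first,
    then truncate to the evidence budget."""
    failed = [r for r in runs if r.get("state") == "failed"]
    others = [r for r in runs if r.get("state") != "failed"]

    def newest_first(items):
        return sorted(items, key=lambda r: r.get("start_date", ""), reverse=True)

    return (newest_first(failed) + newest_first(others))[:limit]


runs = [
    {"state": "success", "start_date": "2026-04-03"},
    {"state": "failed", "start_date": "2026-04-01"},
    {"state": "failed", "start_date": "2026-04-02"},
]
# The older failures survive the limit; the recent success does not.
print([r["start_date"] for r in select_runs_failed_first(runs, 2)])
```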
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/integrations/airflow.py
Line: 299-305
Comment:
**Misleading docstring strategy comment**
The docstring says "keep failed DAG runs first" but the implementation fetches the most recent runs regardless of state (ordered by `-start_date`). No pre-filtering by `state=failed` is applied before fetching task instances. Recent successful runs will consume the `limit` slots, potentially hiding older failures. Either update the comment to match the actual behaviour, or apply `state="failed"` to the `get_airflow_dag_runs` call if the "failed first" ordering is actually intended.
How can I resolve this? If you propose a fix, please make it concise.

```python
def airflow_config_from_env() -> AirflowConfig | None:
    """Load an Airflow config from env vars."""
    username = os.getenv("AIRFLOW_USERNAME", "").strip()
    auth_token = os.getenv("AIRFLOW_AUTH_TOKEN", "").strip()

    if not username and not auth_token:
        return None

    return build_airflow_config(
        {
            "base_url": os.getenv("AIRFLOW_BASE_URL", DEFAULT_AIRFLOW_BASE_URL).strip()
            or DEFAULT_AIRFLOW_BASE_URL,
            "username": username,
            "password": os.getenv("AIRFLOW_PASSWORD", "").strip(),
            "auth_token": auth_token,
            "timeout_seconds": os.getenv(
                "AIRFLOW_TIMEOUT_SECONDS",
                str(DEFAULT_AIRFLOW_TIMEOUT_SECONDS),
            ),
            "verify_ssl": os.getenv("AIRFLOW_VERIFY_SSL", "true").strip().lower()
            in ("true", "1", "yes"),
            "max_results": os.getenv(
                "AIRFLOW_MAX_RESULTS", str(DEFAULT_AIRFLOW_MAX_RESULTS)
            ).strip(),
        }
    )
```
airflow_config_from_env is dead code
This function is defined but never called. _load_env_integrations in node.py (lines 304–327) re-implements the exact same env-loading logic inline rather than delegating here. The duplication means future changes to Airflow env-var handling need to be made in two places, violating the DRY principle. Consider either calling airflow_config_from_env() from _load_env_integrations, or removing the function.
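A minimal sketch of the suggested delegation, with simplified stand-ins for the PR's helpers (`build_airflow_config` and the full field set are omitted; only the env-reading and the reuse pattern are shown):

```python
import os

DEFAULT_AIRFLOW_BASE_URL = "http://localhost:8080/api/v1"


def airflow_config_from_env():
    """Simplified stand-in for the PR's loader: returns a config dict,
    or None when neither username nor token is present."""
    username = os.getenv("AIRFLOW_USERNAME", "").strip()
    auth_token = os.getenv("AIRFLOW_AUTH_TOKEN", "").strip()
    if not username and not auth_token:
        return None
    return {
        "base_url": os.getenv("AIRFLOW_BASE_URL", DEFAULT_AIRFLOW_BASE_URL).strip()
        or DEFAULT_AIRFLOW_BASE_URL,
        "username": username,
        "auth_token": auth_token,
    }


def load_env_integrations():
    # Delegate to the shared loader instead of re-parsing AIRFLOW_* inline,
    # so env-var handling lives in exactly one place.
    integrations = []
    config = airflow_config_from_env()
    if config is not None:
        integrations.append(
            {"id": "env-airflow", "service": "airflow", "credentials": config}
        )
    return integrations


os.environ["AIRFLOW_AUTH_TOKEN"] = "demo-token"
print(load_env_integrations()[0]["credentials"]["auth_token"])
```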
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/integrations/airflow.py
Line: 75-100
Comment:
**`airflow_config_from_env` is dead code**
This function is defined but never called. `_load_env_integrations` in `node.py` (lines 304–327) re-implements the exact same env-loading logic inline rather than delegating here. The duplication means future changes to Airflow env-var handling need to be made in two places, violating the DRY principle. Consider either calling `airflow_config_from_env()` from `_load_env_integrations`, or removing the function.
How can I resolve this? If you propose a fix, please make it concise.

Hey @cerencamkiran, could you please resolve the merge conflicts and also fix the Greptile issues? That would be great for us. Great work, btw 👍
```python
integrations.append(
    {
        "id": "env-airflow",
        "service": "airflow",
        "status": "active",
        "credentials": airflow_config.model_dump(),
    }
)
```
Airflow append inside wrong `if` block — NameError + silent drop

`integrations.append({"id": "env-airflow", ...})` (lines 780–787) is nested inside `if atlas_pub and atlas_priv and atlas_project:` instead of `if airflow_username or airflow_auth_token:`. Two concrete failures result:

1. Atlas configured, Airflow not configured: `airflow_config` is never assigned → `NameError: name 'airflow_config' is not defined` at line 785 every time Atlas env vars are present.
2. Airflow configured, Atlas not configured: the `append` is never reached — the Airflow integration is silently skipped.

The `integrations.append` block needs to move inside the Airflow `if` block, and the closing `)` for `build_airflow_config(` at line 765 should be indented at 8 spaces (inside the `if`) rather than 4.
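A self-contained sketch of the corrected control flow; guards and field names are simplified stand-ins for node.py's actual code, and `env` is a plain dict standing in for `os.environ`:

```python
def load_env_integrations(env):
    """Each integration's append lives inside its own guard, so Atlas
    and Airflow never cross wires."""
    integrations = []

    atlas_pub = env.get("ATLAS_PUBLIC_KEY", "")
    atlas_priv = env.get("ATLAS_PRIVATE_KEY", "")
    atlas_project = env.get("ATLAS_PROJECT_ID", "")
    if atlas_pub and atlas_priv and atlas_project:
        integrations.append({"id": "env-atlas", "service": "atlas"})

    airflow_username = env.get("AIRFLOW_USERNAME", "")
    airflow_auth_token = env.get("AIRFLOW_AUTH_TOKEN", "")
    if airflow_username or airflow_auth_token:
        airflow_config = {"username": airflow_username, "auth_token": airflow_auth_token}
        # The append belongs here, inside the Airflow guard -- no NameError
        # when only Atlas is configured, no silent drop when only Airflow is.
        integrations.append(
            {"id": "env-airflow", "service": "airflow", "credentials": airflow_config}
        )

    return integrations


# Airflow configured, Atlas not: Airflow is kept, nothing raises.
print([i["id"] for i in load_env_integrations({"AIRFLOW_AUTH_TOKEN": "t"})])
```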
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/nodes/resolve_integrations/node.py
Line: 780-787
Comment:
**Airflow append inside wrong `if` block — NameError + silent drop**
`integrations.append({"id": "env-airflow", ...})` (lines 780–787) is nested inside `if atlas_pub and atlas_priv and atlas_project:` instead of `if airflow_username or airflow_auth_token:`. Two concrete failures result:
1. **Atlas configured, Airflow not configured**: `airflow_config` is never assigned → `NameError: name 'airflow_config' is not defined` at line 785 every time Atlas env vars are present.
2. **Airflow configured, Atlas not configured**: the `append` is never reached — Airflow integration is silently skipped.
The `integrations.append` block needs to move inside the Airflow `if` block, and the closing `)` for `build_airflow_config(` at line 765 should be indented at 8 spaces (inside the `if`) rather than 4.
How can I resolve this? If you propose a fix, please make it concise.

Hi @Devesh36, thanks! I've resolved the conflicts.
I reviewed the still-open Airflow comments in a separate worktree and validated the branch logic locally. What I fixed locally (not pushed yet):

Verification I ran:

Live integration check:

So the remaining open Airflow review comments look valid and addressable, and the integration paths for connectivity, DAG listing, DAG-run lookup, and task-instance lookup do work against a live Airflow API. The one thing I could not prove from the stock demo instance was failure-evidence extraction with an actually failing run.

Thanks @davincios, super helpful validation. I'll address the remaining Airflow review comments, resolve the node.py conflict, and push the fixes shortly.
- guard AirflowConfig.auth so a configured bearer token is not silently overridden by basic-auth credentials when both are populated
- have _load_env_integrations() reuse airflow_config_from_env() instead of duplicating the env-var parsing logic inline
- align get_recent_airflow_failures() docstring with the actual fetch strategy (no failed-first ordering is applied)
- add tests for auth precedence, shared env loading, env-based integration loading, and integration_id preservation through _classify_integrations()

Made-with: Cursor
OK, thanks for circling back this quickly. Just to note that I did push the fixes already, so you can work from there!
This is part of a task. Do not merge before the acceptance criteria are met, including that the PR includes a screen video showing setup plus a successful investigation flow using Apache Airflow.
```python
def test_classify_airflow_from_store_includes_integration_id() -> None:
    integrations = [
        {
            "id": "store-airflow",
            "service": "airflow",
            "status": "active",
            "credentials": {
                "base_url": "https://airflow.example.com/api/v1",
                "auth_token": "store-token",
                "timeout_seconds": 20,
                "verify_ssl": True,
                "max_results": 25,
            },
        }
    ]

    resolved = _classify_integrations(integrations)

    assert resolved["airflow"]["base_url"] == "https://airflow.example.com/api/v1"
    assert resolved["airflow"]["auth_token"] == "store-token"
    assert resolved["airflow"]["integration_id"] == "store-airflow"


def test_load_env_airflow(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("AIRFLOW_BASE_URL", "https://airflow.example.com/api/v1/")
    monkeypatch.setenv("AIRFLOW_AUTH_TOKEN", "env-token")
    monkeypatch.setenv("AIRFLOW_VERIFY_SSL", "false")
    monkeypatch.setenv("AIRFLOW_TIMEOUT_SECONDS", "30")
    monkeypatch.setenv("AIRFLOW_MAX_RESULTS", "10")

    integrations = _load_env_integrations()
    airflow = [integration for integration in integrations if integration["service"] == "airflow"]

    assert len(airflow) == 1
    creds = airflow[0]["credentials"]
    assert airflow[0]["id"] == "env-airflow"
    assert creds["base_url"] == "https://airflow.example.com/api/v1"
    assert creds["auth_token"] == "env-token"
    assert creds["verify_ssl"] is False
    assert creds["timeout_seconds"] == 30.0
    assert creds["max_results"] == 10


def test_load_env_airflow_absent_without_auth(monkeypatch: pytest.MonkeyPatch) -> None:
```
Tests assert against missing catalog.py support — both will fail
catalog.py's _classify_service_instance has no Airflow branch; the generic fallback (line 682) returns {"credentials": <nested dict>, "integration_id": record_id}. So resolved["airflow"]["base_url"] will raise KeyError — base_url is nested under "credentials", not at the top level.
test_load_env_airflow fails separately: load_env_integrations() in catalog.py reads no AIRFLOW_* env vars, so the airflow filter returns [] and len(airflow) == 1 fails immediately.
Both tests need the Airflow case to be added to _classify_service_instance and load_env_integrations in catalog.py before they can pass.
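A sketch of the shape the missing Airflow branch could take, assuming the flatten-credentials-plus-`integration_id` convention the other integrations use (the function name is illustrative, not catalog.py's actual helper):

```python
def classify_airflow_record(record):
    """Flatten the stored credentials to the top level and carry the
    record id along as integration_id, matching the test's assertions."""
    creds = record.get("credentials", {})
    return {**creds, "integration_id": record.get("id", "")}


record = {
    "id": "store-airflow",
    "service": "airflow",
    "credentials": {
        "base_url": "https://airflow.example.com/api/v1",
        "auth_token": "store-token",
    },
}
resolved = classify_airflow_record(record)
# base_url is now top-level, not nested under "credentials".
print(resolved["base_url"], resolved["integration_id"])
```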
Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/nodes/resolve_integrations/test_airflow.py
Line: 10-53
Comment:
**Tests assert against missing catalog.py support — both will fail**
`catalog.py`'s `_classify_service_instance` has no Airflow branch; the generic fallback (line 682) returns `{"credentials": <nested dict>, "integration_id": record_id}`. So `resolved["airflow"]["base_url"]` will raise `KeyError` — `base_url` is nested under `"credentials"`, not at the top level.
`test_load_env_airflow` fails separately: `load_env_integrations()` in `catalog.py` reads no `AIRFLOW_*` env vars, so the airflow filter returns `[]` and `len(airflow) == 1` fails immediately.
Both tests need the Airflow case to be added to `_classify_service_instance` and `load_env_integrations` in `catalog.py` before they can pass.
How can I resolve this? If you propose a fix, please make it concise.
yashksaini-coder
left a comment
Good foundation, but a few things need to be sorted before this can merge.
Merge state
- Branch has a live conflict in `app/integrations/catalog.py` against current main. Needs a rebase.
Issue #330 acceptance criteria
- The issue explicitly asks for a screen video of setup + a successful investigation flow. Three screenshots don't satisfy that, please record a short video.
- PR description says Airflow evidence isn't wired into investigation planning / RCA yet ("further work will integrate..."). That's the exact end-to-end path #330 requires, so either land that wiring here or scope this PR down and clearly split the remainder into a tracked follow-up.
Outstanding review feedback not yet on the branch
- @davincios pushed fixes for auth precedence, shared env loading, and extra tests on a separate worktree. Those haven't landed on this PR branch yet. Please pull them in.
Code issues worth fixing while you're in here
- `catalog.py`: `exclude={"integration_id"}` on the airflow env block is a no-op; `AirflowConfig` has no such field. Either add the field to the model or drop the exclude so intent is clear.
- `get_recent_airflow_failures` has no try/except around the per-run `get_airflow_task_instances` call. A single 4xx/5xx mid-loop throws away every piece of evidence already collected. Wrap it and `continue`, same discipline as `validate_airflow_config`.
- E2E tests in `tests/e2e/airflow/test_orchestrator.py` don't have `@pytest.mark.e2e`. Other integrations use the marker; please match the convention so CI can gate them.
One gap in the live verification
- Thanks to @davincios for running against a real Airflow, but `get_recent_airflow_failures` returned `[]` on the demo instance because there were no failing runs. That's the single most important codepath for the RCA use case. Please reproduce with an actually-failing DAG and include that in the screen video.
Once the conflict is resolved, the video is attached, and the items above are addressed, happy to re-review. Nice work getting it this far.
VaibhavUpreti
left a comment
@cerencamkiran, could you please update the docs with the instructions as well?
@yashksaini-coder, maybe we can create a checklist for adding a new integration, store it somewhere, and link it in the pull request template?
Force-pushed 969165a to a71aa6c.
Thanks for the detailed feedback. I really appreciate it.

catalog.py.-.opensre.-.Visual.Studio.Code.2026-04-21.15-10-14.mp4

build_prompt.py.-.opensre.-.Visual.Studio.Code.2026-04-21.16-10-51.1.mp4
And please add docs, or take that up as a separate PR: #1020

Greptile shows stale comments even after fixes. I've addressed the issues. I'm also preparing the docs and will include them shortly @muddlebee

@cerencamkiran ready to merge then?

Added Airflow integration docs @muddlebee! Happy to iterate further if needed.
```python
seeded.append("list_openclaw_tools")

if "airflow" in available_sources:
    seeded.append("get_recent_airflow_failures")
```
These three names are not registered as @tool callables anywhere in app/tools/. The existing TracerAirflowMetricsTool only registers get_airflow_metrics. Seeding names the registry doesn't know about means the LLM planner will be directed toward them (especially via the priority hint in build_prompt.py) but execution will silently skip or error out on them. The helpers in app/integrations/airflow.py need corresponding @tool-decorated wrappers under app/tools/ before these seeds are useful.
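One way to avoid seeding names the registry doesn't know is to gate the seed list on the registry itself. A sketch under stated assumptions: `TOOL_REGISTRY` and the availability check below are illustrative stand-ins, not the project's actual registry API:

```python
# Stand-in registry: only get_airflow_metrics is registered, mirroring
# the situation the comment describes.
TOOL_REGISTRY = {"get_airflow_metrics": lambda: []}


def seed_tools(available_sources, candidates):
    """Seed only tool names that are both source-relevant and actually
    registered, so the planner is never pointed at unknown callables."""
    seeded = []
    for name in candidates:
        if "airflow" in available_sources and name in TOOL_REGISTRY:
            seeded.append(name)
    return seeded


# get_recent_airflow_failures is silently requested but unregistered,
# so it is filtered out instead of reaching the planner.
print(seed_tools({"airflow"}, ["get_recent_airflow_failures", "get_airflow_metrics"]))
```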
```python
class AirflowConfig(BaseModel):
    """Normalized Airflow connection settings."""

    base_url: str = DEFAULT_AIRFLOW_BASE_URL
```
AirflowConfig should extend StrictConfigModel (not BaseModel) to match the project convention — see BetterStackConfig, for example. StrictConfigModel already provides the string-stripping behaviour you've manually re-implemented in _normalize_str, so that validator can be removed.
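A stdlib stand-in for the string-stripping behaviour described above. The real `StrictConfigModel` is a Pydantic model; this dataclass only illustrates why a manual `_normalize_str` validator becomes redundant once stripping happens at construction time:

```python
from dataclasses import dataclass, fields


@dataclass
class StrictConfigSketch:
    """Every str field is stripped on construction, so callers never
    need a separate normalization pass."""

    base_url: str = "http://localhost:8080/api/v1"
    username: str = ""

    def __post_init__(self):
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(value, str):
                setattr(self, f.name, value.strip())


cfg = StrictConfigSketch(base_url="  https://airflow.example.com/api/v1  ")
print(cfg.base_url)
```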
```python
    return str(value or "").strip()

@property
def api_base_url(self) -> str:
```
api_base_url always equals base_url: _normalize_base_url already strips the trailing slash, so this property is a no-op. Use config.base_url directly in _request_json and remove the property.
```python
    return payload if isinstance(payload, dict) else {}


def get_airflow_dags(
```
get_airflow_dags is never imported or called anywhere in this PR. If it's not going to be used via a tool wrapper, remove it. Dead exported functions make the module's surface area misleading.
```python
"pool": task_instance.get("pool", ""),
"queue": task_instance.get("queue", ""),
"priority_weight": task_instance.get("priority_weight"),
"note": "; ".join(note_parts),
```
The note field concatenates task_state, try_number, max_tries, and dag_run_state — all of which are already present as top-level fields in the same dict. This is redundant and adds noise to the LLM context. Drop it, or keep it only if there's a downstream consumer that specifically reads the note string.
```python
    dag_id=dag_id,
    dag_run_id=dag_run_id,
)
except Exception:
```
Silent swallow hides real problems. Auth failures, connection errors, or malformed responses on any individual run will be swallowed here with no signal. At minimum: log a warning with the dag_run_id and the exception type before continuing. Otherwise a misconfigured integration looks identical to one with zero task failures.
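The pattern the comment asks for can be sketched as follows; the collector and fetcher names are illustrative, not the module's actual functions:

```python
import logging

logger = logging.getLogger("airflow_evidence")


def collect_task_failures(runs, fetch_task_instances):
    """Log a warning with the run id and exception type before
    continuing, so a misconfigured integration is distinguishable
    from one that simply has zero task failures."""
    evidence = []
    for run in runs:
        try:
            evidence.extend(fetch_task_instances(run["dag_run_id"]))
        except Exception as exc:
            logger.warning(
                "skipping dag_run_id=%s: %s", run["dag_run_id"], type(exc).__name__
            )
            continue
    return evidence


def flaky(run_id):
    # Simulated per-run API failure for one of the two runs.
    if run_id == "bad":
        raise ConnectionError("boom")
    return [{"task_id": "t1", "state": "failed"}]


# Evidence from the good run survives; the bad run is logged and skipped.
print(len(collect_task_failures([{"dag_run_id": "bad"}, {"dag_run_id": "ok"}], flaky)))
```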
```python
    **airflow_config.model_dump(),
    "integration_id": record_id,
}, "airflow"
return None, None  # ← eksikti
```
Turkish inline comment ('eksikti' = 'was missing'). Remove before merge.
```python
    "integration_id": record_id,
}, "betterstack"
return None, None
return None, None  # ← eksikti
```
Same stray Turkish comment as line 599. Remove.
```python
for alert in raw_alert.get("alerts", []):
    annotations = alert.setdefault("annotations", {})
    annotations["airflow_base_url"] = base_url
```
The Airflow integration is resolved from env vars / the integration store, not from alert annotations. Injecting airflow_base_url and airflow_dag_id here has no effect on how the agent picks up the integration — it comes from AIRFLOW_BASE_URL etc. already set in the env. This annotation injection is either a no-op or reflects a planned but not yet implemented annotation-based resolution path. Either way it should be clarified or removed to avoid confusion.
Thanks a lot for the detailed review @muddlebee. I've addressed all the other points (tool seeding, cleanup, logging, tests, etc.).
Good catch @muddlebee.

Airflow tools are now correctly selected and invoked during the investigation flow. Note:
yashksaini-coder
left a comment
Thorough local review — LGTM ✅
Checked out cerencamkiran/opensre:feature/airflow-integration, ran the full quality gate locally.

CI / Quality gate

| Check | Result |
|---|---|
| `make lint` (ruff) | ✅ All checks passed |
| `make typecheck` (mypy) | ✅ No new errors (2 pre-existing errors in clickhouse.py/kafka.py exist on main too) |
| Unit + integration tests (`-x -q --ignore=e2e`) | ✅ 2962 passed, 1 skipped |
| Airflow-specific tests (13 tests) | ✅ All pass |
| GitHub CI checks | ✅ test, quality, typecheck, CodeQL all green |
Code review
app/integrations/airflow.py
- Clean separation: config model, env loader, `_request_json` transport, and domain helpers are all well-scoped
- `auth` property correctly gates basic auth behind `not self.auth_token` — bearer token always wins ✅
- `get_recent_airflow_failures` wraps per-run `get_airflow_task_instances` in `try/except` with `continue` — previously raised review concern addressed ✅
- `_to_failure_evidence` is thorough and investigation-friendly
app/tools/TracerAirflowDAGTool/__init__.py
- Proper `@tool` wrappers for all three functions — tools are registered in the registry correctly
- `extract_params` builds config + dag_id from sources; `get_airflow_task_instances` excluded from auto-seed (requires `dag_run_id`) — correct decision
- `is_available` checks `"airflow" in sources` — clean gating
app/integrations/catalog.py
- `classify_integrations` correctly populates the Airflow block
- `airflow_config_from_env()` reused in `load_env_integrations()` — no duplication ✅
- `"airflow"` present in `direct_services` tuple — integration wired end-to-end
app/nodes/plan_actions/
- `detect_sources.py` correctly detects Airflow from alert context
- `build_prompt.py` adds Airflow-specific context to the investigation prompt
- `plan_actions.py` seeds `get_recent_airflow_failures` and `get_airflow_dag_runs` when Airflow is an available source
Tests
- 13 unit tests across integration helpers, routing, and catalog resolution — all well-structured, use `monkeypatch` on `httpx.request` cleanly
- E2E tests marked `@pytest.mark.e2e` — CI-safe ✅
- Evidence from real failing DAG (`test_fail_dag`) included via screen recording ✅
All previously raised review concerns (merge conflicts, missing try/except in failure loop, auth precedence, airflow_config_from_env deduplication, e2e marker, screen video) have been addressed.
Approving. Ready to merge. 🚀
💜 One more reason the project grows. Thanks @cerencamkiran — your contribution just landed! 👋 Join us on Discord - OpenSRE: hang out, contribute, or hunt for features and issues. Everyone's welcome.
Looked at this carefully. Found one issue worth fixing before merge:

P1: tools have `source="tracer_web"` instead of `source="airflow"`. That's the SaaS webapp source. The prioritizer at prioritization.py:58 gives a +2 score bump when `action.source in sources` — for an Airflow incident, sources contains "airflow", so the bump never fires for these tools. The seeding mechanism keeps them in the pool, but they score 0 when they should score 2. Looks like the workaround was because `EvidenceSource` (in app/types/evidence.py) is a closed Literal and doesn't list "airflow". Easy fix: add "airflow" to the Literal, then update the three `@tool(source=...)` calls.

P2: `AirflowConfig` extends `BaseModel`, not `StrictConfigModel` — @yashksaini-coder asked about this earlier; doesn't look like it got addressed. Most other integrations use StrictConfigModel to forbid extra fields.

P2: no test for the exception-recovery path. airflow.py:282-294 carefully handles a single failing DAG-run-task-instances call so prior evidence isn't lost — but there's no test that exercises that path. Worth a "one fails, one succeeds, evidence preserved" test.

P3, non-blocking: packaging/sync_release_version.py has an unrelated ruff-format reflow swept in.

Nit: other tools set `surfaces=("investigation", "chat")`; these don't — intentional?

Otherwise: catalog/env loading/resolve_effective_integrations follow the existing pattern, auth precedence is correct with a regression test, DAG IDs are URL-encoded, lint/format/mypy clean, 13 new tests + no regressions in the broad sweep (1916 passed locally). LGTM after the source fix.
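The scoring rule described above can be sketched as follows. The +2 constant and the membership check follow the comment's description of prioritization.py:58; everything else here is an illustrative stand-in:

```python
def score_action(action_source, detected_sources, base_score=0):
    """Bump an action's score by 2 when its declared source matches
    one of the incident's detected sources."""
    return base_score + (2 if action_source in detected_sources else 0)


# With source="tracer_web" the bump never fires on an Airflow incident;
# with source="airflow" it does.
print(score_action("tracer_web", {"airflow"}), score_action("airflow", {"airflow"}))
```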
Bug: Wrong source on the tools. All three Airflow tools are tagged `source="tracer_web"` (which is the SaaS webapp), not `source="airflow"`. This matters because the planner gives tools a +2 score boost when their source matches the incident — so on an Airflow incident, these tools get 0 boost when they should get 2. The seeding logic still gets them into the pool, but they could lose out to other tools when the budget is tight. Looks like the author hit this because `EvidenceSource` in app/types/evidence.py is a closed list and doesn't include "airflow". The fix is small: add "airflow" to the `EvidenceSource` list.

`AirflowConfig` uses `BaseModel` instead of `StrictConfigModel` — @yashksaini-coder asked about this earlier and it doesn't look addressed. Most other integrations use the strict version.

packaging/sync_release_version.py has an unrelated formatting tweak swept in.
Thanks for the detailed review @hamzzaaamalik. On
@cerencamkiran: yes, but on the source, I'd still push for the fix here: it's small (one line in EvidenceSource + three `source=` swaps). The tracer_web workaround leaves the planner's audit trail wrong on Airflow incidents. StrictConfigModel: fine to defer. Mind opening a follow-up issue so it doesn't get lost? Other stuff (recovery test, `surfaces=`, the unrelated reflow) is non-blocking.
Got it @hamzzaaamalik, makes sense. I'll add "airflow" to EvidenceSource and update the tool sources.
Thanks!
…tion-recovery test (#1077)

* fix(airflow): correct tool source, use StrictConfigModel, add surfaces, test recovery path

  - Fix P1: change source='tracer_web' → source='airflow' on all 3 Airflow @tool decorators so the prioritizer's +2 score bump fires correctly when 'airflow' appears in detected sources (prioritization.py:58)
  - Add 'airflow' to EvidenceSource Literal in app/types/evidence.py
  - Fix P2: switch AirflowConfig from BaseModel → StrictConfigModel (extra='forbid') to match the rest of the integration config models
  - Add surfaces=('investigation', 'chat') to all 3 tools, matching the convention used by other integrations (PostgreSQL, MySQL, GitHub, etc.)
  - Add P2 test: test_get_recent_airflow_failures_partial_run_error_preserves_evidence covers the exception-recovery loop in get_recent_airflow_failures — verifies evidence from a successful run is preserved when another run's task-instance fetch raises HTTPStatusError

  Follows up on #570.

* fix(tests): correct search_logs assertions in test_datadog_client

  search_logs() returns {success, logs, total} directly — not a nested {logs: {success, logs}, monitors, events} shape like fetch_all() does. Three tests were written against the wrong shape:

  - test_search_logs_success: removed monitors/events assertions, fixed result['logs'][0]['message'] (was result['logs']['logs'][0]['message'])
  - test_search_logs_empty_data: fixed result['logs'] == [] (was result['logs']['logs']), removed incorrect monitors/events checks
  - test_search_logs_http_error: fixed result['error'] (was result['logs']['error']), removed unnecessary mock_instance.get stub

  All 19 tests in test_datadog_client.py now pass.

---------

Co-authored-by: Copilot <[email protected]>






Fixes #330
Summary
Adds a robust Apache Airflow integration with end-to-end investigation support.
This PR wires Airflow evidence directly into the investigation flow. This includes:
What's included
End-to-end validation
Validated against a live Airflow instance, including a deliberately failing DAG (`test_fail_dag`).

Screen recording

A short screen recording is attached showing:

Notes

- `get_recent_airflow_failures` now handles per-run API failures without dropping previously collected evidence
- E2E tests are marked `@pytest.mark.e2e` to align with CI conventions

Addressed all review feedback.

Happy to iterate further if anything is missing.