Skip to content

Commit 63ee3d1

Browse files
committed
Handle BranchPythonOperator
1 parent f171d0a commit 63ee3d1

4 files changed

Lines changed: 248 additions & 58 deletions

File tree

crates/ruff_linter/resources/test/fixtures/airflow/AIR003.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
from airflow.decorators import task
2+
from airflow.operators.python import BranchPythonOperator, ShortCircuitOperator
3+
from airflow.providers.standard.operators.python import (
4+
BranchPythonOperator as ProviderBranchPythonOperator,
5+
)
26

37
condition1 = True
48
condition2 = True
@@ -89,3 +93,53 @@ def already_short_circuit():
8993
if condition1:
9094
return True
9195
return False
96+
97+
98+
# BranchPythonOperator violations (should trigger AIR003):
99+
100+
101+
def operator_short_circuit_candidate():
102+
if condition1:
103+
return ["downstream_task"]
104+
return []
105+
106+
107+
BranchPythonOperator(task_id="task", python_callable=operator_short_circuit_candidate) # AIR003
108+
109+
110+
def provider_short_circuit_candidate():
111+
if condition1:
112+
return ["downstream_task"]
113+
return []
114+
115+
116+
ProviderBranchPythonOperator( # AIR003
117+
task_id="task", python_callable=provider_short_circuit_candidate
118+
)
119+
120+
121+
# BranchPythonOperator non-violations:
122+
123+
124+
def operator_two_non_empty():
125+
if condition1:
126+
return ["task_a"]
127+
if condition2:
128+
return ["task_b"]
129+
return []
130+
131+
132+
BranchPythonOperator(task_id="task", python_callable=operator_two_non_empty)
133+
134+
135+
def operator_single_return():
136+
return ["downstream_task"]
137+
138+
139+
BranchPythonOperator(task_id="task", python_callable=operator_single_return)
140+
141+
ShortCircuitOperator(task_id="task", python_callable=operator_short_circuit_candidate)
142+
143+
BranchPythonOperator(task_id="task", python_callable=lambda: ["downstream_task"])
144+
145+
BranchPythonOperator(task_id="task")

crates/ruff_linter/src/checkers/ast/analyze/expression.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,9 @@ pub(crate) fn expression(expr: &Expr, checker: &Checker) {
12841284
if checker.is_rule_enabled(Rule::Airflow3IncompatibleFunctionSignature) {
12851285
airflow::rules::airflow_3_incompatible_function_signature(checker, expr);
12861286
}
1287+
if checker.is_rule_enabled(Rule::TaskBranchAsShortCircuit) {
1288+
airflow::rules::branch_python_operator_as_short_circuit(checker, call);
1289+
}
12871290
if checker.is_rule_enabled(Rule::UnnecessaryCastToInt) {
12881291
ruff::rules::unnecessary_cast_to_int(checker, call);
12891292
}

crates/ruff_linter/src/rules/airflow/rules/task_branch_as_short_circuit.rs

Lines changed: 133 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
11
use ruff_macros::{ViolationMetadata, derive_message_formats};
22
use ruff_python_ast::helpers::ReturnStatementVisitor;
33
use ruff_python_ast::visitor::Visitor;
4-
use ruff_python_ast::{self as ast, Expr, StmtFunctionDef};
5-
use ruff_python_semantic::Modules;
4+
use ruff_python_ast::{self as ast, Expr, ExprCall, Stmt, StmtFunctionDef};
5+
use ruff_python_semantic::{BindingKind, Modules, ScopeKind};
66
use ruff_text_size::Ranged;
77

88
use crate::Violation;
99
use crate::checkers::ast::Checker;
1010
use crate::rules::airflow::helpers::is_airflow_task_variant;
1111

1212
/// ## What it does
13-
/// Checks for `@task.branch` decorated functions that could be replaced
14-
/// with `@task.short_circuit`.
13+
/// Checks for branching logic that could be replaced with a short-circuit
14+
/// pattern, either via `@task.branch` decorated functions or
15+
/// `BranchPythonOperator` callables.
1516
///
1617
/// ## Why is this bad?
17-
/// When a `@task.branch` function has at least two `return` statements and
18-
/// exactly one of them returns a non-empty list, the function is effectively
19-
/// acting as a short-circuit operator. Using `@task.short_circuit` is
20-
/// simpler and more readable in such cases.
18+
/// When a branch function has at least two `return` statements and exactly
19+
/// one of them returns a non-empty list, the function is effectively acting
20+
/// as a short-circuit operator. Using `@task.short_circuit` or
21+
/// `ShortCircuitOperator` is simpler and more readable in such cases.
2122
///
2223
/// ## Example
24+
///
25+
/// Using the `TaskFlow` API:
2326
/// ```python
2427
/// from airflow.decorators import task
2528
///
@@ -40,18 +43,60 @@ use crate::rules::airflow::helpers::is_airflow_task_variant;
4043
/// def my_task():
4144
/// return condition
4245
/// ```
46+
///
47+
/// Using the standard operator API:
48+
/// ```python
49+
/// from airflow.operators.python import BranchPythonOperator
50+
///
51+
///
52+
/// def my_callable():
53+
/// if condition:
54+
/// return ["my_downstream_task"]
55+
/// return []
56+
///
57+
///
58+
/// task = BranchPythonOperator(task_id="my_task", python_callable=my_callable)
59+
/// ```
60+
///
61+
/// Use instead:
62+
/// ```python
63+
/// from airflow.operators.python import ShortCircuitOperator
64+
///
65+
///
66+
/// def my_callable():
67+
/// return condition
68+
///
69+
///
70+
/// task = ShortCircuitOperator(task_id="my_task", python_callable=my_callable)
71+
/// ```
4372
#[derive(ViolationMetadata)]
4473
#[violation_metadata(preview_since = "NEXT_RUFF_VERSION")]
45-
pub(crate) struct TaskBranchAsShortCircuit;
74+
pub(crate) struct TaskBranchAsShortCircuit {
75+
kind: BranchKind,
76+
}
77+
78+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79+
enum BranchKind {
80+
Decorator,
81+
Operator,
82+
}
4683

4784
impl Violation for TaskBranchAsShortCircuit {
4885
#[derive_message_formats]
4986
fn message(&self) -> String {
50-
"A `@task.branch` that can be replaced with `@task.short_circuit`".to_string()
87+
match self.kind {
88+
BranchKind::Decorator => {
89+
"A `@task.branch` that can be replaced with `@task.short_circuit`".to_string()
90+
}
91+
BranchKind::Operator => {
92+
"A `BranchPythonOperator` that can be replaced with `ShortCircuitOperator`"
93+
.to_string()
94+
}
95+
}
5196
}
5297
}
5398

54-
/// AIR003
99+
/// AIR003 (decorator form)
55100
pub(crate) fn task_branch_as_short_circuit(checker: &Checker, function_def: &StmtFunctionDef) {
56101
if !checker.semantic().seen_module(Modules::AIRFLOW) {
57102
return;
@@ -61,14 +106,88 @@ pub(crate) fn task_branch_as_short_circuit(checker: &Checker, function_def: &Stm
61106
return;
62107
}
63108

109+
if could_be_short_circuit(&function_def.body) {
110+
checker.report_diagnostic(
111+
TaskBranchAsShortCircuit {
112+
kind: BranchKind::Decorator,
113+
},
114+
function_def.range(),
115+
);
116+
}
117+
}
118+
119+
/// AIR003 (operator form)
120+
pub(crate) fn branch_python_operator_as_short_circuit(checker: &Checker, call: &ExprCall) {
121+
if !checker.semantic().seen_module(Modules::AIRFLOW) {
122+
return;
123+
}
124+
125+
let semantic = checker.semantic();
126+
127+
let Some(qualified_name) = semantic.resolve_qualified_name(&call.func) else {
128+
return;
129+
};
130+
131+
if !matches!(
132+
qualified_name.segments(),
133+
[
134+
"airflow",
135+
"operators",
136+
"python" | "python_operator",
137+
"BranchPythonOperator"
138+
] | [
139+
"airflow",
140+
"providers",
141+
"standard",
142+
"operators",
143+
"python",
144+
"BranchPythonOperator"
145+
]
146+
) {
147+
return;
148+
}
149+
150+
let Some(keyword) = call.arguments.find_keyword("python_callable") else {
151+
return;
152+
};
153+
154+
let Expr::Name(name_expr) = &keyword.value else {
155+
return;
156+
};
157+
158+
let Some(binding_id) = semantic.only_binding(name_expr) else {
159+
return;
160+
};
161+
162+
let BindingKind::FunctionDefinition(scope_id) = semantic.binding(binding_id).kind else {
163+
return;
164+
};
165+
166+
let ScopeKind::Function(function_def) = semantic.scopes[scope_id].kind else {
167+
return;
168+
};
169+
170+
if could_be_short_circuit(&function_def.body) {
171+
checker.report_diagnostic(
172+
TaskBranchAsShortCircuit {
173+
kind: BranchKind::Operator,
174+
},
175+
call.func.range(),
176+
);
177+
}
178+
}
179+
180+
/// Returns `true` if the function body has 2+ return statements with exactly
181+
/// one non-empty list return — indicating a short-circuit pattern.
182+
fn could_be_short_circuit(body: &[Stmt]) -> bool {
64183
let mut visitor = ReturnStatementVisitor::default();
65-
for stmt in &function_def.body {
184+
for stmt in body {
66185
visitor.visit_stmt(stmt);
67186
}
68187

69188
let returns = &visitor.returns;
70189
if returns.len() < 2 {
71-
return;
190+
return false;
72191
}
73192

74193
let non_empty_list_count = returns
@@ -80,7 +199,5 @@ pub(crate) fn task_branch_as_short_circuit(checker: &Checker, function_def: &Stm
80199
})
81200
.count();
82201

83-
if non_empty_list_count == 1 {
84-
checker.report_diagnostic(TaskBranchAsShortCircuit, function_def.range());
85-
}
202+
non_empty_list_count == 1
86203
}

0 commit comments

Comments
 (0)