Skip to content

Commit 70a050b

Browse files
Usieluranusjr
andauthored
Enable pools to consider deferred tasks (#32709)
* Makes pools respect deferrable tasks (with extra setting) See #21243 This commit makes pools consider deferred tasks if the `include_deferred` flag is set. By default a pool will not consider deferred tasks as occupied slots, but still show the number of deferred tasks in its stats. --------- Co-authored-by: Tzu-ping Chung <[email protected]>
1 parent f82acc1 commit 70a050b

File tree

37 files changed

+549
-166
lines changed

37 files changed

+549
-166
lines changed

airflow/api/client/api_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,13 @@ def get_pools(self):
6060
"""Get all pools."""
6161
raise NotImplementedError()
6262

63-
def create_pool(self, name, slots, description):
63+
def create_pool(self, name, slots, description, include_deferred):
6464
"""Create a pool.
6565
6666
:param name: pool name
6767
:param slots: pool slots amount
6868
:param description: pool description
69+
:param include_deferred: include deferred tasks in pool calculations
6970
"""
7071
raise NotImplementedError()
7172

airflow/api/client/json_client.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,23 +108,26 @@ def get_pools(self):
108108
pools = self._request(url)
109109
return [(p["pool"], p["slots"], p["description"]) for p in pools]
110110

111-
def create_pool(self, name: str, slots: int, description: str):
111+
def create_pool(self, name: str, slots: int, description: str, include_deferred: bool):
112112
"""Create a new pool.
113113
114114
:param name: The name of the pool to create.
115115
:param slots: The number of slots in the pool.
116116
:param description: A description of the pool.
117+
:param include_deferred: include deferred tasks in pool calculations
118+
117119
:return: A tuple containing the name of the pool, the number of slots in the pool,
118-
and a description of the pool.
120+
a description of the pool and the include_deferred flag.
119121
"""
120122
endpoint = "/api/experimental/pools"
121123
data = {
122124
"name": name,
123125
"slots": slots,
124126
"description": description,
127+
"include_deferred": include_deferred,
125128
}
126129
response = self._request(urljoin(self._api_base_url, endpoint), method="POST", json=data)
127-
return response["pool"], response["slots"], response["description"]
130+
return response["pool"], response["slots"], response["description"], response["include_deferred"]
128131

129132
def delete_pool(self, name: str):
130133
"""Delete a pool.

airflow/api/client/local_client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,12 @@ def get_pool(self, name):
6363
pool = Pool.get_pool(pool_name=name)
6464
if not pool:
6565
raise PoolNotFound(f"Pool {name} not found")
66-
return pool.pool, pool.slots, pool.description
66+
return pool.pool, pool.slots, pool.description, pool.include_deferred
6767

6868
def get_pools(self):
69-
return [(p.pool, p.slots, p.description) for p in Pool.get_pools()]
69+
return [(p.pool, p.slots, p.description, p.include_deferred) for p in Pool.get_pools()]
7070

71-
def create_pool(self, name, slots, description):
71+
def create_pool(self, name, slots, description, include_deferred):
7272
if not (name and name.strip()):
7373
raise AirflowBadRequest("Pool name shouldn't be empty")
7474
pool_name_length = Pool.pool.property.columns[0].type.length
@@ -78,7 +78,9 @@ def create_pool(self, name, slots, description):
7878
slots = int(slots)
7979
except ValueError:
8080
raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
81-
pool = Pool.create_or_update_pool(name=name, slots=slots, description=description)
81+
pool = Pool.create_or_update_pool(
82+
name=name, slots=slots, description=description, include_deferred=include_deferred
83+
)
8284
return pool.pool, pool.slots, pool.description
8385

8486
def delete_pool(self, name):

airflow/api/common/experimental/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def create_pool(name, slots, description, session: Session = NEW_SESSION):
6868
session.expire_on_commit = False
6969
pool = session.scalar(select(Pool).filter_by(pool=name).limit(1))
7070
if pool is None:
71-
pool = Pool(pool=name, slots=slots, description=description)
71+
pool = Pool(pool=name, slots=slots, description=description, include_deferred=False)
7272
session.add(pool)
7373
else:
7474
pool.slots = slots

airflow/api_connexion/endpoints/pool_endpoint.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ def patch_pool(
8888
) -> APIResponse:
8989
"""Update a pool."""
9090
request_dict = get_json_request_dict()
91-
# Only slots can be modified in 'default_pool'
91+
# Only slots and include_deferred can be modified in 'default_pool'
9292
try:
9393
if pool_name == Pool.DEFAULT_POOL_NAME and request_dict["name"] != Pool.DEFAULT_POOL_NAME:
94-
if update_mask and len(update_mask) == 1 and update_mask[0].strip() == "slots":
94+
if update_mask and all(mask.strip() in {"slots", "include_deferred"} for mask in update_mask):
9595
pass
9696
else:
9797
raise BadRequest(detail="Default Pool's name can't be modified")

airflow/api_connexion/openapi/v1.yaml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3158,7 +3158,7 @@ components:
31583158
occupied_slots:
31593159
type: integer
31603160
readOnly: true
3161-
description: The number of slots used by running/queued tasks at the moment.
3161+
description: The number of slots used by running/queued tasks at the moment. May include deferred tasks if 'include_deferred' is set to true.
31623162
running_slots:
31633163
type: integer
31643164
readOnly: true
@@ -3175,13 +3175,26 @@ components:
31753175
type: integer
31763176
readOnly: true
31773177
description: The number of slots used by scheduled tasks at the moment.
3178+
deferred_slots:
3179+
type: integer
3180+
readOnly: true
3181+
description: |
3182+
The number of slots used by deferred tasks at the moment. Relevant if 'include_deferred' is set to true.
3183+
3184+
*New in version 2.7.0*
31783185
description:
31793186
type: string
31803187
description: |
31813188
The description of the pool.
31823189
31833190
*New in version 2.3.0*
31843191
nullable: true
3192+
include_deferred:
3193+
type: boolean
3194+
description: |
3195+
If set to true, deferred tasks are considered when calculating open pool slots.
3196+
3197+
*New in version 2.7.0*
31853198
31863199
PoolCollection:
31873200
type: object

airflow/api_connexion/schemas/pool_schema.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ class Meta:
3939
queued_slots = fields.Method("get_queued_slots", dump_only=True)
4040
scheduled_slots = fields.Method("get_scheduled_slots", dump_only=True)
4141
open_slots = fields.Method("get_open_slots", dump_only=True)
42+
deferred_slots = fields.Method("get_deferred_slots", dump_only=True)
4243
description = auto_field()
44+
# we skip auto_field() here to be compatible with the manual validation in the pool_endpoint module
45+
include_deferred = fields.Boolean(load_default=False)
4346

4447
@staticmethod
4548
def get_occupied_slots(obj: Pool) -> int:
@@ -61,6 +64,11 @@ def get_scheduled_slots(obj: Pool) -> int:
6164
"""Returns the scheduled slots of the pool."""
6265
return obj.scheduled_slots()
6366

67+
@staticmethod
68+
def get_deferred_slots(obj: Pool) -> int:
69+
"""Returns the deferred slots of the pool."""
70+
return obj.deferred_slots()
71+
6472
@staticmethod
6573
def get_open_slots(obj: Pool) -> float:
6674
"""Returns the open slots of the pool."""

airflow/cli/cli_config.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,9 @@ def string_lower_type(val):
513513
ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
514514
ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
515515
ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
516+
ARG_POOL_INCLUDE_DEFERRED = Arg(
517+
("--include-deferred",), help="Include deferred tasks in calculations for Pool", action="store_true"
518+
)
516519
ARG_POOL_IMPORT = Arg(
517520
("file",),
518521
metavar="FILEPATH",
@@ -521,8 +524,8 @@ def string_lower_type(val):
521524
textwrap.dedent(
522525
"""
523526
{
524-
"pool_1": {"slots": 5, "description": ""},
525-
"pool_2": {"slots": 10, "description": "test"}
527+
"pool_1": {"slots": 5, "description": "", "include_deferred": true},
528+
"pool_2": {"slots": 10, "description": "test", "include_deferred": false}
526529
}"""
527530
),
528531
" " * 4,
@@ -1456,7 +1459,14 @@ class GroupCommand(NamedTuple):
14561459
name="set",
14571460
help="Configure pool",
14581461
func=lazy_load_command("airflow.cli.commands.pool_command.pool_set"),
1459-
args=(ARG_POOL_NAME, ARG_POOL_SLOTS, ARG_POOL_DESCRIPTION, ARG_OUTPUT, ARG_VERBOSE),
1462+
args=(
1463+
ARG_POOL_NAME,
1464+
ARG_POOL_SLOTS,
1465+
ARG_POOL_DESCRIPTION,
1466+
ARG_POOL_INCLUDE_DEFERRED,
1467+
ARG_OUTPUT,
1468+
ARG_VERBOSE,
1469+
),
14601470
),
14611471
ActionCommand(
14621472
name="delete",

airflow/cli/commands/pool_command.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def _show_pools(pools, output):
3838
"pool": x[0],
3939
"slots": x[1],
4040
"description": x[2],
41+
"include_deferred": x[3],
4142
},
4243
)
4344

@@ -69,7 +70,9 @@ def pool_get(args):
6970
def pool_set(args):
7071
"""Creates new pool with a given name and slots."""
7172
api_client = get_current_api_client()
72-
api_client.create_pool(name=args.pool, slots=args.slots, description=args.description)
73+
api_client.create_pool(
74+
name=args.pool, slots=args.slots, description=args.description, include_deferred=args.include_deferred
75+
)
7376
print(f"Pool {args.pool} created")
7477

7578

@@ -119,8 +122,15 @@ def pool_import_helper(filepath):
119122
pools = []
120123
failed = []
121124
for k, v in pools_json.items():
122-
if isinstance(v, dict) and len(v) == 2:
123-
pools.append(api_client.create_pool(name=k, slots=v["slots"], description=v["description"]))
125+
if isinstance(v, dict) and "slots" in v and "description" in v:
126+
pools.append(
127+
api_client.create_pool(
128+
name=k,
129+
slots=v["slots"],
130+
description=v["description"],
131+
include_deferred=v.get("include_deferred", False),
132+
)
133+
)
124134
else:
125135
failed.append(k)
126136
return pools, failed
@@ -132,7 +142,7 @@ def pool_export_helper(filepath):
132142
pool_dict = {}
133143
pools = api_client.get_pools()
134144
for pool in pools:
135-
pool_dict[pool[0]] = {"slots": pool[1], "description": pool[2]}
145+
pool_dict[pool[0]] = {"slots": pool[1], "description": pool[2], "include_deferred": pool[3]}
136146
with open(filepath, "w") as poolfile:
137147
poolfile.write(json.dumps(pool_dict, sort_keys=True, indent=4))
138148
return pools

airflow/jobs/scheduler_job_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,10 +1571,12 @@ def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
15711571
Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
15721572
Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
15731573
Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"])
1574+
Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
15741575
# Same metrics with tagging
15751576
Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name})
15761577
Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name})
15771578
Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name})
1579+
Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name})
15781580

15791581
@provide_session
15801582
def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:

0 commit comments

Comments
 (0)