Skip to content

Commit e7188a5

Browse files
authored
Merge pull request #5531 from taskcluster/matt-boris/workersFilteringInDB
feat(db,api): filter workers in db, not in browser
2 parents 0730a80 + 5fba914 commit e7188a5

File tree

9 files changed

+205
-21
lines changed

9 files changed

+205
-21
lines changed

changelog/issue-5456.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
audience: general
2+
level: minor
3+
reference: issue 5456
4+
---
5+
This change adds DB functions that filter workers by state and quarantined status in the database for the workers page. Previously, when results were paginated, filtering was applied only to the page of results already loaded from the DB. This should also speed up rendering of the workers page when a filter is applied.

db/fns.md

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@
148148
* [`get_non_stopped_workers_quntil_providers`](#get_non_stopped_workers_quntil_providers)
149149
* [`get_queue_worker_with_wm_join`](#get_queue_worker_with_wm_join)
150150
* [`get_queue_workers_with_wm_join`](#get_queue_workers_with_wm_join)
151+
* [`get_queue_workers_with_wm_join_quarantined`](#get_queue_workers_with_wm_join_quarantined)
152+
* [`get_queue_workers_with_wm_join_state`](#get_queue_workers_with_wm_join_state)
151153
* [`get_task_queue_wm_2`](#get_task_queue_wm_2)
152154
* [`get_task_queues_wm`](#get_task_queues_wm)
153155
* [`get_worker_2`](#get_worker_2)
@@ -2344,6 +2346,8 @@ If the hashed session id does not exist, then an error code `P0002` will be thro
23442346
* [`get_non_stopped_workers_quntil_providers`](#get_non_stopped_workers_quntil_providers)
23452347
* [`get_queue_worker_with_wm_join`](#get_queue_worker_with_wm_join)
23462348
* [`get_queue_workers_with_wm_join`](#get_queue_workers_with_wm_join)
2349+
* [`get_queue_workers_with_wm_join_quarantined`](#get_queue_workers_with_wm_join_quarantined)
2350+
* [`get_queue_workers_with_wm_join_state`](#get_queue_workers_with_wm_join_state)
23472351
* [`get_task_queue_wm_2`](#get_task_queue_wm_2)
23482352
* [`get_task_queues_wm`](#get_task_queues_wm)
23492353
* [`get_worker_2`](#get_worker_2)
@@ -2572,6 +2576,63 @@ If the pagination arguments are both NULL, all rows are returned.
25722576
Otherwise, page_size rows are returned at offset page_offset.
25732577
This also performs an outer join with the worker_manager.worker table for more data.
25742578

2579+
### get_queue_workers_with_wm_join_quarantined
2580+
2581+
* *Mode*: read
2582+
* *Arguments*:
2583+
* `task_queue_id_in text`
2584+
* `page_size_in integer`
2585+
* `page_offset_in integer`
2586+
* *Returns*: `table`
2587+
* `worker_pool_id text`
2588+
* `worker_group text`
2589+
* `worker_id text`
2590+
* `quarantine_until timestamptz`
2591+
* `expires timestamptz`
2592+
* `first_claim timestamptz`
2593+
* `recent_tasks jsonb`
2594+
* `last_date_active timestamptz`
2595+
* `state text`
2596+
* `capacity int4`
2597+
* `provider_id text`
2598+
* `etag uuid`
2599+
* *Last defined on version*: 77
2600+
2601+
Get quarantined queue workers ordered by worker_pool_id, worker_group, and worker_id.
2602+
If the pagination arguments are both NULL, all rows are returned.
2603+
Otherwise, page_size rows are returned at offset page_offset.
2604+
This also performs an outer join with the worker_manager.worker table for more data.
2605+
2606+
### get_queue_workers_with_wm_join_state
2607+
2608+
* *Mode*: read
2609+
* *Arguments*:
2610+
* `task_queue_id_in text`
2611+
* `expires_in timestamptz`
2612+
* `page_size_in integer`
2613+
* `page_offset_in integer`
2614+
* `worker_state_in text`
2615+
* *Returns*: `table`
2616+
* `worker_pool_id text`
2617+
* `worker_group text`
2618+
* `worker_id text`
2619+
* `quarantine_until timestamptz`
2620+
* `expires timestamptz`
2621+
* `first_claim timestamptz`
2622+
* `recent_tasks jsonb`
2623+
* `last_date_active timestamptz`
2624+
* `state text`
2625+
* `capacity int4`
2626+
* `provider_id text`
2627+
* `etag uuid`
2628+
* *Last defined on version*: 77
2629+
2630+
Get non-expired queue workers by state ordered by worker_pool_id, worker_group, and worker_id.
2631+
Workers are not considered expired until after their quarantine date expires.
2632+
If the pagination arguments are both NULL, all rows are returned.
2633+
Otherwise, page_size rows are returned at offset page_offset.
2634+
This also performs an outer join with the worker_manager.worker table for more data.
2635+
25752636
### get_task_queue_wm_2
25762637

25772638
* *Mode*: read

db/test/fns/queue_test.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,16 @@ suite(testing.suiteName(), function() {
11541154
assert.equal(results[5].worker_pool_id, 'prov/w/5');
11551155
});
11561156

1157+
helper.dbTest('get_queue_workers_with_wm_join_state empty', async function(db) {
1158+
const res = await db.fns.get_queue_workers_with_wm_join_state(null, null, null, null, null);
1159+
assert.deepEqual(res, []);
1160+
});
1161+
1162+
helper.dbTest('get_queue_workers_with_wm_join_quarantined empty', async function(db) {
1163+
const res = await db.fns.get_queue_workers_with_wm_join_quarantined(null, null, null);
1164+
assert.deepEqual(res, []);
1165+
});
1166+
11571167
helper.dbTest('update_queue_worker_tqid (deprecated)', async function(db) {
11581168
await create(db);
11591169
const res = await db.deprecatedFns.update_queue_worker_tqid(

db/test/versions/0077_test.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
const testing = require('taskcluster-lib-testing');
2+
3+
suite(testing.suiteName(), function() {
4+
// this version only adds new DB functions (no schema changes); they are
5+
// exercised in db/test/fns/queue_test.js, so no version-specific tests are needed
6+
});

db/versions/0077.yml

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
version: 77
2+
description: create additional functions to filter workers on state and quarantined status
3+
methods:
4+
get_queue_workers_with_wm_join_state:
5+
description: |-
6+
Get non-expired queue workers by state ordered by worker_pool_id, worker_group, and worker_id.
7+
Workers are not considered expired until after their quarantine date expires.
8+
If the pagination arguments are both NULL, all rows are returned.
9+
Otherwise, page_size rows are returned at offset page_offset.
10+
This also performs an outer join with the worker_manager.worker table for more data.
11+
mode: read
12+
serviceName: worker_manager
13+
args: task_queue_id_in text, expires_in timestamptz, page_size_in integer, page_offset_in integer, worker_state_in text
14+
returns: table(worker_pool_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz, state text, capacity int4, provider_id text, etag uuid)
15+
body: |-
16+
begin
17+
return query
18+
select
19+
queue_workers.task_queue_id as worker_pool_id,
20+
queue_workers.worker_group,
21+
queue_workers.worker_id,
22+
queue_workers.quarantine_until,
23+
queue_workers.expires,
24+
queue_workers.first_claim,
25+
queue_workers.recent_tasks,
26+
queue_workers.last_date_active,
27+
workers.state,
28+
workers.capacity,
29+
workers.provider_id,
30+
public.gen_random_uuid()
31+
from queue_workers
32+
full outer join workers on workers.worker_id = queue_workers.worker_id
33+
where
34+
workers.state = worker_state_in and
35+
(queue_workers.task_queue_id = task_queue_id_in or get_queue_workers_with_wm_join_state.task_queue_id_in is null) and
36+
((queue_workers.expires > expires_in and queue_workers.quarantine_until < expires_in) or get_queue_workers_with_wm_join_state.expires_in is null)
37+
order by worker_pool_id, worker_group, worker_id
38+
limit get_page_limit(page_size_in)
39+
offset get_page_offset(page_offset_in);
40+
end
41+
get_queue_workers_with_wm_join_quarantined:
42+
description: |-
43+
Get quarantined queue workers ordered by worker_pool_id, worker_group, and worker_id.
44+
If the pagination arguments are both NULL, all rows are returned.
45+
Otherwise, page_size rows are returned at offset page_offset.
46+
This also performs an outer join with the worker_manager.worker table for more data.
47+
mode: read
48+
serviceName: worker_manager
49+
args: task_queue_id_in text, page_size_in integer, page_offset_in integer
50+
returns: table(worker_pool_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz, state text, capacity int4, provider_id text, etag uuid)
51+
body: |-
52+
begin
53+
return query
54+
select
55+
queue_workers.task_queue_id as worker_pool_id,
56+
queue_workers.worker_group,
57+
queue_workers.worker_id,
58+
queue_workers.quarantine_until,
59+
queue_workers.expires,
60+
queue_workers.first_claim,
61+
queue_workers.recent_tasks,
62+
queue_workers.last_date_active,
63+
workers.state,
64+
workers.capacity,
65+
workers.provider_id,
66+
public.gen_random_uuid()
67+
from queue_workers
68+
full outer join workers on workers.worker_id = queue_workers.worker_id
69+
where
70+
(queue_workers.task_queue_id = task_queue_id_in or get_queue_workers_with_wm_join_quarantined.task_queue_id_in is null) and
71+
(queue_workers.expires >= now() and queue_workers.quarantine_until >= now())
72+
order by worker_pool_id, worker_group, worker_id
73+
limit get_page_limit(page_size_in)
74+
offset get_page_offset(page_offset_in);
75+
end

db/versions/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,5 @@ If this invariant is violated, it must be indicated clearly in the changelog.
8686
| [0074](./0074.yml) | v44.15.3 | get task queue(s) via worker manager service |
8787
| [0075](./0075.yml) | v44.15.4 | fix get_task_queue_wm args |
8888
| [0076](./0076.yml) | v44.16.0 | get github check by check suite id and check run id |
89+
| [0077](./0077.yml) | (pending release) | create additional functions to filter workers on state and quarantined status |
8990
<!-- AUTOGENERATED DO NOT EDIT - END -->

generated/db-schema.json

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/worker-manager/src/api.js

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -868,10 +868,7 @@ builder.declare({
868868
'page. You may limit this with the query-string parameter `limit`.',
869869
].join('\n'),
870870
}, async function(req, res) {
871-
const quarantined = req.query.quarantined || null;
872-
const workerState = req.query.workerState || null;
873871
const { provisionerId, workerType } = req.params;
874-
const now = new Date();
875872
const workerPoolId = joinWorkerPoolId(provisionerId, workerType);
876873

877874
const { rows: workers, continuationToken } = await Worker.getWorkers(
@@ -881,21 +878,7 @@ builder.declare({
881878
);
882879

883880
const result = {
884-
workers: workers.filter(worker => {
885-
let quarantineFilter = true;
886-
if (quarantined === 'true') {
887-
quarantineFilter = worker.quarantineUntil >= now;
888-
} else if (quarantined === 'false') {
889-
quarantineFilter = worker.quarantineUntil < now;
890-
}
891-
// filter out anything that is both expired and not quarantined
892-
// so that quarantined workers remain visible even after expiration
893-
return (
894-
(worker.expires >= now || worker.quarantineUntil >= now) &&
895-
quarantineFilter &&
896-
(workerState ? worker.state === workerState : true)
897-
);
898-
}).map(worker => {
881+
workers: workers.map(worker => {
899882
let entry = {
900883
workerGroup: worker.workerGroup,
901884
workerId: worker.workerId,

services/worker-manager/src/data.js

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,32 @@ class Worker {
337337
const fetchResults = async (query) => {
338338
const { continuationToken, rows } = await paginateResults({
339339
query,
340-
fetch: (size, offset) =>
341-
db.fns.get_queue_workers_with_wm_join(
340+
fetch: (size, offset) => {
341+
if (Object.keys(query).includes('workerState')) {
342+
return db.fns.get_queue_workers_with_wm_join_state(
343+
workerPoolId || null,
344+
expires || null,
345+
size,
346+
offset,
347+
query.workerState,
348+
);
349+
}
350+
351+
if (query.quarantined === 'true') {
352+
return db.fns.get_queue_workers_with_wm_join_quarantined(
353+
workerPoolId || null,
354+
size,
355+
offset,
356+
);
357+
}
358+
359+
return db.fns.get_queue_workers_with_wm_join(
342360
workerPoolId || null,
343361
expires || null,
344362
size,
345363
offset,
346-
),
364+
);
365+
},
347366
});
348367

349368
const entries = rows.map(Worker.fromDb);

0 commit comments

Comments
 (0)