@@ -23,6 +23,7 @@
     ExecuteSuccessEvent,
     Instruction,
     RecommendationsConflict,
+    RefreshWhoHasEvent,
    ReleaseWorkerDataMsg,
     RescheduleEvent,
     RescheduleMsg,
@@ -603,3 +604,52 @@ async def test_missing_to_waiting(c, s, w1, w2, w3):
     await w1.close()
 
     await f1
+
+
+@gen_cluster(client=True, nthreads=[("", 1)] * 3)
+async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
+    """
+    1. Two tasks, x and y, are only available on a busy worker.
+       The worker sends request-refresh-who-has to the scheduler.
+    2. The scheduler responds that x has become missing, while y has gained an
+       additional replica.
+    3. The handler for RefreshWhoHasEvent empties x.who_has and recommends a
+       transition to missing.
+    4. Before the recommendation can be implemented, the same event invokes
+       _ensure_communicating to let y transition to flight. This in turn pops x from
+       data_needed - but x has an empty who_has, which is an exceptional situation.
+    5. The transition fetch->missing is executed, but x is no longer in
+       data_needed - another exceptional situation.
+    """
+    x = c.submit(inc, 1, key="x", workers=[w1.address])
+    y = c.submit(inc, 2, key="y", workers=[w1.address])
+    await wait([x, y])
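+    # Make w1 permanently busy: with no incoming connection slots, every
+    # attempt to gather data from it is answered with a busy response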
+    w1.total_in_connections = 0
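+    # Tell w3 to fetch x and y; w1 is the only worker holding them, so all
+    # transfer attempts must go through the busy w1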
+    s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")
+
+    # The tasks will now flip-flop between fetch and flight every 150ms
+    # (see Worker.retry_busy_worker_later)
+    await wait_for_state("x", "fetch", w3)
+    await wait_for_state("y", "fetch", w3)
+    assert w1.address in w3.busy_workers
+    # w3 sent {op: request-refresh-who-has, keys: [x, y]}
+    # There may also have been enough time for a refresh-who-has message to come
+    # back, reiterating what w3 already knew:
+    # {op: refresh-who-has, who_has={x: [w1.address], y: [w1.address]}}
+
+    # Let's instead simulate that, while request-refresh-who-has was in transit,
+    # w2 gained a replica of y and subsequently w1 closed down.
+    # When request-refresh-who-has lands, the scheduler will respond:
+    # {op: refresh-who-has, who_has={x: [], y: [w2.address]}}
+    w3.handle_stimulus(
+        RefreshWhoHasEvent(who_has={"x": set(), "y": {w2.address}}, stimulus_id="test3")
+    )
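+    # x lost its last replica and can't be fetched from anywhere: missing.
+    # y has been rescheduled to transfer from its new replica on w2: flight.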
+    assert w3.tasks["x"].state == "missing"
+    assert w3.tasks["y"].state == "flight"
+    assert w3.tasks["y"].who_has == {w2.address}