
Commit b31c3fc

Merge branch 'main' into WSMR/gather_dep

2 parents: f5fb24a + 059798a

15 files changed: +270 -143 lines

distributed/cli/tests/test_dask_scheduler.py

Lines changed: 8 additions & 12 deletions
@@ -66,7 +66,7 @@ def test_no_dashboard(loop):
 def test_dashboard(loop):
     pytest.importorskip("bokeh")

-    with popen(["dask-scheduler"], flush_output=False) as proc:
+    with popen(["dask-scheduler"], capture_output=True) as proc:
         line = wait_for_log_line(b"dashboard at", proc.stdout)
         dashboard_port = int(line.decode().split(":")[-1].strip())

@@ -99,24 +99,22 @@ def test_dashboard_non_standard_ports(loop):
     pytest.importorskip("bokeh")

     with popen(
-        ["dask-scheduler", "--port", "23448", "--dashboard-address", ":24832"],
-        flush_output=False,
+        ["dask-scheduler", "--port", "3448", "--dashboard-address", ":4832"]
     ) as proc:
-        line = wait_for_log_line(b"dashboard at", proc.stdout)
-        with Client("127.0.0.1:23448", loop=loop) as c:
+        with Client("127.0.0.1:3448", loop=loop) as c:
             pass

         start = time()
         while True:
             try:
-                response = requests.get("http://localhost:24832/status/")
+                response = requests.get("http://localhost:4832/status/")
                 assert response.ok
                 break
             except Exception:
                 sleep(0.1)
                 assert time() < start + 20
     with pytest.raises(Exception):
-        requests.get("http://localhost:24832/status/")
+        requests.get("http://localhost:4832/status/")


 @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@@ -209,10 +207,8 @@ def check_pidfile(proc, pidfile):
 def test_scheduler_port_zero(loop):
     with tmpfile() as fn:
         with popen(
-            ["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"],
-            flush_output=False,
-        ) as proc:
-            line = wait_for_log_line(b"dashboard at", proc.stdout)
+            ["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"]
+        ):
             with Client(scheduler_file=fn, loop=loop) as c:
                 assert c.scheduler.port
                 assert c.scheduler.port != 8786
@@ -222,7 +218,7 @@ def test_dashboard_port_zero(loop):
     pytest.importorskip("bokeh")
     with popen(
         ["dask-scheduler", "--dashboard-address", ":0"],
-        flush_output=False,
+        capture_output=True,
     ) as proc:
         line = wait_for_log_line(b"dashboard at", proc.stdout)
         dashboard_port = int(line.decode().split(":")[-1].strip())
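
The recurring pattern in these tests -- start a CLI process with its output captured, then block until a specific log line appears -- relies on popen and wait_for_log_line from distributed's test utilities. The helper below is a hypothetical stand-alone reimplementation of that pattern, not the actual test-suite code; it only assumes the "... dashboard at :<port>" log line visible in the diff above.

import subprocess


def wait_for_log_line(pattern, stream, max_lines=None):
    """Read lines from a subprocess stream until one contains ``pattern`` (bytes)."""
    n = 0
    while max_lines is None or n < max_lines:
        line = stream.readline()
        if not line:  # stream closed before the pattern appeared
            break
        if pattern in line:
            return line
        n += 1
    raise AssertionError(f"pattern {pattern!r} not found in process output")


# Usage mirroring test_dashboard above: parse the dashboard port out of the
# scheduler's startup logs.
proc = subprocess.Popen(
    ["dask-scheduler"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
try:
    line = wait_for_log_line(b"dashboard at", proc.stdout)
    dashboard_port = int(line.decode().split(":")[-1].strip())
    print("dashboard on port", dashboard_port)
finally:
    proc.terminate()
    proc.wait()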

distributed/cli/tests/test_dask_spec.py

Lines changed: 2 additions & 2 deletions
@@ -78,15 +78,15 @@ def test_errors():
             "--spec-file",
             "foo.yaml",
         ],
-        flush_output=False,
+        capture_output=True,
     ) as proc:
         line = proc.stdout.readline().decode()
         assert "exactly one" in line
         assert "--spec" in line and "--spec-file" in line

     with popen(
         [sys.executable, "-m", "distributed.cli.dask_spec"],
-        flush_output=False,
+        capture_output=True,
     ) as proc:
         line = proc.stdout.readline().decode()
         assert "exactly one" in line

distributed/cli/tests/test_dask_ssh.py

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ def test_version_option():
 def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
     with popen(
         ["dask-ssh", "--nprocs=2", "--nohost", "localhost"],
-        flush_output=False,
+        capture_output=True,
     ) as proc:
         with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c:
             c.wait_for_workers(2, timeout="15 seconds")
@@ -36,6 +36,6 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
 def test_ssh_cli_nworkers_with_nprocs_is_an_error():
     with popen(
         ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
-        flush_output=False,
+        capture_output=True,
     ) as proc:
         wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15)

distributed/cli/tests/test_dask_worker.py

Lines changed: 7 additions & 7 deletions
@@ -244,7 +244,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s):
             "9686:9687",
             "--no-dashboard",
         ],
-        flush_output=False,
+        capture_output=True,
     ) as worker:
         wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100)

@@ -278,14 +278,14 @@ async def test_no_nanny(c, s):
 async def test_reconnect_deprecated(c, s):
     with popen(
         ["dask-worker", s.address, "--reconnect"],
-        flush_output=False,
+        capture_output=True,
     ) as worker:
         wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout)
         assert worker.wait() == 1

     with popen(
         ["dask-worker", s.address, "--no-reconnect"],
-        flush_output=False,
+        capture_output=True,
     ) as worker:
         wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout)
         await c.wait_for_workers(1)
@@ -361,7 +361,7 @@ def test_scheduler_address_env(loop, monkeypatch):
 async def test_nworkers_requires_nanny(s):
     with popen(
         ["dask-worker", s.address, "--nworkers=2", "--no-nanny"],
-        flush_output=False,
+        capture_output=True,
     ) as worker:
         wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15)

@@ -400,7 +400,7 @@ async def test_nworkers_expands_name(c, s):
 async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
     with popen(
         ["dask-worker", s.address, "--nprocs=2"],
-        flush_output=False,
+        capture_output=True,
     ) as worker:
         await c.wait_for_workers(2)
         wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15)
@@ -410,7 +410,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
 async def test_worker_cli_nworkers_with_nprocs_is_an_error(s):
     with popen(
         ["dask-worker", s.address, "--nprocs=2", "--nworkers=2"],
-        flush_output=False,
+        capture_output=True,
     ) as worker:
         wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15)

@@ -708,7 +708,7 @@ def test_error_during_startup(monkeypatch, nanny):
             "--port",
             scheduler_port,
         ],
-        flush_output=False,
+        capture_output=True,
     ) as scheduler:
         start = time()
         # Wait for the scheduler to be up

distributed/deploy/tests/test_local.py

Lines changed: 43 additions & 5 deletions
@@ -1,7 +1,6 @@
 import asyncio
 import subprocess
 import sys
-import unittest
 from threading import Lock
 from time import sleep
 from urllib.parse import urlparse
@@ -15,7 +14,6 @@
 from distributed import Client, LocalCluster, Nanny, Worker, get_client
 from distributed.compatibility import LINUX
 from distributed.core import Status
-from distributed.deploy.utils_test import ClusterTest
 from distributed.metrics import time
 from distributed.system import MEMORY_LIMIT
 from distributed.utils import TimeoutError, sync
@@ -170,9 +168,49 @@ def test_transports_tcp_port():
         assert e.submit(inc, 4).result() == 5


-class LocalTest(ClusterTest, unittest.TestCase):
-    Cluster = LocalCluster  # type: ignore
-    kwargs = {"silence_logs": False, "dashboard_address": ":0", "processes": False}
+def test_cores(loop):
+    with LocalCluster(
+        n_workers=2,
+        scheduler_port=0,
+        silence_logs=False,
+        dashboard_address=":0",
+        processes=False,
+        loop=loop,
+    ) as cluster, Client(cluster.scheduler_address, loop=loop) as client:
+        client.scheduler_info()
+        assert len(client.nthreads()) == 2
+
+
+def test_submit(loop):
+    with LocalCluster(
+        n_workers=2,
+        scheduler_port=0,
+        silence_logs=False,
+        dashboard_address=":0",
+        processes=False,
+        loop=loop,
+    ) as cluster, Client(cluster.scheduler_address, loop=loop) as client:
+        future = client.submit(lambda x: x + 1, 1)
+        assert future.result() == 2
+
+
+def test_context_manager(loop):
+    with LocalCluster(
+        silence_logs=False, dashboard_address=":0", processes=False, loop=loop
+    ) as c, Client(c) as e:
+        assert e.nthreads()
+
+
+def test_no_workers_sync(loop):
+    with LocalCluster(
+        n_workers=0,
+        scheduler_port=0,
+        silence_logs=False,
+        dashboard_address=":0",
+        processes=False,
+        loop=loop,
+    ):
+        pass


 def test_Client_with_local(loop):

distributed/deploy/utils_test.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

distributed/nanny.py

Lines changed: 4 additions & 6 deletions
@@ -809,12 +809,10 @@ def _run(
     try:
         os.environ.update(env)
         dask.config.set(config)
-        try:
-            from dask.multiprocessing import initialize_worker_process
-        except ImportError:  # old Dask version
-            pass
-        else:
-            initialize_worker_process()
+
+        from dask.multiprocessing import default_initializer
+
+        default_initializer()

         if silence_logs:
             logger.setLevel(silence_logs)
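
The removed block is the usual guarded-import idiom for symbols that only exist in newer versions of a dependency; the merge drops the fallback and imports default_initializer unconditionally, presumably because the supported dask versions all provide it. A generic sketch of that idiom, with purely illustrative names (not dask APIs):

# Guarded import: tolerate an older dependency that lacks the newer symbol.
# "somepackage" and "new_initializer" are illustrative placeholders.
try:
    from somepackage import new_initializer
except ImportError:  # old version of the dependency
    new_initializer = None

if new_initializer is not None:
    new_initializer()  # only run the hook when the dependency provides it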

distributed/preloading.py

Lines changed: 4 additions & 2 deletions
@@ -183,6 +183,8 @@ def __init__(
         self.argv = list(argv)
         self.file_dir = file_dir

+        logger.info("Creating preload: %s", self.name)
+
         if is_webaddress(name):
             self.module = _download_module(name)
         else:
@@ -193,6 +195,7 @@ async def start(self):
         dask_setup = getattr(self.module, "dask_setup", None)

         if dask_setup:
+            logger.info("Run preload setup: %s", self.name)
             if isinstance(dask_setup, click.Command):
                 context = dask_setup.make_context(
                     "dask_setup", self.argv, allow_extra_args=False
@@ -202,17 +205,16 @@
                 )
                 if inspect.isawaitable(result):
                     await result
-                logger.info("Run preload setup click command: %s", self.name)
             else:
                 future = dask_setup(self.dask_object)
                 if inspect.isawaitable(future):
                     await future
-                logger.info("Run preload setup function: %s", self.name)

     async def teardown(self):
         """Run when the server starts its close method"""
         dask_teardown = getattr(self.module, "dask_teardown", None)
         if dask_teardown:
+            logger.info("Run preload teardown: %s", self.name)
             future = dask_teardown(self.dask_object)
             if inspect.isawaitable(future):
                 await future
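
For reference, the dask_setup / dask_teardown hooks that these new log lines bracket come from a user-supplied preload module, which Preload.start() and Preload.teardown() look up and call (awaiting the result when it is a coroutine). A minimal illustrative example; the module name and print statements are made up:

# my_preload.py -- illustrative preload module
def dask_setup(dask_object):
    # Called once when the scheduler/worker starts; dask_object is that server.
    print("preload setup for", dask_object)


async def dask_teardown(dask_object):
    # Called when the server begins to close; may be a coroutine, in which
    # case the teardown in the diff above awaits it.
    print("preload teardown for", dask_object)

A module like this is what the preload machinery instantiates; it is typically pointed at via the --preload CLI option or the matching preload configuration value.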
