-
-
Notifications
You must be signed in to change notification settings - Fork 750
Description
I've experienced a deadlock around a worker's connection to the scheduler breaking and trying to restart. I'm opening this general issue to track all the problems and explain the situation; I'll open sub-issues for specific things to fix.
I believe the flow is something like this:
- An unhandled error in a stream handler on the worker occurs (
handle_compute_taskin this case). Server.handle_streamterminates, and closes the comm as its final act.- The closing of the comm causes the
BatchedSendto abort itself, settingself.please_stop=True - The error from
handle_streamcauseshandle_schedulerto try to restart the connection to the scheduler - The connection appears to be restarted (because the BatchedSend has been given a new, working comm), but the
_background_sendcoroutine isn’t actually running (becauseplease_stopwas never set back to False)! - So every
batched_stream.sendappears to succeed, but the messages actually just sit in the buffer forever - The scheduler thinks nothing is happening
Full narrative of me trying to figure out what's going on
The scheduler thinks a couple workers are processing some tasks, but those workers aren’t actually running any tasks and have nothing in ready. Additionally, both workers have 40-60 messages in their BatchedSend buffer but are not sending them. Those messages include things like 'task-finished' and 'task-erred'. So that’s causing the deadlock: the info the scheduler needs to assign new tasks is stuck in the BatchedStream buffer.
Why? I did notice Connection to scheduler broken. Reconnecting... log messages on those workers. Reading through Worker.handle_scheduler where that message prints, the error recovery seems to be scheduling a new heartbeat to run soon. I think the intent is that this will call into _register_with_scheduler once again, which will restart the comm and call self.batched_stream.start again with the new comm, restarting it?
However I don’t think a BatchedSend is actually restartable. When the _background_send coroutine hits an exception, it calls self.abort(). This sets please_stop=True (and drops the buffer containing potentially critical messages!).
If you call BatchedSend.start again after this though, please_stop is still True. So the _background_send coroutine will terminate immediately without sending anything.
Any subsequent messages written to the BatchedSend will just sit there and never be sent. However, send won’t actually raise an error, because send only checks if the comm is closed, not if the BatchedSend itself is closed.
I’m a little confused by this because if I’m reading it right, how was reconnection ever supposed to work? The BatchedSend doesn’t seem at all restartable, yet we’re acting like it is.
As for why the connection broke in the first place? I see this in the logs:
Traceback (most recent call last):
File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1237, in handle_scheduler
await self.handle_stream(
File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 564, in handle_stream
handler(**merge(extra, msg))
File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1937, in handle_compute_task
self.tasks[key].nbytes = value
KeyError: "('slices-40d6794b777d639e9440f8f518224cfd', 2, 1)"
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
The worker’s handle_compute_task is trying to set nbytes for a dependency that has no task state yet. This seems broken but I don’t know why it can happen yet.
But anyway, this exception propagates up to the core Server.handle_stream method. And interestingly, we don’t try-except each call to stream handlers. If a stream handler ever fails, we abort stream handling, close the comm, and raise an error.
And that error from handle_stream is what causes handle_scheduler to try to reconnect, which doesn’t actually work because the BatchedSend is stopped.
There are two problems here:
- General: Properly support restarting
BatchedSend#5481. I believe that any unhandled error in a stream handler would putWorker.batched_streaminto a broken state upon reconnection, where nosendsactually get sent. - Specific: KeyError in
Worker.handle_compute_task(causes deadlock) #5482
I also notice a couple other issues:
-
BatchedSendis dropping all buffered messages when it hits an error. Those messages still could (and must!) be sent upon reconnection. Dropping these messages will cause other deadlocks. Do not drop BatchedSend payload if worker reconnects #5457 addresses this. - Is it intended that any error from a handler makes
Server.handle_streamclose the comm? #5483 Should we not have try-excepts around eachhandler()call?
As it currently stands, I think #5457 will fix this deadlock. However, I'm concerned about the overall design of BatchedSend. It has neither an interface nor tests to support restarting after it's been closed. However, we're using it this way. If we want it to support reconnection, we should refactor and test it to do so. It also does not seem to have clear invariants about what its internal state means, and does not validate parts of its state when it probably should. It's a very old piece of code, still using Tornado. Overall, it might be due for a refresh.