-
-
Notifications
You must be signed in to change notification settings - Fork 750
Description
Before I start, sincere apologies for not having a reproducible example but the issue is a bit weird and hard to reproduce. I mostly querying here to see if anybody has an idea what might be going on.
What happened:
We have seen multiple times now that some individual computations are stuck in the processing state and are not being processed. A closer investigation revealed in all occurrences that the worker the processing task is on, let's call it WorkerA, is in the process of fetching the tasks dependencies from WorkerB. While fetching these, an exception seems to be triggered on WorkerB and for some reason this exception is lost. In particular, WorkerA never gets any signal that something was wrong and it waits indefinitely for the dependency to arrive while WorkerB already forgot about the error. This scenario ultimately leads to a deadlock where the single dependency fetch is blocking the cluster. It looks like the cluster never actually self heals. The last time I could observe this issue, the computation was blocked for about 18hrs. After manually closing all connections, the workers retried and the graph finished successfully.
I've seen this deadlock happen multiple times already but only during the last instance, I could glimps an error log connecting to issue #4130 where the tornado stream seems to cause this error. Irregardless of what caused the error, I believe there is an issue on distributed side since the error was not properly propagated, the connections where not closed, etc. suggesting to WorkerA that everything was fine.
Traceback
Traceback (most recent call last):
File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/core.py", line 513, in handle_comm
result = await result
File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/worker.py", line 1284, in get_data
compressed = await comm.write(msg, serializers=serializers)
File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/comm/tcp.py", line 243, in write
future = stream.write(frame)
File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/tornado/iostream.py", line 553, in write
self._write_buffer.append(data)
File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/tornado/iostream.py", line 177, in append
b += data # type: ignore
BufferError: Existing exports of data: object cannot be re-sized
The above traceback shows that the error appeared while WorkerB was handling the request, executing the get_data handler, ultimately raising the exception
distributed/distributed/core.py
Lines 523 to 535 in 34fb932
| try: | |
| result = handler(comm, **msg) | |
| if inspect.isawaitable(result): | |
| result = asyncio.ensure_future(result) | |
| self._ongoing_coroutines.add(result) | |
| result = await result | |
| except (CommClosedError, CancelledError) as e: | |
| if self.status == Status.running: | |
| logger.info("Lost connection to %r: %s", address, e) | |
| break | |
| except Exception as e: | |
| logger.exception(e) | |
| result = error_message(e, status="uncaught-error") |
In this code section we can see that the exception is caught and logged. Following the code, an exception error result is generated which should be sent back to WorkerA causing the dependency fetch to fail, causing a retry
distributed/distributed/core.py
Lines 544 to 554 in 34fb932
| if reply and not is_dont_reply: | |
| try: | |
| await comm.write(result, serializers=serializers) | |
| except (EnvironmentError, TypeError) as e: | |
| logger.debug( | |
| "Lost connection to %r while sending result for op %r: %s", | |
| address, | |
| op, | |
| e, | |
| ) | |
| break |
Even if this fails (debug logs not enabled), the connection should be closed/aborted eventually and removed from the servers tracking
distributed/distributed/core.py
Lines 561 to 569 in 34fb932
| finally: | |
| del self._comms[comm] | |
| if not shutting_down() and not comm.closed(): | |
| try: | |
| comm.abort() | |
| except Exception as e: | |
| logger.error( | |
| "Failed while closing connection to %r: %s", address, e | |
| ) |
which never happens. I checked on the still running workers and the self._comm attributed was still populated with the seemingly broken connection.
Why was this finally block never executed? Why was the reply never submitted received by WorkerA?
Just looking at the code, I would assume this to be already implemented robustly but it seems I'm missing something. Anybody has a clue about what might be going on?
Environment
- Distributed version: 2.20.0
- Python version: Py 3.6
- Operating System: Debian
- Tornado 6.0.4
- Install method (conda, pip, source): pip