Skip to content

Commit 92bfb44

Browse files
committed
Refactor run_async loop to wait on queued events and futures at the same time
Fixes ros2/launch_ros#169 Otherwise, it's possible to get into a hung state where we wait for an event, even though there are no more events. This is because the check for an "idle" state evaluates to "True" as we wait for some futures to complete. By waiting for futures and events concurrently, we can avoid this problem. Further, we don't have to wait for an event if there's nothing in the queue. Signed-off-by: Jacob Perron <[email protected]>
1 parent b847fe4 commit 92bfb44

1 file changed

Lines changed: 21 additions & 21 deletions

File tree

launch/launch/launch_service.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -338,28 +338,28 @@ def _on_exception(loop, context):
338338
ret = await self._shutdown(reason='idle', due_to_sigint=False)
339339
assert ret is None, ret
340340
continue
341-
process_one_event_task = this_loop.create_task(self._process_one_event())
342-
if self.__shutting_down:
343-
# If shutting down and idle then we're done.
344-
if is_idle:
345-
process_one_event_task.cancel()
346-
break
347-
entity_futures = [pair[1] for pair in self._entity_future_pairs]
341+
342+
# Stop running if we're shutting down and there's no more work
343+
if self.__shutting_down and is_idle:
344+
break
345+
346+
# Collect futures to wait on
347+
entity_futures = [pair[1] for pair in self._entity_future_pairs]
348+
entity_futures.extend(self.__context._completion_futures)
349+
350+
# If there is an event in the queue, create a task to process it
351+
# and add it to the list of awaitables
352+
if not self.__context._event_queue.empty():
353+
process_one_event_task = this_loop.create_task(self._process_one_event())
348354
entity_futures.append(process_one_event_task)
349-
entity_futures.extend(self.__context._completion_futures)
350-
done = set() # type: Set[asyncio.Future]
351-
while not done:
352-
done, pending = await asyncio.wait(
353-
entity_futures,
354-
timeout=1.0,
355-
return_when=asyncio.FIRST_COMPLETED
356-
)
357-
if not done:
358-
self.__logger.debug(
359-
'still waiting on futures: {}'.format(entity_futures)
360-
)
361-
else:
362-
await process_one_event_task
355+
356+
if len(entity_futures) > 0:
357+
done, pending = await asyncio.wait(
358+
entity_futures,
359+
timeout=1.0,
360+
return_when=asyncio.FIRST_COMPLETED
361+
)
362+
363363
except KeyboardInterrupt:
364364
continue
365365
except asyncio.CancelledError:

0 commit comments

Comments
 (0)