Advanced asyncio
Solving Real-World Production Problems
Lynn Root | SRE | @roguelynn
$ whoami
agenda
• Initial setup of Mayhem Mandrill
• Development best practices
• Testing, debugging, and profiling
slides: [Link]/adv-aio
async all the things
Python 3.7.0 (default, Jul 6 2018, [Link])
[Clang 9.1.0 (clang-[Link])] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
... print(f'[{[Link]()}] Hello...')
... await [Link](1) # some I/O-intensive work
... print(f'[{[Link]()}] ...World!')
...
>>> [Link](hello())
[2018-07-07 [Link].559856] Hello...
[2018-07-07 [Link].568737] ...World!
Python 3.7.0 (default, Jul 6 2018, [Link])
[Clang 9.1.0 (clang-[Link])] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
FAKE NEWS
... print(f'[{[Link]()}] Hello...')
... await [Link](1) # some I/O-intensive work
... print(f'[{[Link]()}] ...World!')
...
>>> [Link](hello())
[2018-07-07 [Link].559856] Hello...
[2018-07-07 [Link].568737] ...World!
Python 3.7.0 (default, Jul 6 2018, [Link])
[Clang 9.1.0 (clang-[Link])] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
FAKE NEWS
... print(f'[{[Link]()}] Hello...')
... await [Link](1) # some I/O-intensive work
... print(f'[{[Link]()}] ...World!')
...
>>> [Link](hello())
[2018-07-07 [Link].559856] Hello...
[2018-07-07 [Link].568737] ...World!
Python 3.7.0 (default, Jul 6 2018, [Link])
[Clang 9.1.0 (clang-[Link])] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
FAKE NEWS
... print(f'[{[Link]()}] Hello...')
... await [Link](1) # some I/O-intensive work
... print(f'[{[Link]()}] ...World!')
...
>>> [Link](hello())
[2018-07-07 [Link].559856] Hello...
[2018-07-07 [Link].568737] ...World!
building mayhem mandrill
initial setup
initial setup
concurrently publish messages
concurrently publish messages
async def publish(queue):
choices = string.ascii_lowercase + [Link]
while True:
host_id = "".join([Link](choices, k=4))
msg = Message(
msg_id=str(uuid.uuid4()),
inst_name=f"cattle-{host_id}"
)
asyncio.create_task([Link](msg))
[Link](f"Published {msg}")
# simulate randomness of publishing messages
await [Link]([Link]())
concurrently publish messages
async def publish(queue):
choices = string.ascii_lowercase + [Link]
while True:
host_id = "".join([Link](choices, k=4))
msg = Message(
msg_id=str(uuid.uuid4()),
inst_name=f"cattle-{host_id}"
)
asyncio.create_task([Link](msg))
[Link](f"Published {msg}")
# simulate randomness of publishing messages
await [Link]([Link]())
concurrently publish messages
async def publish(queue):
choices = string.ascii_lowercase + [Link]
while True:
host_id = "".join([Link](choices, k=4))
msg = Message(
msg_id=str(uuid.uuid4()),
inst_name=f"cattle-{host_id}"
)
await [Link](msg)
[Link](f"Published {msg}")
# simulate randomness of publishing messages
await [Link]([Link]())
concurrently publish messages
async def publish(queue):
choices = string.ascii_lowercase + [Link]
while True:
host_id = "".join([Link](choices, k=4))
msg = Message(
msg_id=str(uuid.uuid4()),
inst_name=f"cattle-{host_id}"
)
await [Link](msg) # lines below are blocked
[Link](f"Published {msg}")
# simulate randomness of publishing messages
await [Link]([Link]())
concurrently publish messages
async def publish(queue):
choices = string.ascii_lowercase + [Link]
while True:
host_id = "".join([Link](choices, k=4))
msg = Message(
msg_id=str(uuid.uuid4()),
inst_name=f"cattle-{host_id}"
)
asyncio.create_task([Link](msg))
[Link](f"Published {msg}")
# simulate randomness of publishing messages
await [Link]([Link]())
initial setup
concurrently consume messages
concurrently consume messages
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Consumed {msg}")
# unhelpful simulation of an i/o operation
await [Link]([Link]())
concurrently consume messages
async def consume(queue):
while True:
msg = await [Link]() # <-- does not block loop
[Link](f"Consumed {msg}")
# unhelpful simulation of an i/o operation
await [Link]([Link]())
concurrently consume messages
async def consume(queue):
while True:
msg = await [Link]() # <-- only blocks coro scope
[Link](f"Consumed {msg}")
# unhelpful simulation of an i/o operation
await [Link]([Link]())
concurrently consume messages
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Consumed {msg}")
await restart_host(msg)
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Restarted {[Link]}")
concurrently consume messages
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Consumed {msg}")
asyncio.create_task(restart_host(msg))
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Restarted {[Link]}")
concurrently consume messages
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Consumed {msg}")
asyncio.create_task(restart_host(msg))
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Restarted {[Link]}")
initial setup
concurrent work
concurrent work
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Restarted {[Link]}")
concurrent work
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Restarted {[Link]}")
async def save(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Saved {msg} into database")
concurrent work
async def restart_host(msg):
...
async def save(msg):
...
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Pulled {msg}")
asyncio.create_task(save(msg))
asyncio.create_task(restart_host(msg))
concurrent work
async def restart_host(msg):
...
async def save(msg):
...
async def consume(queue):
while True:
msg = await [Link]()
[Link](f'Pulled {msg}')
await save(msg)
await restart_host(msg)
block when needed
async def restart_host(msg):
...
async def save(msg):
...
async def consume(queue):
while True:
msg = await [Link]()
[Link](f'Pulled {msg}')
await save(msg)
last_restart = await last_restart_date(msg)
if today - last_restart > max_days:
await restart_host(msg)
block when needed
async def handle_message(msg):
await save(msg)
last_restart = await last_restart_date(msg)
if today - last_restart > max_days:
asyncio.create_task(restart_host(msg))
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
block when needed
async def handle_message(msg):
asyncio.create_task(save(msg))
asyncio.create_task(restart_host(msg))
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
initial setup
finalization tasks
unblocking: finalization tasks
def cleanup(msg):
[Link] = True
[Link](f"Done. Acked {msg}")
unblocking: finalization tasks
def cleanup(msg):
[Link] = True
[Link](f"Done. Acked {msg}")
async def handle_message(msg):
asyncio.create_task(save(msg))
asyncio.create_task(restart_host(msg))
unblocking: finalization tasks
def cleanup(msg):
[Link] = True
[Link](f"Done. Acked {msg}")
async def handle_message(msg):
await save(msg)
await restart_host(msg)
cleanup(msg)
unblocking: finalization tasks
def cleanup(msg, fut):
[Link] = True
[Link](f"Done. Acked {msg}")
async def handle_message(msg):
g_future = [Link](save(msg), restart_host(msg))
callback = [Link](cleanup, msg)
g_future.add_done_callback(callback)
await g_future
unblocking: finalization tasks
[Link],250 INFO: Pulled Message(inst_name='cattle-zpsk')
[Link],286 INFO: Restarted [Link]
[Link],347 INFO: Pulled Message(inst_name='cattle-998c')
[Link],486 INFO: Saved Message(inst_name='cattle-zpsk') into database
[Link],486 INFO: Done. Acked Message(inst_name='cattle-zpsk')
[Link],811 INFO: Pulled Message(inst_name='cattle-j9bu')
[Link],863 INFO: Saved Message(inst_name='cattle-998c') into database
[Link],903 INFO: Pulled Message(inst_name='cattle-vk5l')
[Link],149 INFO: Pulled Message(inst_name='cattle-1lf2')
[Link],239 INFO: Restarted [Link]
[Link],245 INFO: Restarted [Link]
[Link],245 INFO: Done. Acked Message(inst_name='cattle-998c')
[Link],267 INFO: Saved Message(inst_name='cattle-j9bu') into database
[Link],478 INFO: Pulled Message(inst_name='cattle-mflk')
[Link],481 INFO: Restarted [Link]
[Link],482 INFO: Done. Acked Message(inst_name='cattle-j9bu')
[Link],505 INFO: Pulled Message(inst_name='cattle-t7tv')
unblocking: finalization tasks
[Link],250 INFO: Pulled Message(inst_name='cattle-zpsk')
[Link],286 INFO: Restarted [Link]
[Link],347 INFO: Pulled Message(inst_name='cattle-998c')
[Link],486 INFO: Saved Message(inst_name='cattle-zpsk') into database
[Link],486 INFO: Done. Acked Message(inst_name='cattle-zpsk')
[Link],811 INFO: Pulled Message(inst_name='cattle-j9bu')
[Link],863 INFO: Saved Message(inst_name='cattle-998c') into database
[Link],903 INFO: Pulled Message(inst_name='cattle-vk5l')
[Link],149 INFO: Pulled Message(inst_name='cattle-1lf2')
[Link],239 INFO: Restarted [Link]
[Link],245 INFO: Restarted [Link]
[Link],245 INFO: Done. Acked Message(inst_name='cattle-998c')
[Link],267 INFO: Saved Message(inst_name='cattle-j9bu') into database
[Link],478 INFO: Pulled Message(inst_name='cattle-mflk')
[Link],481 INFO: Restarted [Link]
[Link],482 INFO: Done. Acked Message(inst_name='cattle-j9bu')
[Link],505 INFO: Pulled Message(inst_name='cattle-t7tv')
unblocking: finalization tasks
[Link],250 INFO: Pulled Message(inst_name='cattle-zpsk')
[Link],286 INFO: Restarted [Link]
[Link],347 INFO: Pulled Message(inst_name='cattle-998c')
[Link],486 INFO: Saved Message(inst_name='cattle-zpsk') into database
[Link],486 INFO: Done. Acked Message(inst_name='cattle-zpsk')
[Link],811 INFO: Pulled Message(inst_name='cattle-j9bu')
[Link],863 INFO: Saved Message(inst_name='cattle-998c') into database
[Link],903 INFO: Pulled Message(inst_name='cattle-vk5l')
[Link],149 INFO: Pulled Message(inst_name='cattle-1lf2')
[Link],239 INFO: Restarted [Link]
[Link],245 INFO: Restarted [Link]
[Link],245 INFO: Done. Acked Message(inst_name='cattle-998c')
[Link],267 INFO: Saved Message(inst_name='cattle-j9bu') into database
[Link],478 INFO: Pulled Message(inst_name='cattle-mflk')
[Link],481 INFO: Restarted [Link]
[Link],482 INFO: Done. Acked Message(inst_name='cattle-j9bu')
[Link],505 INFO: Pulled Message(inst_name='cattle-t7tv')
unblocking: finalization tasks
def cleanup(msg, fut):
[Link] = True
[Link](f"Done. Acked {msg}")
async def handle_message(msg):
g_future = [Link](save(msg), restart_host(msg))
callback = [Link](cleanup, msg)
g_future.add_done_callback(callback)
await g_future
unblocking: finalization tasks
async def cleanup(msg):
[Link] = True
[Link](f"Done. Acked {msg}")
async def handle_message(msg):
await [Link](save(msg), restart_host(msg))
await cleanup(msg)
adding concurrency: tl;dr
• Asynchronous != concurrent
• Serial != blocking
graceful shutdowns
graceful shutdowns
responding to signals
responding to signals
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
finally:
[Link]("Cleaning up")
[Link]()
responding to signals
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
except KeyboardInterrupt:
[Link]("Process interrupted")
finally:
[Link]("Cleaning up")
[Link]()
responding to signals
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
except KeyboardInterrupt: # <-- a.k.a. SIGINT
[Link]("Process interrupted")
finally:
[Link]("Cleaning up")
[Link]()
responding to signals
$ python [Link]
$ pkill -INT -f "python [Link]"
[Link],321 INFO: Pulled Message(inst_name='cattle-lrnm')
[Link],321 INFO: Done. Acked Message(inst_name='cattle-lrnm')
[Link],700 INFO: Pulled Message(inst_name='cattle-m0f6')
[Link],700 INFO: Done. Acked Message(inst_name='cattle-m0f6')
[Link],740 INFO: Saved Message(inst_name='cattle-m0f6') into database
[Link],840 INFO: Saved Message(inst_name='cattle-lrnm') into database
[Link],144 INFO: Process interrupted
[Link],144 INFO: Cleaning up
responding to signals
$ python [Link]
$ pkill -INT -f "python [Link]"
[Link],321 INFO: Pulled Message(inst_name='cattle-lrnm')
[Link],321 INFO: Done. Acked Message(inst_name='cattle-lrnm')
[Link],700 INFO: Pulled Message(inst_name='cattle-m0f6')
[Link],700 INFO: Done. Acked Message(inst_name='cattle-m0f6')
[Link],740 INFO: Saved Message(inst_name='cattle-m0f6') into database
[Link],840 INFO: Saved Message(inst_name='cattle-lrnm') into database
[Link],144 INFO: Process interrupted
[Link],144 INFO: Cleaning up
responding to signals
$ python [Link]
$ pkill -TERM -f "python [Link]"
[Link],553 INFO: Pulled Message(inst_name='cattle-npww')
[Link],554 INFO: Done. Acked Message(inst_name='cattle-npww')
[Link],655 INFO: Pulled Message(inst_name='cattle-rm7n')
[Link],655 INFO: Done. Acked Message(inst_name='cattle-rm7n')
[Link],790 INFO: Saved Message(inst_name='cattle-rm7n') into database
[Link],831 INFO: Saved Message(inst_name='cattle-npww') into database
[1] 78851 terminated python mandrill/[Link]
responding to signals
$ python [Link]
$ pkill -TERM -f "python [Link]"
[Link],553 INFO: Pulled Message(inst_name='cattle-npww')
[Link],554 INFO: Done. Acked Message(inst_name='cattle-npww')
[Link],655 INFO: Pulled Message(inst_name='cattle-rm7n')
[Link],655 INFO: Done. Acked Message(inst_name='cattle-rm7n')
[Link],790 INFO: Saved Message(inst_name='cattle-rm7n') into database
[Link],831 INFO: Saved Message(inst_name='cattle-npww') into database
[1] 78851 terminated python mandrill/[Link]
responding to signals
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
except KeyboardInterrupt:
[Link]("Process interrupted")
finally:
[Link]("Cleaning up")
[Link]()
responding to signals
def main():
queue = [Link]()
loop = asyncio.get_event_loop() # <-- could happen here or earlier
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
except KeyboardInterrupt:
[Link]("Process interrupted") # <-- could happen here
finally:
[Link]("Cleaning up") # <-- could happen here
[Link]() # <-- could happen here
graceful shutdowns
signal handler
signal handler
async def shutdown(signal, loop):
[Link](f"Received exit signal {[Link]}...")
[Link]("Closing database connections")
[Link]("Nacking outstanding messages")
tasks = [
t for t in asyncio.all_tasks()
if t is not asyncio.current_task()
]
[[Link]() for task in tasks]
[Link](f"Cancelling {len(tasks)} outstanding tasks")
await [Link](*tasks, return_exceptions=True)
[Link]("Flushing metrics")
[Link]()
[Link]("Shutdown complete.")
signal handler
async def shutdown(signal, loop):
[Link](f"Received exit signal {[Link]}...")
[Link]("Closing database connections")
[Link]("Nacking outstanding messages")
tasks = [
t for t in asyncio.all_tasks()
if t is not asyncio.current_task()
]
[[Link]() for task in tasks]
[Link](f"Cancelling {len(tasks)} outstanding tasks")
await [Link](*tasks, return_exceptions=True)
[Link]("Flushing metrics")
[Link]()
[Link]("Shutdown complete.")
signal handler
async def shutdown(signal, loop):
[Link](f"Received exit signal {[Link]}...")
[Link]("Closing database connections")
[Link]("Nacking outstanding messages")
tasks = [
t for t in asyncio.all_tasks()
if t is not asyncio.current_task()
]
[[Link]() for task in tasks]
[Link](f"Cancelling {len(tasks)} outstanding tasks")
await [Link](*tasks, return_exceptions=True)
[Link]("Flushing metrics")
[Link]()
[Link]("Shutdown complete.")
signal handler
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
except KeyboardInterrupt:
[Link]("Process interrupted")
finally:
[Link]("Cleaning up")
[Link]()
signal handler
def main():
loop = asyncio.get_event_loop()
signals = ([Link], [Link], [Link])
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
queue = [Link]()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
finally:
[Link]("Cleaning up")
[Link]()
signal handler
def main():
loop = asyncio.get_event_loop()
signals = ([Link], [Link], [Link])
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
queue = [Link]()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
finally:
[Link]("Cleaning up")
[Link]()
signal handler
$ python [Link]
# or -HUP or -INT
$ pkill -TERM -f "python [Link]"
[Link],321 INFO: Pulled Message(inst_name='cattle-lrnm')
[Link],321 INFO: Done. Acked Message(inst_name='cattle-lrnm')
[Link],700 INFO: Pulled Message(inst_name='cattle-m0f6')
[Link],700 INFO: Done. Acked Message(inst_name='cattle-m0f6')
[Link],740 INFO: Saved Message(inst_name='cattle-m0f6') into database
[Link],840 INFO: Saved Message(inst_name='cattle-lrnm') into database
[Link],143 INFO: Received exit signal SIGTERM...
[Link],143 INFO: Closing database connections
[Link],144 INFO: Cancelling 19 outstanding tasks
[Link],144 INFO: Flushing metrics
[Link],145 INFO: Cleaning up
signal handler
$ python [Link]
# or -HUP or -INT
$ pkill -TERM -f "python [Link]"
[Link],321 INFO: Pulled Message(inst_name='cattle-lrnm')
[Link],321 INFO: Done. Acked Message(inst_name='cattle-lrnm')
[Link],700 INFO: Pulled Message(inst_name='cattle-m0f6')
[Link],700 INFO: Done. Acked Message(inst_name='cattle-m0f6')
[Link],740 INFO: Saved Message(inst_name='cattle-m0f6') into database
[Link],840 INFO: Saved Message(inst_name='cattle-lrnm') into database
[Link],143 INFO: Received exit signal SIGTERM...
[Link],143 INFO: Closing database connections
[Link],144 INFO: Cancelling 19 outstanding tasks
[Link],144 INFO: Flushing metrics
[Link],145 INFO: Cleaning up
graceful shutdowns
which signals to care about
which signals to care about
Hard Exit Graceful Reload/Restart
nginx TERM, INT QUIT HUP
Apache TERM WINCH HUP
uWSGI INT, QUIT HUP, TERM
Gunicorn INT, QUIT TERM HUP
Docker KILL TERM
graceful shutdowns
not-so-graceful [Link]
ungraceful shutdown: [Link]
async def cant_stop_me():
[Link]("Hold on...")
await [Link](60)
[Link]("Done!")
ungraceful shutdown: [Link]
async def cant_stop_me():
...
def main():
loop = asyncio.get_event_loop()
signals = ([Link], [Link], [Link])
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
shielded_coro = [Link](cant_stop_me())
try:
loop.run_until_complete(shielded_coro)
finally:
[Link]("Cleaning up")
[Link]()
ungraceful shutdown: [Link]
async def cant_stop_me():
...
def main():
loop = asyncio.get_event_loop()
signals = ([Link], [Link], [Link])
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
shielded_coro = [Link](cant_stop_me())
try:
loop.run_until_complete(shielded_coro)
finally:
[Link]("Cleaning up")
[Link]()
ungraceful shutdown: [Link]
[Link],105 INFO: Hold on...
^[Link],156 INFO: Received exit signal SIGINT...
[Link],156 INFO: Cancelling 2 outstanding tasks
[Link],156 INFO: Coroutine cancelled
[Link],157 INFO: Cleaning up
Traceback (most recent call last):
File "examples/shield_test.py", line 62, in <module>
loop.run_until_complete(shielded_coro)
File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.p
return [Link]()
[Link]._base.CancelledError
ungraceful shutdown: [Link]
async def cant_stop_me():
[Link]("Hold on...")
await [Link](60)
[Link]("Done!")
async def main():
await [Link](cant_stop_me())
[Link](main())
ungraceful shutdown: [Link]
[Link],587 INFO: Hold on...
^[Link],982 INFO: Cleaning up
Traceback (most recent call last):
File "shield_test_no_shutdown.py", line 23, in <module>
loop.run_until_complete(shielded_coro)
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/asyncio/base_events.p
self.run_forever()
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/asyncio/base_events.p
self._run_once()
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/asyncio/base_events.p
event_list = self._selector.select(timeout)
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/[Link]", line 5
kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt
ungraceful shutdown: [Link]
async def cant_stop_me():
[Link]("Hold on...")
await [Link](60)
[Link]("Done!")
async def imma_let_you_speak(task_to_cancel):
await [Link](2)
[Link](f"interrupting {task_to_cancel}")
task_to_cancel.cancel()
async def main():
shielded = [Link](cant_stop_me())
cancel_coro = imma_let_you_speak(shielded)
await [Link](shielded, cancel_coro)
[Link](main())
ungraceful shutdown: [Link]
async def cant_stop_me():
[Link]("Hold on...")
await [Link](60)
[Link]("Done!")
async def imma_let_you_speak(task_to_cancel):
await [Link](2)
[Link](f"interrupting {task_to_cancel}")
task_to_cancel.cancel()
async def main():
shielded = [Link](cant_stop_me())
cancel_coro = imma_let_you_speak(shielded)
await [Link](shielded, cancel_coro)
[Link](main())
ungraceful shutdown: [Link]
[Link],729 INFO: Hold on...
[Link],730 INFO: killing <Future pending cb=[gather.<locals>._done_callback
python3.7/asyncio/[Link]]>
Traceback (most recent call last):
File "shield_test_no_shutdown.py", line 38, in <module>
[Link](main())
File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/[Link]",
return loop.run_until_complete(main)
File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.p
return [Link]()
[Link]._base.CancelledError
graceful shutdown: tl;dr
• try/except/finally isn’t enough
• Define desired shutdown behavior
• Use signal handlers
• Listen for appropriate signals
exception handling
exception handling
global handler
global handler
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Restarted {[Link]}")
global handler
async def restart_host(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
rand_int = [Link](1, 5)
if rand_int == 2:
raise Exception(f"Could not restart {[Link]}!")
[Link] = True
[Link](f"Restarted {[Link]}")
global handler
$ python [Link]
global handler
$ python [Link]
[Link],524 INFO: Pulled Message(inst_name='cattle-hvy0')
[Link],924 INFO: Pulled Message(inst_name='cattle-5i2f')
[Link],925 ERROR: Task exception was never retrieved
future: <Task finished coro=<restart_host() done, defined at
mayhem_ex_handling.py:56> exception=Exception('Could not restart
[Link]')>
Traceback (most recent call last):
File "mayhem_ex_handling.py", line 60, in restart_host
raise Exception(f"Could not restart {[Link]}")
Exception: Could not restart [Link]
[Link],247 INFO: Pulled Message(inst_name='cattle-e086')
[Link],432 INFO: Saved Message(inst_name='cattle-hvy0') into database
[Link],517 INFO: Restarted [Link]
global handler
$ python [Link]
[Link],524 INFO: Pulled Message(inst_name='cattle-hvy0')
[Link],924 INFO: Pulled Message(inst_name='cattle-5i2f')
[Link],925 ERROR: Task exception was never retrieved
future: <Task finished coro=<restart_host() done, defined at
mayhem_ex_handling.py:56> exception=Exception('Could not restart
[Link]')>
Traceback (most recent call last):
File "mayhem_ex_handling.py", line 60, in restart_host
raise Exception(f"Could not restart {[Link]}")
Exception: Could not restart [Link]
[Link],247 INFO: Pulled Message(inst_name='cattle-e086')
[Link],432 INFO: Saved Message(inst_name='cattle-hvy0') into database
[Link],517 INFO: Restarted [Link]
global handler
$ python [Link]
[Link],524 INFO: Pulled Message(inst_name='cattle-hvy0')
[Link],924 INFO: Pulled Message(inst_name='cattle-5i2f')
[Link],925 ERROR: Task exception was never retrieved
future: <Task finished coro=<restart_host() done, defined at
mayhem_ex_handling.py:56> exception=Exception('Could not restart
[Link]')>
Traceback (most recent call last):
File "mayhem_ex_handling.py", line 60, in restart_host
raise Exception(f"Could not restart {[Link]}")
Exception: Could not restart [Link]
[Link],247 INFO: Pulled Message(inst_name='cattle-e086')
[Link],432 INFO: Saved Message(inst_name='cattle-hvy0') into database
[Link],517 INFO: Restarted [Link]
global handler
def exception_handler(loop, context):
[Link](f"Caught exception: {context['exception']}")
global handler
def exception_handler(loop, context):
[Link](f"Caught exception: {context['exception']}")
def main():
loop = asyncio.get_event_loop()
# <-- snip -->
loop.set_exception_handler(exception_handler)
# <-- snip -->
global handler
def exception_handler(loop, context):
[Link](f"Caught exception: {context['exception']}")
def main():
loop = asyncio.get_event_loop()
# <-- snip -->
loop.set_exception_handler(exception_handler)
# <-- snip -->
global handler
$ python [Link]
global handler
$ python [Link]
[Link],187 INFO: Pulled Message(instance_name='cattle-i490')
[Link],192 INFO: Restarted [Link]
[Link],241 INFO: Pulled Message(instance_name='cattle-31is')
[Link],331 INFO: Saved Message(instance_name='cattle-31is') into database
[Link],535 INFO: Pulled Message(instance_name='cattle-sx7f')
[Link],535 ERROR: Caught exception: Could not restart [Link].n
[Link],730 INFO: Pulled Message(instance_name='cattle-hsh9')
[Link],731 INFO: Saved Message(instance_name='cattle-sx7f') into database
[Link],759 INFO: Pulled Message(instance_name='cattle-g20p')
[Link],800 INFO: Restarted [Link]
[Link],26 INFO: Saved Message(instance_name='cattle-i490') into database
[Link],45 INFO: Saved Message(instance_name='cattle-hsh9') into database
[Link],181 INFO: Saved Message(instance_name='cattle-g20p') into database
[Link],194 INFO: Restarted [Link]
global handler
$ python [Link]
[Link],187 INFO: Pulled Message(instance_name='cattle-i490')
[Link],192 INFO: Restarted [Link]
[Link],241 INFO: Pulled Message(instance_name='cattle-31is')
[Link],331 INFO: Saved Message(instance_name='cattle-31is') into database
[Link],535 INFO: Pulled Message(instance_name='cattle-sx7f')
[Link],535 ERROR: Caught exception: Could not restart cattle-sx7f
[Link],730 INFO: Pulled Message(instance_name='cattle-hsh9')
[Link],731 INFO: Saved Message(instance_name='cattle-sx7f') into database
[Link],759 INFO: Pulled Message(instance_name='cattle-g20p')
[Link],800 INFO: Restarted [Link]
[Link],26 INFO: Saved Message(instance_name='cattle-i490') into database
[Link],45 INFO: Saved Message(instance_name='cattle-hsh9') into database
[Link],181 INFO: Saved Message(instance_name='cattle-g20p') into database
[Link],194 INFO: Restarted [Link]
exception handling
specific handlers
specific handlers
async def handle_message(msg):
await [Link](save(msg), restart_host(msg))
await cleanup(msg)
specific handlers
async def handle_message(msg):
saved, restarted = await [Link](
save(msg), restart_host(msg), return_exceptions=True)
to_ack = True
if isinstance(restarted, Exception):
to_ack = False
await cleanup(msg, to_ack)
specific handlers
async def handle_message(msg):
saved, restarted = await [Link](
save(msg), restart_host(msg), return_exceptions=True)
to_ack = True
if isinstance(restarted, Exception):
to_ack = False
await cleanup(msg, to_ack)
specific handlers
async def handle_message(msg):
saved, restarted = await [Link](
save(msg), restart_host(msg), return_exceptions=True)
to_ack = True
if isinstance(restarted, Exception):
to_ack = False
await cleanup(msg, to_ack)
specific handlers
async def handle_message(msg):
saved, restarted = await [Link](
save(msg), restart_host(msg), return_exceptions=True)
to_ack = True
if isinstance(restarted, Exception):
to_ack = False
await cleanup(msg, to_ack)
specific handlers
async def handle_message(msg):
saved, restarted = await [Link](
save(msg), restart_host(msg), return_exceptions=True)
to_ack = True
if isinstance(restarted, Exception):
to_ack = False
await cleanup(msg, to_ack)
exception handling
• global exception handling: loop.set_exception_handler
• individual exception handling: [Link] with
return_exceptions=True
threads and asyncio
threads and asyncio
running coroutines from other threads
running coroutines from other threads
def threaded_consume():
threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)
running coroutines from other threads
def threaded_consume():
threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)
def handle_message_sync(msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
running coroutines from other threads
def threaded_consume():
threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)
def handle_message_sync(msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
running coroutines from other threads
def threaded_consume():
threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)
def handle_message_sync(msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
running coroutines from other threads
def threaded_consume():
threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)
def handle_message_sync(msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
running coroutines from other threads
[Link],833 INFO: Pulled Message(inst_name='cattle-hvy0')
[Link],833 ERROR: Top-level exception occurred in callback while
processing a message
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/ep18-37/lib/python3.7/site-packages/
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py",
line 63, in _wrap_callback_errors
callback(message)
File "[Link]", line 115, in callback
asyncio.create_task(handle_message(data))
File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/[Link]",
line 320, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
running coroutines from other threads
[Link],833 INFO: Pulled Message(inst_name='cattle-hvy0')
[Link],833 ERROR: Top-level exception occurred in callback while
processing a message
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/ep18-37/lib/python3.7/site-packages/
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py",
line 63, in _wrap_callback_errors
callback(message)
File "[Link]", line 115, in callback
asyncio.create_task(handle_message(data))
File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/[Link]",
line 320, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
running coroutines from other threads
def threaded_consume():
threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)
def handle_message_sync(msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
running coroutines from other threads
def threaded_consume(loop):
callback = [Link](handle_message_sync, loop)
threaded_pubsub_client.subscribe(TOPIC, callback)
def handle_message_sync(loop, msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
loop.create_task(handle_message(msg))
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
running coroutines from other threads
def threaded_consume(loop):
callback = [Link](handle_message_sync, loop)
threaded_pubsub_client.subscribe(TOPIC, callback)
def handle_message_sync(loop, msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
loop.create_task(handle_message(msg))
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
running coroutines from other threads
[Link],543 INFO: Pulled Message(inst_name='xbci')
[Link],543 INFO: Pulled Message(inst_name='e8x5')
[Link],544 INFO: Running something else
[Link],721 INFO: Saved Message(inst_name='e8x5') into database
[Link],828 INFO: Saved Message(inst_name='xbci') into database
[Link],828 ERROR: Caught exception: Could not restart [Link]
[Link],549 INFO: Restarted [Link]
[Link],821 INFO: Done. Message(inst_name='e8x5')
[Link],108 INFO: Running something else
[Link],276 INFO: Done. Message(inst_name='xbci')
threads and asyncio
running coroutines from other threads
take 2
running coroutines from other threads
def threaded_consume(loop):
callback = [Link](handle_message_sync, loop)
threaded_pubsub_client.subscribe(TOPIC, callback)
def handle_message_sync(loop, msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
loop.create_task(handle_message(msg))
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
running coroutines from other threads
def threaded_consume(loop):
callback = [Link](handle_message_sync, loop)
threaded_pubsub_client.subscribe(TOPIC, callback)
def handle_message_sync(loop, msg):
msg = Message(**msg.json_data)
[Link](f"Pulled {msg}")
asyncio.run_coroutine_threadsafe(handle_message(msg), loop)
async def run():
loop = asyncio.get_running_loop()
executor = [Link]()
await loop.run_in_executor(executor, threaded_consume, loop)
threads and asyncio
• ThreadPoolExecutor: calling threaded code from the
main event loop
• asyncio.run_coroutine_threadsafe: running a
coroutine on the main event loop from another thread
testing asyncio code
testing asyncio code
simple testing with pytest
simple testing with pytest
async def save(msg):
# unhelpful simulation of i/o work
await asyncio.sleep(random.random())
msg.saved = True
logging.info(f"Saved {msg} into database")
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
def test_save(message):
assert not [Link] # sanity check
[Link]([Link](message))
assert [Link]
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
def test_save(message):
assert not [Link] # sanity check
[Link]([Link](message))
assert [Link]
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
def test_save(message):
assert not [Link] # sanity check
loop = asyncio.get_event_loop()
loop.run_until_complete([Link](message))
[Link]()
assert [Link]
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
def test_save(message):
assert not [Link] # sanity check
loop = asyncio.get_event_loop()
loop.run_until_complete([Link](message))
[Link]()
assert [Link]
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
@[Link]
async def test_save(message):
assert not [Link] # sanity check
await [Link](message)
assert [Link]
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
@[Link]
async def test_save(message):
assert not [Link] # sanity check
await [Link](message)
assert [Link]
simple testing with pytest
@[Link]
def message():
return [Link](msg_id="1234", instance_name="mayhem_test")
@[Link]
async def test_save(message):
assert not [Link] # sanity check
await [Link](message)
assert [Link]
testing asyncio code
mocking coroutines
mocking coroutines
async def save(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Saved {msg} into database")
mocking coroutines
async def save(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Saved {msg} into database")
mocking coroutines
@[Link]
def create_coro_mock(mocker, monkeypatch):
def _create_mock_patch_coro(to_patch=None):
mock = [Link]()
async def _coro(*args, **kwargs):
return mock(*args, **kwargs)
if to_patch: # <-- may not need/want to patch anything
[Link](to_patch, _coro)
return mock, _coro
return _create_mock_patch_coro
mocking coroutines
@[Link]
def create_coro_mock(mocker, monkeypatch):
def _create_mock_patch_coro(to_patch=None):
mock = [Link]()
async def _coro(*args, **kwargs):
return mock(*args, **kwargs)
if to_patch: # <-- may not need/want to patch anything
[Link](to_patch, _coro)
return mock, _coro
return _create_mock_patch_coro
mocking coroutines
@[Link]
def create_coro_mock(mocker, monkeypatch):
def _create_mock_patch_coro(to_patch=None):
mock = [Link]()
async def _coro(*args, **kwargs):
return mock(*args, **kwargs)
if to_patch: # <-- may not need/want to patch anything
[Link](to_patch, _coro)
return mock, _coro
return _create_mock_patch_coro
mocking coroutines
@[Link]
def create_coro_mock(mocker, monkeypatch):
def _create_mock_patch_coro(to_patch=None):
mock = [Link]()
async def _coro(*args, **kwargs):
return mock(*args, **kwargs)
if to_patch: # <-- may not need/want to patch anything
[Link](to_patch, _coro)
return mock, _coro
return _create_mock_patch_coro
mocking coroutines
@[Link]
def create_coro_mock(mocker, monkeypatch):
def _create_mock_patch_coro(to_patch=None):
mock = [Link]()
async def _coro(*args, **kwargs):
return mock(*args, **kwargs)
if to_patch: # <-- may not need/want to patch anything
[Link](to_patch, _coro)
return mock, _coro
return _create_mock_patch_coro
mocking coroutines
@[Link]
def mock_sleep(create_coro_mock):
mock, _ = create_coro_mock("[Link]")
return mock
mocking coroutines
@[Link]
def mock_sleep(create_coro_mock):
mock, _ = create_coro_mock("[Link]")
return mock
@[Link]
async def test_save(mock_sleep, message):
assert not [Link] # sanity check
await [Link](message)
assert [Link]
assert 1 == mock_sleep.call_count
mocking coroutines
@[Link]
def mock_sleep(create_coro_mock):
mock, _ = create_coro_mock("[Link]")
return mock
@[Link]
async def test_save(mock_sleep, message):
assert not [Link] # sanity check
await [Link](message)
assert [Link]
assert 1 == mock_sleep.call_count
mocking coroutines
@[Link]
def mock_sleep(create_coro_mock):
mock, _ = create_coro_mock("[Link]")
return mock
@[Link]
async def test_save(mock_sleep, message):
assert not [Link] # sanity check
await [Link](message)
assert [Link]
assert 1 == mock_sleep.call_count
testing asyncio code
testing create_task
testing create_task
async def consume(queue):
while True:
msg = await [Link]()
[Link](f"Pulled {msg}")
asyncio.create_task(handle_message(msg))
testing create_task
@[Link]
def mock_queue(mocker, monkeypatch):
queue = [Link]()
[Link]([Link], "Queue", queue)
return queue.return_value
testing create_task
@[Link]
def mock_queue(mocker, monkeypatch):
queue = [Link]()
[Link]([Link], "Queue", queue)
return queue.return_value
@[Link]
def mock_get(mock_queue, create_coro_mock):
mock, coro = create_coro_mock()
mock_queue.get = coro
return mock
testing create_task
@[Link]
def mock_queue(mocker, monkeypatch):
queue = [Link]()
[Link]([Link], "Queue", queue)
return queue.return_value
@[Link]
def mock_get(mock_queue, create_coro_mock):
mock, coro = create_coro_mock()
mock_queue.get = coro
return mock
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
mock_handle_message.assert_called_once_with(message)
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
mock_handle_message.assert_called_once_with(message)
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
mock_handle_message.assert_called_once_with(message)
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
mock_handle_message.assert_called_once_with(message)
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
mock_handle_message.assert_called_once_with(message)
testing create_task
============================= FAILURES =============================
___________________________ test_consume ___________________________
<--snip-->
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_get.side_effect = [message, Exception("break while loop")]
mock_handle_message = create_coro_mock("mayhem.handle_message")
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
> mock_handle_message.assert_called_once_with(message)
E AssertionError: Expected 'mock' to be called once. Called 0 times.
test_mayhem.py:230: AssertionError
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
ret_tasks = [
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
assert 1 == len(ret_tasks)
mock_handle_message.assert_not_called() # <-- sanity check
await [Link](*ret_tasks)
mock_handle_message.assert_called_once_with(message)
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
ret_tasks = [
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
assert 1 == len(ret_tasks)
mock_handle_message.assert_not_called() # <-- sanity check
await [Link](*ret_tasks)
mock_handle_message.assert_called_once_with(message)
testing create_task
@[Link]
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
mock_get.side_effect = [message, Exception("break while loop")]
with [Link](Exception, match="break while loop"):
await [Link](mock_queue)
ret_tasks = [
t for t in asyncio.all_tasks() if t is not asyncio.current_task()
]
assert 1 == len(ret_tasks)
mock_handle_message.assert_not_called() # <-- sanity check
await [Link](*ret_tasks)
mock_handle_message.assert_called_once_with(message)
testing asyncio code
testing the event loop
testing the event loop
def main():
loop = asyncio.get_event_loop()
for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop))
)
loop.set_exception_handler(exception_handler)
queue = asyncio.Queue()
try:
loop.create_task(publish(queue))
loop.create_task(consume(queue))
loop.run_forever()
finally:
logging.info("Cleaning up")
loop.close()
testing the event loop
@[Link]
def event_loop(event_loop, mocker):
new_loop = asyncio.get_event_loop_policy().new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop._close = new_loop.close
new_loop.close = [Link]()
yield new_loop
new_loop._close()
testing the event loop
@[Link]
def event_loop(event_loop, mocker):
new_loop = asyncio.get_event_loop_policy().new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop._close = new_loop.close
new_loop.close = [Link]()
yield new_loop
new_loop._close()
testing the event loop
@[Link]
def event_loop(event_loop, mocker):
new_loop = asyncio.get_event_loop_policy().new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop._close = new_loop.close
new_loop.close = [Link]()
yield new_loop
new_loop._close()
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), [Link])
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
# <--snip-->
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), [Link])
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
# <--snip-->
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), [Link])
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
# <--snip-->
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), [Link])
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
# <--snip-->
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
# <--snip-->
[Link]()
assert [Link] in event_loop._signal_handlers
assert mayhem.handle_exception == event_loop.get_exception_handler()
mock_consume.assert_called_once_with(mock_queue)
mock_publish.assert_called_once_with(mock_queue)
mock_shutdown_gather.assert_called_once_with()
# asserting the loop is stopped but not closed
assert not event_loop.is_running()
assert not event_loop.is_closed()
event_loop.close.assert_called_once_with()
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
# <--snip-->
[Link]()
assert [Link] in event_loop._signal_handlers
assert mayhem.handle_exception == event_loop.get_exception_handler()
mock_consume.assert_called_once_with(mock_queue)
mock_publish.assert_called_once_with(mock_queue)
mock_shutdown_gather.assert_called_once_with()
# asserting the loop is stopped but not closed
assert not event_loop.is_running()
assert not event_loop.is_closed()
event_loop.close.assert_called_once_with()
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
# <--snip-->
[Link]()
assert [Link] in event_loop._signal_handlers
assert mayhem.handle_exception == event_loop.get_exception_handler()
mock_consume.assert_called_once_with(mock_queue)
mock_publish.assert_called_once_with(mock_queue)
mock_shutdown_gather.assert_called_once_with()
# asserting the loop is stopped but not closed
assert not event_loop.is_running()
assert not event_loop.is_closed()
event_loop.close.assert_called_once_with()
testing the event loop
def test_main(create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), [Link])
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
# <--snip-->
testing the event loop
@[Link]("sig_to_test", ("SIGINT", "SIGTERM", "SIGHUP"))
def test_main(sig_to_test, create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), sig_to_test)
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
assert sig_to_test in event_loop._signal_handlers
# <--snip-->
testing the event loop
@[Link]("sig_to_test", ("SIGINT", "SIGTERM", "SIGHUP"))
def test_main(sig_to_test, create_mock_coro, event_loop, mock_queue):
mock_consume, _ = create_mock_coro("[Link]")
mock_publish, _ = create_mock_coro("[Link]")
mock_shutdown_gather, _ = create_mock_coro("[Link]")
def _send_signal():
[Link](0.1)
[Link]([Link](), sig_to_test)
thread = [Link](target=_send_signal, daemon=True)
[Link]()
[Link]()
assert sig_to_test in event_loop._signal_handlers
# <--snip-->
testing asyncio code
• pytest-asyncio + mocked coroutines
• asynctest for users of the stdlib’s unittest
(a.k.a. the former Java developers)
debugging asyncio code
debugging asyncio code
manual debugging
manual debugging
async def monitor_tasks():
while True:
tasks = [
t for t in asyncio.all_tasks()
if t is not asyncio.current_task()
]
[t.print_stack() for t in tasks]
await asyncio.sleep(2)
manual debugging
async def monitor_tasks():
while True:
tasks = [
t for t in asyncio.all_tasks()
if t is not asyncio.current_task()
]
[t.print_stack() for t in tasks]
await asyncio.sleep(2)
manual debugging
Stack for <Task pending coro=<handle_message() running at [Link]> wait
File "[Link]", line 107, in handle_message
save_coro, restart_coro, return_exceptions=True
Stack for <Task pending coro=<handle_message() running at [Link]> wait
File "[Link]", line 107, in handle_message
save_coro, restart_coro, return_exceptions=True
Stack for <Task pending coro=<cleanup() running at [Link]> wait_for=<Fu
File "[Link]", line 78, in cleanup
await [Link]([Link]())
Stack for <Task pending coro=<consume() running at [Link]> wait_for=<F
File "[Link]", line 115, in consume
msg = await [Link]()
Stack for <Task pending coro=<restart_host() running at [Link]> wait_fo
File "[Link]", line 62, in restart_host
await [Link]([Link](1, 3))
manual debugging
Stack for <Task pending coro=<handle_message() running at [Link]> wait
File "[Link]", line 107, in handle_message
save_coro, restart_coro, return_exceptions=True
Stack for <Task pending coro=<handle_message() running at [Link]> wait
File "[Link]", line 107, in handle_message
save_coro, restart_coro, return_exceptions=True
Stack for <Task pending coro=<cleanup() running at [Link]> wait_for=<Fu
File "[Link]", line 78, in cleanup
await [Link]([Link]())
Stack for <Task pending coro=<consume() running at [Link]> wait_for=<F
File "[Link]", line 115, in consume
msg = await [Link]()
Stack for <Task pending coro=<restart_host() running at [Link]> wait_fo
File "[Link]", line 62, in restart_host
await [Link]([Link](1, 3))
manual debugging
async def monitor_tasks():
while True:
tasks = [
t for t in asyncio.all_tasks()
if t is not asyncio.current_task()
]
[t.print_stack(limit=5) for t in tasks]
await asyncio.sleep(2)
debugging asyncio code
using debug mode
using debug mode: traceback context
$ PYTHONASYNCIODEBUG=1 python [Link]
using debug mode: traceback context
[Link],830 ERROR: Task exception was never retrieved
using debug mode: traceback context
[Link],830 ERROR: Task exception was never retrieved
future: <Task finished coro=<handle_message() done, defined at
[Link]> exception=Exception('Could not restart cattle-
[Link]') created at /Users/lynn/.pyenv/versions/3.7.2/lib/
python3.7/asyncio/[Link]>
using debug mode: traceback context
[Link],830 ERROR: Task exception was never retrieved
future: <Task finished ...>
source_traceback: Object created at (most recent call last):
File "[Link]", line 164, in <module>
main()
File "[Link]", line 157, in main
loop.run_forever()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.p
self._run_once()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.p
handle._run()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/[Link]", l
self._context.run(self._callback, *self._args)
File "[Link]", line 117, in consume
asyncio.create_task(handle_message(msg))
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/[Link]", li
return loop.create_task(coro)
using debug mode: traceback context
[Link],830 ERROR: Task exception was never retrieved
future: <Task finished ...>
source_traceback: Object created at ...
Traceback (most recent call last):
File "[Link]", line 107, in handle_message
save_coro, restart_coro
File "[Link]", line 60, in restart_host
raise Exception(f"Could not restart {[Link]}")
Exception: Could not restart [Link]
using debug mode: thread safety
$ PYTHONASYNCIODEBUG=1 python [Link]
using debug mode: thread safety
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],954 ERROR: Top-level exception occurred in callback while
processing a message
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/pycon19/lib/python3.7/site-packages/
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py",
line 63, in _wrap_callback_errors
callback(message)
File "[Link]", line 174, in callback
loop.create_task(handle_message(pubsub_msg))
# <-- snip -->
RuntimeError: Non-thread-safe operation invoked on an event loop other
than the current one
using debug mode: thread safety
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],954 ERROR: Top-level exception occurred in callback while
processing a message
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/pycon19/lib/python3.7/site-packages/
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py",
line 63, in _wrap_callback_errors
callback(message)
File "[Link]", line 174, in callback
loop.create_task(handle_message(pubsub_msg))
# <-- snip -->
RuntimeError: Non-thread-safe operation invoked on an event loop other
than the current one
using debug mode: slow coros
async def save(msg):
[Link](1 + [Link]())
[Link] = True
[Link](f"Saved {msg} into database")
using debug mode: slow coros
async def save(msg):
[Link](1 + [Link]())
[Link] = True
[Link](f"Saved {msg} into database")
using debug mode: slow coros
$ PYTHONASYNCIODEBUG=1 python [Link]
using debug mode: slow coros
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],781 INFO: Pulled Message(inst_name='cattle-okxa')
[Link],782 INFO: Extended deadline 3s for Message(inst_name='cattle-okxa')
[Link],416 INFO: Saved Message(inst_name='cattle-okxa') into database
[Link],417 WARNING: Executing <Task finished coro=<save() done, defined at
[Link]> result=None
created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/
[Link]> took 1.634 seconds
[Link],418 INFO: Pulled Message(instance_name='cattle-pmbv')
using debug mode: slow coros
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],781 INFO: Pulled Message(inst_name='cattle-okxa')
[Link],782 INFO: Extended deadline 3s for Message(inst_name='cattle-okxa')
[Link],416 INFO: Saved Message(inst_name='cattle-okxa') into database
[Link],417 WARNING: Executing <Task finished coro=<save() done, defined at
[Link]> result=None
created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/
[Link]> took 1.634 seconds
[Link],418 INFO: Pulled Message(instance_name='cattle-pmbv')
using debug mode: slow coros
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
# float, in seconds
loop.slow_callback_duration = 0.5
# <-- snip -->
using debug mode: slow coros
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
# float, in seconds
loop.slow_callback_duration = 0.5
# <-- snip -->
debugging asyncio code
debugging in production
debugging in production
# <-- snip -->
from aiodebug import log_slow_callbacks
# <-- snip -->
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
log_slow_callbacks.enable(0.05)
debugging in production
# <-- snip -->
from aiodebug import log_slow_callbacks
# <-- snip -->
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
log_slow_callbacks.enable(0.05)
debugging in production
# <-- snip -->
from aiodebug import log_slow_callbacks
# <-- snip -->
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
log_slow_callbacks.enable(0.05)
debugging in production
# <-- snip -->
from aiodebug import log_slow_callbacks
from aiodebug import monitor_loop_lag
# <-- snip -->
def main():
queue = [Link]()
loop = asyncio.get_event_loop()
log_slow_callbacks.enable(0.05)
monitor_loop_lag.enable(my_statsd_client)
debugging asyncio code
• manual: task.print_stack()
• proper: built-in debug mode
• f-it, we’ll do it live: aiodebug
profiling asyncio code
profiling asyncio code
cProfile
cProfile
$ timeout -s INT 5s python -m cProfile -s tottime [Link]
cProfile
$ timeout -s INT 5s python -m cProfile -s tottime [Link]
ncalls tottime percall ... filename:lineno(function)
134 4.785 0.036 ... {method 'control' of '[Link]' objects}
17 0.007 0.000 ... {built-in method _imp.create_dynamic}
62 0.007 0.000 ... {built-in method [Link]}
132 0.003 0.000 ... base_events.py:1679(_run_once)
217/216 0.003 0.000 ... {built-in method builtins.__build_class__}
361 0.003 0.000 ... {built-in method [Link]}
62 0.002 0.000 ... <frozen importlib._bootstrap_external>:914(ge
42 0.002 0.000 ... {built-in method [Link]}
195 0.001 0.000 ... <frozen importlib._bootstrap_external>:1356(f
50 0.001 0.000 ... {method 'write' of '_io.TextIOWrapper' object
122 0.001 0.000 ... _make.py:1217(__repr__)
50 0.001 0.000 ... __init__.py:293(__init__)
18 0.001 0.000 ... [Link](__new__)
72/15 0.001 0.000 ... sre_parse.py:475(_parse)
62 0.001 0.000 ... {method 'read' of '_io.FileIO' objects}
cProfile
$ timeout -s INT 5s python -m cProfile -s tottime [Link]
ncalls tottime percall ... filename:lineno(function)
134 4.785 0.036 ... {method 'control' of '[Link]' objects}
17 0.007 0.000 ... {built-in method _imp.create_dynamic}
62 0.007 0.000 ... {built-in method [Link]}
132 0.003 0.000 ... base_events.py:1679(_run_once)
217/216 0.003 0.000 ... {built-in method builtins.__build_class__}
361 0.003 0.000 ... {built-in method [Link]}
62 0.002 0.000 ... <frozen importlib._bootstrap_external>:914(ge
42 0.002 0.000 ... {built-in method [Link]}
195 0.001 0.000 ... <frozen importlib._bootstrap_external>:1356(f
50 0.001 0.000 ... {method 'write' of '_io.TextIOWrapper' object
122 0.001 0.000 ... _make.py:1217(__repr__)
50 0.001 0.000 ... __init__.py:293(__init__)
18 0.001 0.000 ... [Link](__new__)
72/15 0.001 0.000 ... sre_parse.py:475(_parse)
62 0.001 0.000 ... {method 'read' of '_io.FileIO' objects}
cProfile
$ timeout -s INT 5s python -m cProfile -s filename [Link]
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 4.704 4.704 [Link](main)
18 0.000 0.000 0.002 0.000 [Link](restart_host)
22 0.000 0.000 0.003 0.000 [Link](save)
33 0.000 0.000 0.003 0.000 [Link](cleanup)
22 0.000 0.000 0.002 0.000 [Link](extend)
11 0.000 0.000 0.000 0.000 [Link](handle_results)
12 0.000 0.000 0.002 0.000 [Link](publish)
22 0.000 0.000 0.002 0.000 [Link](handle_message)
12 0.000 0.000 0.003 0.000 [Link](consume)
11 0.000 0.000 0.000 0.000 [Link](__attrs_post_init_
1 0.000 0.000 0.000 0.000 [Link](Message)
1 0.000 0.000 0.000 0.000 [Link](<listcomp>)
2 0.000 0.000 0.001 0.000 [Link](shutdown)
1 0.000 0.000 0.000 0.000 [Link](<lambda>)
cProfile
$ timeout -s INT 5s python -m cProfile -s filename [Link]
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 4.704 4.704 [Link](main)
18 0.000 0.000 0.002 0.000 [Link](restart_host)
22 0.000 0.000 0.003 0.000 [Link](save)
33 0.000 0.000 0.003 0.000 [Link](cleanup)
22 0.000 0.000 0.002 0.000 [Link](extend)
11 0.000 0.000 0.000 0.000 [Link](handle_results)
12 0.000 0.000 0.002 0.000 [Link](publish)
22 0.000 0.000 0.002 0.000 [Link](handle_message)
12 0.000 0.000 0.003 0.000 [Link](consume)
11 0.000 0.000 0.000 0.000 [Link](__attrs_post_init_
1 0.000 0.000 0.000 0.000 [Link](Message)
1 0.000 0.000 0.000 0.000 [Link](<listcomp>)
2 0.000 0.000 0.001 0.000 [Link](shutdown)
1 0.000 0.000 0.000 0.000 [Link](<lambda>)
profiling asyncio code
cProfile with KCacheGrind
cProfile with (K|Q)CacheGrind
$ timeout -s INT 5s python -m cProfile -o [Link] [Link]
cProfile with (K|Q)CacheGrind
$ timeout -s INT 5s python -m cProfile -o [Link] [Link]
$ pyprof2calltree --kcachegrind -i [Link]
profiling asyncio code
line_profiler
line_profiler
@profile
async def save(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Saved {msg} into database")
line_profiler
@profile
async def save(msg):
# unhelpful simulation of i/o work
await [Link]([Link]())
[Link] = True
[Link](f"Saved {msg} into database")
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
Timer unit: 1e-06 s
Total time: 0.002202 s
File: [Link]
Function: save at line 69
Line # Hits Time Per Hit % Time Line Contents
=======================================================
69 @profile
70 async def save(msg):
71 8 259.0 32.4 11.8 await [Link]([Link]
72 8 26.0 3.2 1.2 [Link] = True
73 8 1917.0 239.6 87.1 [Link](f"Saved {msg} into
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
Timer unit: 1e-06 s
Total time: 0.002202 s
File: [Link]
Function: save at line 69
Line # Hits Time Per Hit % Time Line Contents
=======================================================
69 @profile
70 async def save(msg):
71 8 259.0 32.4 11.8 await [Link]([Link]
72 8 26.0 3.2 1.2 [Link] = True
73 8 1917.0 239.6 87.1 [Link](f"Saved {msg} into
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
Timer unit: 1e-06 s
Total time: 0.002202 s
File: [Link]
Function: save at line 69
Line # Hits Time Per Hit % Time Line Contents
=======================================================
69 @profile
70 async def save(msg):
71 8 259.0 32.4 11.8 await [Link]([Link]
72 8 26.0 3.2 1.2 [Link] = True
73 8 1917.0 239.6 87.1 [Link](f"Saved {msg} into
line_profiler
import aiologger
logger = [Link].with_default_handlers()
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
Timer unit: 1e-06 s
Total time: 0.0011 s
File: [Link]
Function: save at line 69
Line # Hits Time Per Hit % Time Line Contents
=======================================================
69 @profile
70 async def save(msg):
71 7 269.0 38.4 24.5 await [Link]([Link]
72 5 23.0 4.6 2.1 [Link] = True
73 5 808.0 161.6 73.5 await [Link](f"Saved {msg}
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
Timer unit: 1e-06 s
Total time: 0.0011 s
File: [Link]
Function: save at line 69
Line # Hits Time Per Hit % Time Line Contents
=======================================================
69 @profile
70 async def save(msg):
71 7 269.0 38.4 24.5 await [Link]([Link]
72 5 23.0 4.6 2.1 [Link] = True
73 5 808.0 161.6 73.5 await [Link](f"Saved {msg}
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]
Timer unit: 1e-06 s
Total time: 0.0011 s
File: [Link]
Function: save at line 69
Line # Hits Time Per Hit % Time Line Contents
=======================================================
69 @profile
70 async def save(msg):
71 7 269.0 38.4 24.5 await [Link]([Link]
72 5 23.0 4.6 2.1 [Link] = True
73 5 808.0 161.6 73.5 await [Link](f"Saved {msg}
profiling asyncio code
live profiling
live profiling
$ profiling live-profile --mono [Link]
profiling asyncio code
• No idea? Visualize cProfile output with
pyprof2calltree + KCacheGrind
• Some idea? line_profiler
• f-it, we’ll do it live: profiling live-profile
Thank you!
[Link]/adv-aio
Lynn Root | SRE | @roguelynn