Advanced Python Asyncio Guide

The document discusses using asyncio in Python to concurrently publish and consume messages from a queue to simulate real-world asynchronous production problems. It covers initial setup of publishing messages to a queue concurrently from multiple coroutines, consuming messages concurrently from the queue, and performing concurrent processing tasks on each message including saving to a database and restarting hosts. It also discusses strategies for blocking coroutines when needed and using finalization tasks to unblock the main loop after processing is complete.

Uploaded by

jeremy
Advanced asyncio

Solving Real-World Production Problems

Lynn Root | SRE | @roguelynn


$ whoami
agenda

• Initial setup of Mayhem Mandrill


• Development best practices
• Testing, debugging, and profiling
slides: [Link]/adv-aio
async all the things
Python 3.7.0 (default, Jul 6 2018, [Link])
[Clang 9.1.0 (clang-[Link])] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
...     print(f'[{datetime.datetime.now()}] Hello...')
...     await asyncio.sleep(1)  # some I/O-intensive work
...     print(f'[{datetime.datetime.now()}] ...World!')
...
>>> asyncio.run(hello())
[2018-07-07 [Link].559856] Hello...
[2018-07-07 [Link].568737] ...World!
FAKE NEWS
building mayhem mandrill
initial setup
initial setup
concurrently publish messages
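import asyncio
import logging
import random
import string
import uuid

async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        host_id = "".join(random.choices(choices, k=4))
        # Message is the talk's own message class; its definition is not shown here
        msg = Message(
            msg_id=str(uuid.uuid4()),
            inst_name=f"cattle-{host_id}"
        )

        # schedule the put without waiting for it to complete
        asyncio.create_task(queue.put(msg))
        logging.info(f"Published {msg}")

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())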
concurrently publish messages
async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        host_id = "".join(random.choices(choices, k=4))
        msg = Message(
            msg_id=str(uuid.uuid4()),
            inst_name=f"cattle-{host_id}"
        )

        await queue.put(msg)  # lines below are blocked
        logging.info(f"Published {msg}")

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())
initial setup
concurrently consume messages
async def consume(queue):
    while True:
        # awaiting does not block the event loop; it only blocks this coroutine's scope
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        # unhelpful simulation of an i/o operation
        await asyncio.sleep(random.random())
concurrently consume messages
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        # waits for each restart to finish before pulling the next message
        await restart_host(msg)

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")
concurrently consume messages
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        # schedule the restart and go straight back to pulling messages
        asyncio.create_task(restart_host(msg))

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")
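The publish and consume slides above can be tied together into something that runs and terminates. The following is only a minimal sketch under stated assumptions: the `Message` dataclass is a hypothetical stand-in for the talk's message class, the publisher sends a fixed number of messages instead of looping forever, and a `None` sentinel (not in the slides) tells the consumer to stop.

```python
import asyncio
import logging
import random
import string
import uuid
from dataclasses import dataclass


@dataclass
class Message:
    # Hypothetical stand-in for the talk's Message class.
    msg_id: str
    inst_name: str
    restarted: bool = False


async def publish(queue, n_messages):
    """Publish a fixed number of messages, then a stop sentinel."""
    choices = string.ascii_lowercase + string.digits
    for _ in range(n_messages):
        host_id = "".join(random.choices(choices, k=4))
        msg = Message(msg_id=str(uuid.uuid4()), inst_name=f"cattle-{host_id}")
        await queue.put(msg)
        logging.info("Published %s", msg)
        # simulate randomness of publishing messages (kept short here)
        await asyncio.sleep(random.random() / 100)
    await queue.put(None)  # sentinel (assumption): tells the consumer to stop


async def restart_host(msg):
    await asyncio.sleep(random.random() / 100)  # simulated I/O work
    msg.restarted = True


async def consume(queue):
    """Pull messages and restart hosts concurrently; return the messages seen."""
    handled, tasks = [], []
    while True:
        msg = await queue.get()
        if msg is None:
            break
        handled.append(msg)
        # Schedule the restart without waiting for it, as on the slides,
        # but keep a reference so it can be awaited before returning.
        tasks.append(asyncio.create_task(restart_host(msg)))
    await asyncio.gather(*tasks)
    return handled


async def main(n_messages=5):
    queue = asyncio.Queue()
    _, handled = await asyncio.gather(publish(queue, n_messages), consume(queue))
    return handled


if __name__ == "__main__":
    print(len(asyncio.run(main())))
```

Keeping references to the created tasks is a design choice of this sketch; it lets the consumer wait for in-flight restarts before it returns.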
initial setup
concurrent work
concurrent work
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")
concurrent work
async def restart_host(msg):
    ...

async def save(msg):
    ...

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")

        # both tasks are scheduled; neither is waited on
        asyncio.create_task(save(msg))
        asyncio.create_task(restart_host(msg))
concurrent work
async def restart_host(msg):
    ...

async def save(msg):
    ...

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")

        # serial: save finishes before the restart starts
        await save(msg)
        await restart_host(msg)
block when needed
async def restart_host(msg):
    ...

async def save(msg):
    ...

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")

        await save(msg)
        # today and max_days are assumed to be defined elsewhere
        last_restart = await last_restart_date(msg)
        if today - last_restart > max_days:
            await restart_host(msg)
block when needed
async def handle_message(msg):
    await save(msg)
    last_restart = await last_restart_date(msg)
    if today - last_restart > max_days:
        asyncio.create_task(restart_host(msg))

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")
        asyncio.create_task(handle_message(msg))
block when needed
async def handle_message(msg):
    asyncio.create_task(save(msg))
    asyncio.create_task(restart_host(msg))

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")
        asyncio.create_task(handle_message(msg))
initial setup
finalization tasks
unblocking: finalization tasks
def cleanup(msg):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def handle_message(msg):
    asyncio.create_task(save(msg))
    asyncio.create_task(restart_host(msg))
unblocking: finalization tasks
def cleanup(msg):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def handle_message(msg):
    await save(msg)
    await restart_host(msg)
    cleanup(msg)
unblocking: finalization tasks
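import functools

def cleanup(msg, fut):
    # fut is the finished gather() future passed in by add_done_callback
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))

    callback = functools.partial(cleanup, msg)
    g_future.add_done_callback(callback)
    await g_future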
unblocking: finalization tasks
[Link],250 INFO: Pulled Message(inst_name='cattle-zpsk')
[Link],286 INFO: Restarted [Link]
[Link],347 INFO: Pulled Message(inst_name='cattle-998c')
[Link],486 INFO: Saved Message(inst_name='cattle-zpsk') into database
[Link],486 INFO: Done. Acked Message(inst_name='cattle-zpsk')
[Link],811 INFO: Pulled Message(inst_name='cattle-j9bu')
[Link],863 INFO: Saved Message(inst_name='cattle-998c') into database
[Link],903 INFO: Pulled Message(inst_name='cattle-vk5l')
[Link],149 INFO: Pulled Message(inst_name='cattle-1lf2')
[Link],239 INFO: Restarted [Link]
[Link],245 INFO: Restarted [Link]
[Link],245 INFO: Done. Acked Message(inst_name='cattle-998c')
[Link],267 INFO: Saved Message(inst_name='cattle-j9bu') into database
[Link],478 INFO: Pulled Message(inst_name='cattle-mflk')
[Link],481 INFO: Restarted [Link]
[Link],482 INFO: Done. Acked Message(inst_name='cattle-j9bu')
[Link],505 INFO: Pulled Message(inst_name='cattle-t7tv')
unblocking: finalization tasks
async def cleanup(msg):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)
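A runnable sketch of the finalization pattern above may help: `save` and `restart_host` run concurrently under `asyncio.gather`, and the cleanup runs only once both are done. The `Message` dataclass and the short sleeps are assumptions for illustration, and this `cleanup` is deliberately stricter than the slide's version, acking only when both flags are set.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    # Hypothetical stand-in for the talk's Message class.
    inst_name: str
    saved: bool = False
    restarted: bool = False
    acked: bool = False


async def save(msg):
    await asyncio.sleep(0.01)  # simulated I/O work
    msg.saved = True


async def restart_host(msg):
    await asyncio.sleep(0.02)  # simulated I/O work
    msg.restarted = True


async def cleanup(msg):
    # Ack only if both steps completed (stricter than the slide's
    # unconditional ack, to make the ordering visible).
    msg.acked = msg.saved and msg.restarted


async def handle_message(msg):
    # Both coroutines run concurrently; gather returns when both finish.
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)
    return msg


if __name__ == "__main__":
    print(asyncio.run(handle_message(Message("cattle-demo"))))
```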
adding concurrency: tl;dr

• Asynchronous != concurrent
• Serial != blocking
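The first bullet can be illustrated with a small timing comparison. This is a sketch, not part of the talk: `io_work`, `serial`, `concurrent` and `timed` are hypothetical helper names. Both versions are asynchronous, but only the `asyncio.gather` version overlaps the waits.

```python
import asyncio
import time


async def io_work(delay):
    await asyncio.sleep(delay)  # stands in for some I/O-bound call
    return delay


async def serial(delays):
    # Asynchronous but not concurrent: each await finishes before the next starts.
    return [await io_work(d) for d in delays]


async def concurrent(delays):
    # Concurrent: all sleeps are in flight at the same time.
    return await asyncio.gather(*(io_work(d) for d in delays))


def timed(coro):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.1, 0.1, 0.1]
    print("serial:    ", timed(serial(delays))[1])
    print("concurrent:", timed(concurrent(delays))[1])
```

The serial version still never blocks the event loop; other tasks could run during each sleep, which is the point of the second bullet.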
graceful shutdowns
graceful shutdowns
responding to signals
responding to signals
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        logging.info("Cleaning up")
        loop.close()
responding to signals
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:  # <-- a.k.a. SIGINT
        logging.info("Process interrupted")
    finally:
        logging.info("Cleaning up")
        loop.close()
responding to signals
$ python [Link]
$ pkill -INT -f "python [Link]"

[Link],321 INFO: Pulled Message(inst_name='cattle-lrnm')


[Link],321 INFO: Done. Acked Message(inst_name='cattle-lrnm')
[Link],700 INFO: Pulled Message(inst_name='cattle-m0f6')
[Link],700 INFO: Done. Acked Message(inst_name='cattle-m0f6')
[Link],740 INFO: Saved Message(inst_name='cattle-m0f6') into database
[Link],840 INFO: Saved Message(inst_name='cattle-lrnm') into database
[Link],144 INFO: Process interrupted
[Link],144 INFO: Cleaning up
responding to signals
$ python [Link]
$ pkill -TERM -f "python [Link]"

[Link],553 INFO: Pulled Message(inst_name='cattle-npww')


[Link],554 INFO: Done. Acked Message(inst_name='cattle-npww')
[Link],655 INFO: Pulled Message(inst_name='cattle-rm7n')
[Link],655 INFO: Done. Acked Message(inst_name='cattle-rm7n')
[Link],790 INFO: Saved Message(inst_name='cattle-rm7n') into database
[Link],831 INFO: Saved Message(inst_name='cattle-npww') into database
[1] 78851 terminated python mandrill/[Link]
responding to signals
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()  # <-- could happen here or earlier

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")  # <-- could happen here
    finally:
        logging.info("Cleaning up")  # <-- could happen here
        loop.close()  # <-- could happen here
graceful shutdowns
signal handler
signal handler
async def shutdown(signal, loop):
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    # every task except the one running this shutdown coroutine
    tasks = [
        t for t in asyncio.all_tasks()
        if t is not asyncio.current_task()
    ]
    for task in tasks:
        task.cancel()

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()
    logging.info("Shutdown complete.")
signal handler
import signal

def main():
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # s=s binds the current signal to each lambda
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        logging.info("Cleaning up")
        loop.close()
signal handler
$ python [Link]
# or -HUP or -INT
$ pkill -TERM -f "python [Link]"

[Link],321 INFO: Pulled Message(inst_name='cattle-lrnm')


[Link],321 INFO: Done. Acked Message(inst_name='cattle-lrnm')
[Link],700 INFO: Pulled Message(inst_name='cattle-m0f6')
[Link],700 INFO: Done. Acked Message(inst_name='cattle-m0f6')
[Link],740 INFO: Saved Message(inst_name='cattle-m0f6') into database
[Link],840 INFO: Saved Message(inst_name='cattle-lrnm') into database
[Link],143 INFO: Received exit signal SIGTERM...
[Link],143 INFO: Closing database connections
[Link],144 INFO: Cancelling 19 outstanding tasks
[Link],144 INFO: Flushing metrics
[Link],145 INFO: Cleaning up
graceful shutdowns
which signals to care about
            Hard Exit    Graceful    Reload/Restart
nginx       TERM, INT    QUIT        HUP
Apache      TERM         WINCH       HUP
uWSGI       INT, QUIT                HUP, TERM
Gunicorn    INT, QUIT    TERM        HUP
Docker      KILL         TERM

graceful shutdowns
not-so-graceful asyncio.shield
ungraceful shutdown: asyncio.shield
async def cant_stop_me():
    logging.info("Hold on...")
    await asyncio.sleep(60)
    logging.info("Done!")
ungraceful shutdown: asyncio.shield
async def cant_stop_me():
    ...

def main():
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    shielded_coro = asyncio.shield(cant_stop_me())
    try:
        loop.run_until_complete(shielded_coro)
    finally:
        logging.info("Cleaning up")
        loop.close()
ungraceful shutdown: asyncio.shield
[Link],105 INFO: Hold on...
^[Link],156 INFO: Received exit signal SIGINT...
[Link],156 INFO: Cancelling 2 outstanding tasks
[Link],156 INFO: Coroutine cancelled
[Link],157 INFO: Cleaning up
Traceback (most recent call last):
  File "examples/shield_test.py", line 62, in
    loop.run_until_complete(shielded_coro)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.p
    return future.result()
concurrent.futures._base.CancelledError
ungraceful shutdown: asyncio.shield
async def cant_stop_me():
    logging.info("Hold on...")
    await asyncio.sleep(60)
    logging.info("Done!")

async def main():
    await asyncio.shield(cant_stop_me())

asyncio.run(main())
ungraceful shutdown: asyncio.shield
[Link],587 INFO: Hold on...
^[Link],982 INFO: Cleaning up
Traceback (most recent call last):
File "shield_test_no_shutdown.py", line 23, in <module>
loop.run_until_complete(shielded_coro)
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/asyncio/base_events.p
self.run_forever()
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/asyncio/base_events.p
self._run_once()
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/asyncio/base_events.p
event_list = self._selector.select(timeout)
File "/Users/lynn/.pyenv/versions/3.6.2/lib/python3.6/[Link]", line 5
kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt
ungraceful shutdown: asyncio.shield
async def cant_stop_me():
    logging.info("Hold on...")
    await asyncio.sleep(60)
    logging.info("Done!")

async def imma_let_you_speak(task_to_cancel):
    await asyncio.sleep(2)
    logging.info(f"interrupting {task_to_cancel}")
    task_to_cancel.cancel()

async def main():
    shielded = asyncio.shield(cant_stop_me())
    cancel_coro = imma_let_you_speak(shielded)
    await asyncio.gather(shielded, cancel_coro)

asyncio.run(main())
ungraceful shutdown: asyncio.shield
[Link],729 INFO: Hold on...
[Link],730 INFO: killing <Future pending cb=[gather.<locals>._done_callback
python3.7/asyncio/[Link]]>
Traceback (most recent call last):
  File "shield_test_no_shutdown.py", line 38, in <module>
    asyncio.run(main())
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/[Link]",
    return loop.run_until_complete(main)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.p
    return future.result()
concurrent.futures._base.CancelledError
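What the tracebacks above show can be reproduced in a few lines. In this minimal sketch (the `state` dictionary and the short sleep are assumptions for illustration), cancelling the future returned by `asyncio.shield` still raises `CancelledError` in whoever awaits it, while the wrapped task keeps running in the background.

```python
import asyncio


async def cant_stop_me(state):
    await asyncio.sleep(0.05)  # shortened from the slide's 60 seconds
    state["done"] = True


async def main():
    state = {"done": False, "outer_cancelled": False}
    inner = asyncio.create_task(cant_stop_me(state))
    shielded = asyncio.shield(inner)

    # Cancels only the outer future returned by shield(), not the inner task.
    shielded.cancel()
    try:
        await shielded
    except asyncio.CancelledError:
        # The awaiter still sees the cancellation.
        state["outer_cancelled"] = True

    # The protected task was never cancelled and runs to completion.
    await inner
    return state


if __name__ == "__main__":
    print(asyncio.run(main()))
```

So `shield` protects the inner work, not the code awaiting it, which is why it does not by itself give a graceful shutdown.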
graceful shutdown: tl;dr

• try/except/finally isn’t enough


• Define desired shutdown behavior
• Use signal handlers
• Listen for appropriate signals
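The bullets above can be sketched end to end. This is one possible arrangement, not the talk's exact code: it uses an `asyncio.Event` set by the signal handler instead of a `shutdown` coroutine that stops the loop, the `worker`, `on_signal` and `demo` names are hypothetical, and `demo` sends SIGTERM to its own process to simulate `pkill -TERM`. It assumes a Unix platform and the main thread, since `loop.add_signal_handler` is only available there.

```python
import asyncio
import os
import signal


async def worker(log):
    try:
        await asyncio.sleep(3600)  # stands in for long-running work
    except asyncio.CancelledError:
        log.append("worker cancelled")  # per-task cleanup would go here
        raise


async def main(log):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    def on_signal(sig):
        log.append(f"received {sig.name}")
        stop.set()

    # Listen for the signals we care about; extra args are passed to the callback.
    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, on_signal, sig)

    workers = [asyncio.create_task(worker(log)) for _ in range(3)]
    await stop.wait()

    # Defined shutdown behaviour: cancel outstanding work, then wait for it.
    for task in workers:
        task.cancel()
    results = await asyncio.gather(*workers, return_exceptions=True)
    log.append("shutdown complete")
    return results


async def demo(log):
    # Simulate an external `pkill -TERM` by signalling our own process.
    asyncio.get_running_loop().call_later(
        0.05, os.kill, os.getpid(), signal.SIGTERM)
    return await main(log)


if __name__ == "__main__":
    events = []
    asyncio.run(demo(events))
    print(events)
```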
exception handling
exception handling
global handler
global handler
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    rand_int = random.randrange(1, 5)
    if rand_int == 2:
        raise Exception(f"Could not restart {msg.hostname}!")

    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")
global handler
$ python [Link]

[Link],524 INFO: Pulled Message(inst_name='cattle-hvy0')


[Link],924 INFO: Pulled Message(inst_name='cattle-5i2f')
[Link],925 ERROR: Task exception was never retrieved
future: <Task finished coro=<restart_host() done, defined at
mayhem_ex_handling.py:56> exception=Exception('Could not restart
[Link]')>
Traceback (most recent call last):
File "mayhem_ex_handling.py", line 60, in restart_host
    raise Exception(f"Could not restart {msg.hostname}")
Exception: Could not restart [Link]
[Link],247 INFO: Pulled Message(inst_name='cattle-e086')
[Link],432 INFO: Saved Message(inst_name='cattle-hvy0') into database
[Link],517 INFO: Restarted [Link]
global handler
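def exception_handler(loop, context):
    # "exception" is optional in the context dict; "message" is always present
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")

def main():
    loop = asyncio.get_event_loop()
    # <-- snip -->
    loop.set_exception_handler(exception_handler)
    # <-- snip -->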
global handler
$ python [Link]

[Link],187 INFO: Pulled Message(instance_name='cattle-i490')


[Link],192 INFO: Restarted [Link]
[Link],241 INFO: Pulled Message(instance_name='cattle-31is')
[Link],331 INFO: Saved Message(instance_name='cattle-31is') into database
[Link],535 INFO: Pulled Message(instance_name='cattle-sx7f')
[Link],535 ERROR: Caught exception: Could not restart cattle-sx7f
[Link],730 INFO: Pulled Message(instance_name='cattle-hsh9')
[Link],731 INFO: Saved Message(instance_name='cattle-sx7f') into database
[Link],759 INFO: Pulled Message(instance_name='cattle-g20p')
[Link],800 INFO: Restarted [Link]
[Link],26 INFO: Saved Message(instance_name='cattle-i490') into database
[Link],45 INFO: Saved Message(instance_name='cattle-hsh9') into database
[Link],181 INFO: Saved Message(instance_name='cattle-g20p') into database
[Link],194 INFO: Restarted [Link]
exception handling
specific handlers
specific handlers
async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)
specific handlers
async def handle_message(msg):
    saved, restarted = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True)

    # only ack the message if the restart did not raise
    to_ack = True
    if isinstance(restarted, Exception):
        to_ack = False

    await cleanup(msg, to_ack)


exception handling

• global exception handling: loop.set_exception_handler
• individual exception handling: asyncio.gather with return_exceptions=True
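The second bullet can be sketched as a runnable example. The `RestartFailed` exception and the `fail` flag are assumptions added for illustration (the slides raise a plain `Exception` at random). With `return_exceptions=True`, `asyncio.gather` hands back the exception object as a result instead of raising it, so the caller decides what to do per message.

```python
import asyncio


class RestartFailed(Exception):
    """Hypothetical exception type, used here for illustration only."""


async def save(msg):
    await asyncio.sleep(0)  # simulated I/O work
    return "saved"


async def restart_host(msg, fail):
    await asyncio.sleep(0)  # simulated I/O work
    if fail:
        raise RestartFailed(f"Could not restart {msg}")
    return "restarted"


async def handle_message(msg, fail):
    # Exceptions are returned in place of results rather than raised.
    saved, restarted = await asyncio.gather(
        save(msg), restart_host(msg, fail), return_exceptions=True)

    # Ack only when the restart did not fail.
    to_ack = not isinstance(restarted, Exception)
    return to_ack


if __name__ == "__main__":
    print(asyncio.run(handle_message("cattle-ok", fail=False)))
    print(asyncio.run(handle_message("cattle-bad", fail=True)))
```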
threads and asyncio
threads and asyncio
running coroutines from other threads
running coroutines from other threads
def threaded_consume():
    threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)

def handle_message_sync(msg):
    msg = Message(**msg.json_data)
    logging.info(f"Pulled {msg}")
    asyncio.create_task(handle_message(msg))
running coroutines from other threads
import concurrent.futures

def threaded_consume():
    threaded_pubsub_client.subscribe(TOPIC, handle_message_sync)

def handle_message_sync(msg):
    msg = Message(**msg.json_data)
    logging.info(f"Pulled {msg}")
    asyncio.create_task(handle_message(msg))

async def run():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    # threaded_consume takes no arguments in this version
    await loop.run_in_executor(executor, threaded_consume)
running coroutines from other threads
[Link],833 INFO: Pulled Message(inst_name='cattle-hvy0')
[Link],833 ERROR: Top-level exception occurred in callback while
processing a message
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/ep18-37/lib/python3.7/site-packages/
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py",
line 63, in _wrap_callback_errors
callback(message)
File "[Link]", line 115, in callback
asyncio.create_task(handle_message(data))
File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/tasks.py",
line 320, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
def threaded_consume(loop):
    callback = functools.partial(handle_message_sync, loop)
    threaded_pubsub_client.subscribe(TOPIC, callback)

def handle_message_sync(loop, msg):
    msg = Message(**msg.json_data)
    logging.info(f"Pulled {msg}")
    # appears to work, but loop.create_task is not thread-safe
    loop.create_task(handle_message(msg))

async def run():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, threaded_consume, loop)
[Link],543 INFO: Pulled Message(inst_name='xbci')
[Link],543 INFO: Pulled Message(inst_name='e8x5')
[Link],544 INFO: Running something else
[Link],721 INFO: Saved Message(inst_name='e8x5') into database
[Link],828 INFO: Saved Message(inst_name='xbci') into database
[Link],828 ERROR: Caught exception: Could not restart [Link]
[Link],549 INFO: Restarted [Link]
[Link],821 INFO: Done. Message(inst_name='e8x5')
[Link],108 INFO: Running something else
[Link],276 INFO: Done. Message(inst_name='xbci')
running coroutines from other threads: take 2
def threaded_consume(loop):
    callback = functools.partial(handle_message_sync, loop)
    threaded_pubsub_client.subscribe(TOPIC, callback)

def handle_message_sync(loop, msg):
    msg = Message(**msg.json_data)
    logging.info(f"Pulled {msg}")
    # thread-safe way to schedule a coroutine on the main loop
    asyncio.run_coroutine_threadsafe(handle_message(msg), loop)

async def run():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, threaded_consume, loop)
threads and asyncio

• ThreadPoolExecutor: calling threaded code from the main event loop
• asyncio.run_coroutine_threadsafe: running a coroutine on the main event loop from another thread
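A self-contained sketch of the two pieces working together, with no Pub/Sub client involved. The message values and the `handled` list are made up for illustration; only the asyncio and concurrent.futures calls are real APIs.

```python
import asyncio
import concurrent.futures
import functools

handled = []  # records what the coroutine processed (illustrative only)

async def handle_message(msg):
    await asyncio.sleep(0)
    handled.append(msg)
    return msg.upper()

def handle_message_sync(loop, msg):
    # runs in a worker thread: hand the coroutine to the loop thread-safely
    future = asyncio.run_coroutine_threadsafe(handle_message(msg), loop)
    # a concurrent.futures.Future; blocking here only blocks the worker
    return future.result(timeout=5)

def threaded_consume(loop):
    callback = functools.partial(handle_message_sync, loop)
    return [callback(m) for m in ("a", "b")]

async def run():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return await loop.run_in_executor(executor, threaded_consume, loop)

replies = asyncio.run(run())
```

Waiting on `future.result()` is optional; it is shown here only so the worker thread can see the coroutine's return value.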

testing asyncio code
simple testing with pytest
async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")

@pytest.fixture
def message():
    return mayhem.Message(msg_id="1234", instance_name="mayhem_test")

def test_save(message):
    assert not message.saved  # sanity check
    asyncio.run(mayhem.save(message))
    assert message.saved

# same test with Python 3.6-style loop management (uses the message fixture)
def test_save(message):
    assert not message.saved  # sanity check
    loop = asyncio.get_event_loop()
    loop.run_until_complete(mayhem.save(message))
    loop.close()
    assert message.saved

# same test with pytest-asyncio (uses the message fixture)
@pytest.mark.asyncio
async def test_save(message):
    assert not message.saved  # sanity check
    await mayhem.save(message)
    assert message.saved
mocking coroutines
@pytest.fixture
def create_coro_mock(mocker, monkeypatch):
    def _create_mock_patch_coro(to_patch=None):
        mock = mocker.Mock()

        # awaiting _coro records the call on the plain mock
        async def _coro(*args, **kwargs):
            return mock(*args, **kwargs)

        if to_patch:  # <-- may not need/want to patch anything
            monkeypatch.setattr(to_patch, _coro)
        return mock, _coro

    return _create_mock_patch_coro
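The same idea can be sketched without pytest, using only `unittest.mock`. The `save` signature below (taking `sleep` as a parameter) and the dict-based message are simplifications assumed for the example, not the talk's code. On Python 3.8+, `unittest.mock.AsyncMock` covers the same need directly.

```python
import asyncio
from unittest import mock

def create_coro_mock():
    """Return (mock, coro): awaiting coro(...) records the call on mock."""
    m = mock.Mock()

    async def _coro(*args, **kwargs):
        return m(*args, **kwargs)

    return m, _coro

async def save(msg, sleep):
    # `sleep` is injected here only so the example needs no patching
    await sleep(0.5)
    msg["saved"] = True

# hand-rolled wrapper, as in the fixture above
mock_sleep, sleep_coro = create_coro_mock()
message = {"saved": False}
asyncio.run(save(message, sleep_coro))

# stdlib equivalent on Python 3.8+
async_sleep = mock.AsyncMock()
asyncio.run(save({"saved": False}, async_sleep))
```

Either way the test never actually sleeps, and the mock can be asserted on afterwards.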

@pytest.fixture
def mock_sleep(create_coro_mock):
    mock, _ = create_coro_mock("mayhem.asyncio.sleep")
    return mock

@pytest.mark.asyncio
async def test_save(mock_sleep, message):
    assert not message.saved  # sanity check
    await mayhem.save(message)
    assert message.saved
    assert 1 == mock_sleep.call_count
testing create_task
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")
        asyncio.create_task(handle_message(msg))

@pytest.fixture
def mock_queue(mocker, monkeypatch):
    queue = mocker.Mock()
    monkeypatch.setattr(mayhem.asyncio, "Queue", queue)
    return queue.return_value

@pytest.fixture
def mock_get(mock_queue, create_coro_mock):
    mock, coro = create_coro_mock()
    mock_queue.get = coro
    return mock

@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
    mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
    mock_get.side_effect = [message, Exception("break while loop")]

    with pytest.raises(Exception, match="break while loop"):
        await mayhem.consume(mock_queue)

    mock_handle_message.assert_called_once_with(message)

============================= FAILURES =============================
___________________________ test_consume ___________________________

<--snip-->
    @pytest.mark.asyncio
    async def test_consume(mock_get, mock_queue, message, create_coro_mock):
        mock_get.side_effect = [message, Exception("break while loop")]
        mock_handle_message = create_coro_mock("mayhem.handle_message")

        with pytest.raises(Exception, match="break while loop"):
            await mayhem.consume(mock_queue)

>       mock_handle_message.assert_called_once_with(message)
E       AssertionError: Expected 'mock' to be called once. Called 0 times.

test_mayhem.py:230: AssertionError

@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue, message, create_coro_mock):
    mock_handle_message, _ = create_coro_mock("mayhem.handle_message")
    mock_get.side_effect = [message, Exception("break while loop")]

    with pytest.raises(Exception, match="break while loop"):
        await mayhem.consume(mock_queue)

    # create_task only scheduled the task; it has not run yet
    ret_tasks = [
        t for t in asyncio.all_tasks() if t is not asyncio.current_task()
    ]
    assert 1 == len(ret_tasks)
    mock_handle_message.assert_not_called()  # <-- sanity check

    await asyncio.gather(*ret_tasks)
    mock_handle_message.assert_called_once_with(message)
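Why the first attempt saw zero calls can be shown in isolation: `asyncio.create_task` only schedules the coroutine, and it does not run until the current task yields to the loop. The names `consume_one` and `calls` below are hypothetical, chosen for the sketch.

```python
import asyncio

calls = []  # records handled messages (illustrative only)

async def handle_message(msg):
    calls.append(msg)

async def consume_one(msg):
    # schedules the task but never yields to the loop itself
    asyncio.create_task(handle_message(msg))

async def main():
    await consume_one("msg-1")
    before = list(calls)  # still empty: the task has not run

    pending = [
        t for t in asyncio.all_tasks() if t is not asyncio.current_task()
    ]
    await asyncio.gather(*pending)  # now the scheduled task runs
    return before, list(calls), len(pending)

before, after, n_pending = asyncio.run(main())
```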
testing the event loop
def main():
    loop = asyncio.get_event_loop()
    for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop))
        )
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        logging.info("Cleaning up")
        loop.close()

@pytest.fixture
def event_loop(event_loop, mocker):
    new_loop = asyncio.get_event_loop_policy().new_event_loop()
    asyncio.set_event_loop(new_loop)
    # keep the real close() around, and mock it so main() cannot close the loop
    new_loop._close = new_loop.close
    new_loop.close = mocker.Mock()

    yield new_loop

    new_loop._close()

def test_main(create_coro_mock, event_loop, mock_queue):
    mock_consume, _ = create_coro_mock("mayhem.consume")
    mock_publish, _ = create_coro_mock("mayhem.publish")
    mock_shutdown_gather, _ = create_coro_mock("mayhem.asyncio.gather")

    def _send_signal():
        time.sleep(0.1)  # let the loop start first
        os.kill(os.getpid(), signal.SIGTERM)

    thread = threading.Thread(target=_send_signal, daemon=True)
    thread.start()

    mayhem.main()
    # <--snip-->

def test_main(create_coro_mock, event_loop, mock_queue):
    # <--snip-->
    mayhem.main()

    assert signal.SIGTERM in event_loop._signal_handlers
    assert mayhem.handle_exception == event_loop.get_exception_handler()

    mock_consume.assert_called_once_with(mock_queue)
    mock_publish.assert_called_once_with(mock_queue)
    mock_shutdown_gather.assert_called_once_with()

    # asserting the loop is stopped but not closed
    assert not event_loop.is_running()
    assert not event_loop.is_closed()

    event_loop.close.assert_called_once_with()

# os.kill and _signal_handlers need signal numbers, not names
@pytest.mark.parametrize(
    "sig_to_test", (signal.SIGINT, signal.SIGTERM, signal.SIGHUP)
)
def test_main(sig_to_test, create_coro_mock, event_loop, mock_queue):
    mock_consume, _ = create_coro_mock("mayhem.consume")
    mock_publish, _ = create_coro_mock("mayhem.publish")
    mock_shutdown_gather, _ = create_coro_mock("mayhem.asyncio.gather")

    def _send_signal():
        time.sleep(0.1)
        os.kill(os.getpid(), sig_to_test)

    thread = threading.Thread(target=_send_signal, daemon=True)
    thread.start()

    mayhem.main()

    assert sig_to_test in event_loop._signal_handlers
    # <--snip-->
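The signal-from-a-thread trick can be tried on its own, outside pytest. This is a sketch under two assumptions: a Unix platform, and that the code runs in the main thread (`loop.add_signal_handler` requires both). `run_until_signal` is a hypothetical helper, not part of the talk's code.

```python
import asyncio
import os
import signal
import threading
import time

def run_until_signal(sig):
    """Run a fresh loop until `sig` arrives; return the signals seen."""
    loop = asyncio.new_event_loop()
    seen = []

    def _on_signal():
        seen.append(sig)
        loop.stop()

    loop.add_signal_handler(sig, _on_signal)

    def _send_signal():
        time.sleep(0.1)  # let the loop start first
        os.kill(os.getpid(), sig)

    threading.Thread(target=_send_signal, daemon=True).start()
    try:
        loop.run_forever()  # returns once _on_signal stops the loop
    finally:
        loop.remove_signal_handler(sig)
        loop.close()
    return seen

seen = [run_until_signal(s) for s in (signal.SIGTERM, signal.SIGHUP)]
```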
testing asyncio code

• pytest-asyncio + mocked coroutines
• asynctest for stdlib's unittest (for the former Java developers)
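For unittest-style tests, Python 3.8+ also ships `unittest.IsolatedAsyncioTestCase` in the stdlib. The sketch below uses that class rather than asynctest, with a simplified `Message` and `save` assumed for illustration.

```python
import asyncio
import unittest

class Message:
    def __init__(self):
        self.saved = False

async def save(msg):
    await asyncio.sleep(0)  # stand-in for i/o work
    msg.saved = True

class TestSave(unittest.IsolatedAsyncioTestCase):
    async def test_save(self):
        msg = Message()
        self.assertFalse(msg.saved)  # sanity check
        await save(msg)
        self.assertTrue(msg.saved)

suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestSave)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```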
debugging asyncio code
manual debugging
async def monitor_tasks():
    while True:
        tasks = [
            t for t in asyncio.all_tasks()
            if t is not asyncio.current_task()
        ]
        for t in tasks:
            t.print_stack()
        await asyncio.sleep(2)
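A one-shot variant that collects the stacks into a string instead of printing to stderr, which can be handy in a test or a log line. `dump_stacks` and `worker` are hypothetical names for this sketch; `Task.print_stack` and its `limit` and `file` parameters are the real API.

```python
import asyncio
import io

async def worker():
    await asyncio.sleep(1)

async def dump_stacks(limit=None):
    out = io.StringIO()
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task():
            task.print_stack(limit=limit, file=out)
    return out.getvalue()

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let worker start and suspend
    report = await dump_stacks(limit=5)
    task.cancel()
    return report

report = asyncio.run(main())
```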
Stack for <Task pending coro=<handle_message() running at [Link]> wait
File "[Link]", line 107, in handle_message
save_coro, restart_coro, return_exceptions=True
Stack for <Task pending coro=<handle_message() running at [Link]> wait
File "[Link]", line 107, in handle_message
save_coro, restart_coro, return_exceptions=True
Stack for <Task pending coro=<cleanup() running at [Link]> wait_for=<Fu
File "[Link]", line 78, in cleanup
await [Link]([Link]())
Stack for <Task pending coro=<consume() running at [Link]> wait_for=<F
File "[Link]", line 115, in consume
msg = await [Link]()
Stack for <Task pending coro=<restart_host() running at [Link]> wait_fo
File "[Link]", line 62, in restart_host
await [Link]([Link](1, 3))
async def monitor_tasks():
    while True:
        tasks = [
            t for t in asyncio.all_tasks()
            if t is not asyncio.current_task()
        ]
        for t in tasks:
            t.print_stack(limit=5)  # cap the number of frames printed
        await asyncio.sleep(2)
using debug mode
using debug mode: traceback context
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],830 ERROR: Task exception was never retrieved
future: <Task finished coro=<handle_message() done, defined at
[Link]> exception=Exception('Could not restart cattle-
[Link]') created at /Users/lynn/.pyenv/versions/3.7.2/lib/
python3.7/asyncio/[Link]>
source_traceback: Object created at (most recent call last):
File "[Link]", line 164, in <module>
main()
File "[Link]", line 157, in main
loop.run_forever()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.p
self._run_once()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.p
handle._run()
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/[Link]", l
self._context.run(self._callback, *self._args)
File "[Link]", line 117, in consume
asyncio.create_task(handle_message(msg))
File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/[Link]", li
return loop.create_task(coro)
Traceback (most recent call last):
File "[Link]", line 107, in handle_message
save_coro, restart_coro
File "[Link]", line 60, in restart_host
raise Exception(f"Could not restart {[Link]}")
Exception: Could not restart [Link]
using debug mode: thread safety
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],954 ERROR: Top-level exception occurred in callback while
processing a message
Traceback (most recent call last):
File "/Users/lynn/.pyenv/versions/pycon19/lib/python3.7/site-packages/
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py",
line 63, in _wrap_callback_errors
callback(message)
File "[Link]", line 174, in callback
loop.create_task(handle_message(pubsub_msg))
# <-- snip -->
RuntimeError: Non-thread-safe operation invoked on an event loop other
than the current one
using debug mode: slow coros
async def save(msg):
    # deliberately blocking call: stalls the whole event loop
    time.sleep(1 + random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")
$ PYTHONASYNCIODEBUG=1 python [Link]
[Link],781 INFO: Pulled Message(inst_name='cattle-okxa')
[Link],782 INFO: Extended deadline 3s for Message(inst_name='cattle-okxa')
[Link],416 INFO: Saved Message(inst_name='cattle-okxa') into database
[Link],417 WARNING: Executing <Task finished coro=<save() done, defined at
[Link]> result=None
created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/
[Link]> took 1.634 seconds
[Link],418 INFO: Pulled Message(instance_name='cattle-pmbv')
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    # float, in seconds
    loop.slow_callback_duration = 0.5

    # <-- snip -->
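The slow-callback warning can be reproduced in a few lines by enabling debug mode from code instead of the environment variable. `ListHandler` and `save_blocking` are hypothetical helpers for this sketch; the threshold of 0.05 seconds is arbitrary.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects formatted log messages so they can be inspected."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def save_blocking():
    time.sleep(0.1)  # blocks the loop for longer than the threshold

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # float, in seconds
    await asyncio.create_task(save_blocking())

handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio.run(main(), debug=True)  # same effect as PYTHONASYNCIODEBUG=1
asyncio_logger.removeHandler(handler)

slow = [m for m in handler.messages if "took" in m]
```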


debugging in production
# <-- snip -->
from aiodebug import log_slow_callbacks
from aiodebug import monitor_loop_lag
# <-- snip -->
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    # log callbacks slower than 0.05s, without full debug mode
    log_slow_callbacks.enable(0.05)
    # report event loop lag to statsd
    monitor_loop_lag.enable(my_statsd_client)
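To make "loop lag" concrete, here is a stdlib-only sketch of the idea: sleep for a known interval and measure how late the wake-up is. This is not aiodebug's implementation; `measure_loop_lag` and `blocker` are hypothetical names, and the intervals are arbitrary.

```python
import asyncio
import time

async def measure_loop_lag(interval=0.02, samples=5):
    loop = asyncio.get_running_loop()
    lags = []
    for _ in range(samples):
        start = loop.time()
        await asyncio.sleep(interval)
        # anything beyond `interval` is time the loop was busy elsewhere
        lags.append(max(0.0, loop.time() - start - interval))
    return lags

async def blocker():
    await asyncio.sleep(0.03)
    time.sleep(0.2)  # blocking call that stalls the loop

async def main():
    lags, _ = await asyncio.gather(measure_loop_lag(), blocker())
    return lags

lags = asyncio.run(main())
```

In production the measured values would go to a metrics client rather than a list.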
debugging asyncio code

• manual: task.print_stack()
• proper: built-in debug mode
• f-it, we’ll do it live: aiodebug
profiling asyncio code
cProfile
$ timeout -s INT 5s python -m cProfile -s tottime [Link]
ncalls tottime percall ... filename:lineno(function)
134 4.785 0.036 ... {method 'control' of 'select.kqueue' objects}
17 0.007 0.000 ... {built-in method _imp.create_dynamic}
62 0.007 0.000 ... {built-in method [Link]}
132 0.003 0.000 ... base_events.py:1679(_run_once)
217/216 0.003 0.000 ... {built-in method builtins.__build_class__}
361 0.003 0.000 ... {built-in method [Link]}
62 0.002 0.000 ... <frozen importlib._bootstrap_external>:914(ge
42 0.002 0.000 ... {built-in method [Link]}
195 0.001 0.000 ... <frozen importlib._bootstrap_external>:1356(f
50 0.001 0.000 ... {method 'write' of '_io.TextIOWrapper' object
122 0.001 0.000 ... _make.py:1217(__repr__)
50 0.001 0.000 ... __init__.py:293(__init__)
18 0.001 0.000 ... [Link](__new__)
72/15 0.001 0.000 ... sre_parse.py:475(_parse)
62 0.001 0.000 ... {method 'read' of '_io.FileIO' objects}
$ timeout -s INT 5s python -m cProfile -s filename [Link]
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 4.704 4.704 [Link](main)
18 0.000 0.000 0.002 0.000 [Link](restart_host)
22 0.000 0.000 0.003 0.000 [Link](save)
33 0.000 0.000 0.003 0.000 [Link](cleanup)
22 0.000 0.000 0.002 0.000 [Link](extend)
11 0.000 0.000 0.000 0.000 [Link](handle_results)
12 0.000 0.000 0.002 0.000 [Link](publish)
22 0.000 0.000 0.002 0.000 [Link](handle_message)
12 0.000 0.000 0.003 0.000 [Link](consume)
11 0.000 0.000 0.000 0.000 [Link](__attrs_post_init_
1 0.000 0.000 0.000 0.000 [Link](Message)
1 0.000 0.000 0.000 0.000 [Link](<listcomp>)
2 0.000 0.000 0.001 0.000 [Link](shutdown)
1 0.000 0.000 0.000 0.000 [Link](<lambda>)
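The same profile can be collected from inside Python with `cProfile.Profile` and `pstats`, which avoids the `timeout` wrapper when the program ends on its own. `crunch` is a made-up CPU-bound coroutine for this sketch.

```python
import asyncio
import cProfile
import io
import pstats

async def crunch(n):
    await asyncio.sleep(0)
    return sum(i * i for i in range(n))

async def main():
    return await asyncio.gather(*(crunch(20_000) for _ in range(5)))

def profile_main():
    profiler = cProfile.Profile()
    profiler.enable()
    asyncio.run(main())
    profiler.disable()

    out = io.StringIO()
    stats = pstats.Stats(profiler, stream=out)
    stats.sort_stats("tottime").print_stats()  # same ordering as -s tottime
    return out.getvalue()

report = profile_main()
```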
cProfile with (K|Q)CacheGrind
$ timeout -s INT 5s python -m cProfile -o [Link] [Link]
$ pyprof2calltree --kcachegrind -i [Link]
line_profiler
@profile
async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]

Timer unit: 1e-06 s

Total time: 0.002202 s
File: [Link]
Function: save at line 69

Line #   Hits     Time  Per Hit  % Time  Line Contents
=======================================================
    69                                   @profile
    70                                   async def save(msg):
    71      8    259.0     32.4    11.8      await [Link]([Link]
    72      8     26.0      3.2     1.2      [Link] = True
    73      8   1917.0    239.6    87.1      [Link](f"Saved {msg} into
import aiologger

logger = [Link].with_default_handlers()
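The slide swaps in the third-party aiologger package. As a different technique that needs only the standard library, the sketch below moves handler I/O off the event loop thread with `logging.handlers.QueueHandler` and `QueueListener`. It is an illustration under stated assumptions, not the talk's approach: `ListHandler`, `setup_queue_logging`, the logger name, and this simplified `save` are all hypothetical.

```python
import asyncio
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical handler that collects messages in memory for the demo."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def setup_queue_logging(name, target_handler):
    """Route a logger through a queue so handlers run in a listener thread."""
    log_queue = queue.Queue(-1)  # unbounded queue
    queue_handler = logging.handlers.QueueHandler(log_queue)
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(queue_handler)
    listener = logging.handlers.QueueListener(log_queue, target_handler)
    listener.start()
    return logger, queue_handler, listener


async def save(msg_id, logger):
    await asyncio.sleep(0)  # placeholder for real I/O
    # On the event loop thread this only enqueues the record.
    logger.info("Saved %s into database", msg_id)


def main():
    target = ListHandler()
    logger, queue_handler, listener = setup_queue_logging("mayhem_sketch", target)
    try:
        asyncio.run(save("abc", logger))
    finally:
        listener.stop()  # drains the queue, then joins the listener thread
        logger.removeHandler(queue_handler)
    return target.messages
```

The logging call stays synchronous here, so `save` does not `await` it; the potentially slow handler work happens in the listener's thread instead.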
line_profiler
$ timeout -s INT 5s kernprof -o [Link] --line-by-line [Link]
$ python -m line_profiler [Link]

Timer unit: 1e-06 s

Total time: 0.0011 s
File: [Link]
Function: save at line 69

Line #   Hits     Time  Per Hit  % Time  Line Contents
=======================================================
    69                                   @profile
    70                                   async def save(msg):
    71      7    269.0     38.4    24.5      await [Link]([Link]
    72      5     23.0      4.6     2.1      [Link] = True
    73      5    808.0    161.6    73.5      await [Link](f"Saved {msg}
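The derived columns in these reports follow directly from `Hits` and `Time`: `Per Hit` is `Time / Hits` and `% Time` is each line's share of the function total. A small sketch that recomputes them from the numbers shown in the two runs (the `summarize` helper is hypothetical, written only for this check):

```python
def summarize(rows):
    """Recompute Per Hit and % Time from (line, hits, time_us) rows."""
    total = sum(time_us for _, _, time_us in rows)
    return [
        (line, round(time_us / hits, 1), round(100 * time_us / total, 1))
        for line, hits, time_us in rows
    ]


# (line number, hits, time in microseconds), copied from the two reports.
with_logging = [(71, 8, 259.0), (72, 8, 26.0), (73, 8, 1917.0)]
with_aiologger = [(71, 7, 269.0), (72, 5, 23.0), (73, 5, 808.0)]

before = summarize(with_logging)
after = summarize(with_aiologger)
```

The line totals also add up to the reported `Total time` (2202 µs and 1100 µs). With only five to eight hits per line in a 5 second run, the per-hit drop on line 73 from 239.6 µs to 161.6 µs is best read as indicative rather than as a careful benchmark.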
profiling asyncio code
live profiling
$ profiling live-profile --mono [Link]
profiling asyncio code

• No idea? Visualize cProfile output with pyprof2calltree + KCacheGrind
• Some idea? line_profiler
• f-it, we’ll do it live: profiling live-profile
Thank you!
[Link]/adv-aio

Lynn Root | SRE | @roguelynn