def test_receive_with_blocking_shutdown(
publisher_client, topic, subscription_async, capsys
):
_publish_messages(publisher_client, topic, message_num=3)
subscriber.receive_messages_with_blocking_shutdown(
PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0
)
out, _ = capsys.readouterr()
out_lines = out.splitlines()
msg_received_lines = [
i for i, line in enumerate(out_lines)
if re.search(r".*received.*message.*", line, flags=re.IGNORECASE)
]
msg_done_lines = [
i for i, line in enumerate(out_lines)
if re.search(r".*done processing.*message.*", line, flags=re.IGNORECASE)
]
stream_canceled_lines = [
i for i, line in enumerate(out_lines)
if re.search(r".*streaming pull future canceled.*", line, flags=re.IGNORECASE)
]
shutdown_done_waiting_lines = [
i for i, line in enumerate(out_lines)
if re.search(r".*done waiting.*stream shutdown.*", line, flags=re.IGNORECASE)
]
try:
assert "Listening" in out
assert subscription_async in out
assert len(stream_canceled_lines) == 1
assert len(shutdown_done_waiting_lines) == 1
Note: #441 was also for this test, but it was closed more than 10 days ago, so I didn't mark it flaky.
commit: 2f180df
buildURL: Build Status, Sponge
status: failed
Test output