@pytest.mark.parametrize("transport", ["grpc", "rest"])
def test_subscriber_not_leaking_open_sockets(
publisher, topic_path_base, subscription_path_base, cleanup, transport
):
# System test: create a topic and a subscription, publish three messages,
# pull them synchronously, and record the process's connection count
# beforehand so that leaked sockets can be detected.
# NOTE(review): the statements that consume `response` and
# `conn_count_start` are not visible in this excerpt -- confirm against the
# full test before relying on this description.
#
# Make sure the topic and the subscription get deleted.
# NOTE: Since subscriber client will be closed in the test, we should not
# use the shared `subscriber` fixture, but instead construct a new client
# in this test.
# Also, since the client will get closed, we need another subscriber client
# to clean up the subscription. We also need to make sure that auxiliary
# subscriber releases the sockets, too.
custom_str = "-not-leaking-open-sockets"
subscription_path = subscription_path_base + custom_str
topic_path = topic_path_base + custom_str
# NOTE(review): both subscriber clients are created with transport="grpc"
# even though the test is parametrized over `transport`; confirm that the
# parameter is only meant to reach fixtures such as `publisher`.
subscriber = pubsub_v1.SubscriberClient(transport="grpc")
subscriber_2 = pubsub_v1.SubscriberClient(transport="grpc")
# Each cleanup entry is a (callable, args, kwargs) tuple; presumably the
# `cleanup` fixture invokes them on teardown -- verify against the fixture.
# The auxiliary client deletes the subscription and is then closed itself.
cleanup.append(
(subscriber_2.delete_subscription, (), {"subscription": subscription_path})
)
cleanup.append((subscriber_2.close, (), {}))
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
# Create topic before starting to track connection count (any sockets opened
# by the publisher client are not counted by this test).
publisher.create_topic(name=topic_path)
# Baseline: number of connections held by this process before the
# subscriber under test does any work.
current_process = psutil.Process()
conn_count_start = len(current_process.connections())
# Publish a few messages, then synchronously pull them and check that
# no sockets are leaked.
# Per the NOTE above, `subscriber` is closed by this test; presumably that
# happens when this `with` block exits -- TODO confirm.
with subscriber:
subscriber.create_subscription(name=subscription_path, topic=topic_path)
# Publish a few messages, wait for the publish to succeed.
publish_futures = [
publisher.publish(topic_path, "message {}".format(i).encode())
for i in range(1, 4)
]
# Block until every publish has completed; result() raises on failure.
for future in publish_futures:
future.result()
# Synchronously pull messages.
# NOTE(review): a single pull is not guaranteed to return `max_messages`
# messages; a reported failure of this test saw an empty response with
# transport="rest". Consider retrying the pull -- TODO confirm.
response = subscriber.pull(subscription=subscription_path, max_messages=3)
This test failed!
To configure my behavior, see the Flaky Bot documentation.
If I'm commenting on this issue too often, add the `flakybot: quiet` label and
I will stop commenting.
commit: 4ec597c
buildURL: Build Status, Sponge
status: failed
Test output
publisher = topic_path_base = 'projects/precise-truck-742/topics/t-1683693650198' subscription_path_base = 'projects/precise-truck-742/subscriptions/s-1683693650201' cleanup = [(>, (), {'topic': 'projects/precise-truck-742/topics/t-1683693650198-not-leaking-open-sockets'})] transport = 'rest'
E assert 0 == 3
E + where 0 = len([])
E + where [] = .received_messages
tests/system.py:485: AssertionError