Publishing sometimes fails when ordering key is enabled #1084
Environment details
- OS type and version: Debian Bookworm (Docker slim image)
- Python version: 3.9.18
- pip version: 23.1
- google-cloud-pubsub version: 2.18.4
- grpcio: 1.59.0
Description
Hello there,
We’ve been running into publishing issues for a while now and have run out of ideas on how to fix them. We publish messages with ordering keys to a few topics. Several times a week, sometimes several times a day, publishing fails and the publisher client appears to get stuck: no further messages are published. Cloud Monitoring usually reports a ‘deadline exceeded’ error at those moments, and we do see timeouts then, but no exception is raised on the client side. The timeouts are surprising because our load is light: just a few messages per second.
After (!) reading the documentation, we understand that this behaviour is expected with ordering keys: when a publish with an ordering key fails, the client pauses publishing for that key until publishing is resumed.
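As a rough mental model of that documented behaviour, here is a simplified sketch. It is not the client library's implementation, and the class and exception names are hypothetical. Once a publish for an ordering key fails, the key is paused, later publishes with that key are rejected, and only an explicit resume lets them through again. Other keys keep flowing.

```python
class PublishFailed(Exception):
    """Hypothetical: stands in for an unrecoverable publish error."""


class KeyPaused(Exception):
    """Hypothetical: raised when publishing to a paused ordering key."""


class OrderingKeyModel:
    """Toy model of per-key pause/resume semantics (illustration only)."""

    def __init__(self):
        self.paused = set()  # ordering keys currently paused
        self.sent = []  # (key, data) pairs that were "published"

    def publish(self, key, data, fail=False):
        if key in self.paused:
            raise KeyPaused(key)
        if fail:
            # An unrecoverable error pauses this key only.
            self.paused.add(key)
            raise PublishFailed(key)
        self.sent.append((key, data))

    def resume(self, key):
        self.paused.discard(key)


model = OrderingKeyModel()
model.publish("key1", b"m1")
try:
    model.publish("key1", b"m2", fail=True)  # simulated deadline exceeded
except PublishFailed:
    pass
try:
    model.publish("key1", b"m3")  # rejected: key1 is paused
except KeyPaused:
    pass
model.publish("key2", b"m4")  # other keys are unaffected
model.resume("key1")
model.publish("key1", b"m5")  # accepted again after resume
print(model.sent)  # [('key1', b'm1'), ('key2', b'm4'), ('key1', b'm5')]
```

In this model, a key that is never resumed stays paused forever, which would look exactly like a publisher that is "stuck" for that key.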
Code example
```python
import functools

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    BatchSettings,
    LimitExceededBehavior,
    PublishFlowControl,
)

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(
    enable_message_ordering=True,
    flow_control=PublishFlowControl(
        message_limit=2000,
        limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
    ),
)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
batch_settings = BatchSettings(
    max_bytes=1 * 1000 * 1000,  # 1 MB
    max_latency=0.01,  # 10 ms
    max_messages=1000,
)
publisher = pubsub_v1.PublisherClient(
    batch_settings,
    publisher_options=publisher_options,
    client_options=client_options,
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)


def resume_if_error(ordering_key, future):
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)
        print(f"Resumed publishing messages with ordering keys to {topic_path}.")


for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    # add_done_callback passes only the future, so bind this message's key now.
    future.add_done_callback(functools.partial(resume_if_error, ordering_key))
```

We wanted to avoid calling `result()` in the main thread because it blocks, so we use a callback instead. However, this does not help: we still see the same issues.
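A non-blocking callback of that kind can be sketched with only the standard library. `add_done_callback` hands the callback nothing but the future, so the ordering key has to be bound when the callback is registered (here with `functools.partial`). A callback that instead reads the loop variable sees whichever key the loop assigned last. The sketch also catches `Exception` rather than only `RuntimeError`. This assumes a failed publish surfaces as an API error such as `google.api_core.exceptions.DeadlineExceeded`, which is not a `RuntimeError` subclass. `fake_resume` is a hypothetical stand-in for `publisher.resume_publish(topic_path, ordering_key)`, and the futures are completed by hand rather than by a real publisher.

```python
import functools
from concurrent.futures import Future

resumed_keys = []  # records which keys the callback asked to resume


def fake_resume(ordering_key):
    # Hypothetical stand-in for publisher.resume_publish(topic_path, ordering_key).
    resumed_keys.append(ordering_key)


def resume_if_error(ordering_key, future):
    # partial() supplies ordering_key; add_done_callback supplies the future.
    try:
        print(future.result())
    except Exception as exc:  # deliberately broader than RuntimeError
        print(f"publish failed for {ordering_key!r}: {exc!r}")
        fake_resume(ordering_key)


futures = {}
for data, ordering_key in [("message1", "key1"), ("message2", "key2")]:
    future = Future()  # stands in for the future returned by publisher.publish(...)
    future.add_done_callback(functools.partial(resume_if_error, ordering_key))
    futures[data] = future

# Complete the futures by hand: key1 fails, key2 succeeds.
futures["message1"].set_exception(TimeoutError("simulated deadline exceeded"))
futures["message2"].set_result("message-id-2")
print(resumed_keys)  # ['key1']
```

With a real publisher, the done callback runs on a background thread once the batch completes, so it keeps the main thread free just as intended.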
Can anyone help?