
PubSub: Unexpected behavior for 2 or more subscribes with the same scheduler #11

@LukasSlouka

Description


Environment details

OS: Ubuntu 18.04.3 LTS
Python version: 3.7.4
google-cloud-pubsub version: 1.0.2

Steps to reproduce

  1. Set up a GCP project with Pub/Sub enabled and configure the local environment so that the scripts can connect to it.
  2. Run server.py (code below). It creates two topics (topic_a and topic_b) and a subscription on each (topic_a_sub and topic_b_sub), then starts listening on both subscriptions. Note that the same scheduler instance is passed to receive_messages for both subscriptions (the documentation gives no indication that this should cause any trouble).
  3. In a new terminal window, run python3.7 client.py topic_a & and python3.7 client.py topic_b &. This starts publishing messages to both topics (code below).
  4. The server prints details of every message published to both topics. Check the unacked messages on both subscriptions in the GCP console or Stackdriver: some messages are still in the subscriptions (which ones seems fairly random). Trying to pull them returns nothing.
  5. Terminate the running server.
  6. Pull messages from either subscription: you can now retrieve all of the remaining ones. These messages have already been processed by the server, yet it is as if the ack had no effect.
  7. If you restart the server, it processes some of these messages again, and some of them may finally be acked and removed from the subscription.
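Steps 4 and 6 can be checked with a small helper like the one below. This is a minimal sketch assuming the google-cloud-pubsub 1.x synchronous SubscriberClient.pull(subscription, max_messages=...) call; pull_backlog and its optional subscriber argument are hypothetical names added only for illustration (the argument lets the helper run against a stand-in client). It deliberately does not ack anything.

```python
def pull_backlog(project_id, subscription_name, max_messages=100, subscriber=None):
    """Synchronously pull (without acking) messages still left in a subscription.

    `subscriber` is a hypothetical injection point; by default a real
    google-cloud-pubsub 1.x SubscriberClient is created.
    """
    if subscriber is None:
        # Imported lazily so the helper can be exercised without the library.
        from google.cloud import pubsub_v1
        subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    # A single synchronous pull; it may return fewer than max_messages
    # even when more messages are outstanding.
    response = subscriber.pull(subscription_path, max_messages=max_messages)

    # Nothing is acked here, so these messages are redelivered
    # once their ack deadline expires.
    return [received.message.data for received in response.received_messages]
```

For example, after stopping the server, pull_backlog(os.getenv("PUBSUB_PROJECT_ID"), 'topic_a_sub') returns the payloads of the messages that were processed but apparently never acked. While the server is still running, those messages are leased to it, which is consistent with the empty pulls described in step 4.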

Throughout, no errors are logged and no exceptions are raised; messages simply stay in the subscription even after being acked. If the server code is changed to instantiate a new ThreadScheduler inside the receive_messages function, everything works as expected, so the shared scheduler is the problem.
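A minimal sketch of that workaround, assuming the google-cloud-pubsub 1.x API used in server.py: every call to receive_messages builds its own ThreadScheduler backed by its own thread pool, so no scheduler is shared between subscriptions. The make_thread_scheduler helper and the subscriber/scheduler_factory parameters are hypothetical additions (not part of the library) that exist only so the function can be exercised with stand-in objects. It also returns the StreamingPullFuture from subscribe() so the caller can cancel the subscription.

```python
import concurrent.futures


def make_thread_scheduler(max_workers=10):
    """Build a brand-new ThreadScheduler backed by its own thread pool."""
    from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    return ThreadScheduler(executor)


def receive_messages(project_id, subscription_name,
                     subscriber=None, scheduler_factory=make_thread_scheduler):
    """Receive messages using a scheduler that no other subscription uses."""
    if subscriber is None:
        # Imported lazily so the function can be exercised without the library.
        from google.cloud import pubsub_v1
        subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        message.ack()

    # A fresh scheduler per call, i.e. per subscription; never reused elsewhere.
    scheduler = scheduler_factory()
    future = subscriber.subscribe(subscription_path, callback=callback, scheduler=scheduler)
    print('Listening for messages on {}'.format(subscription_path))
    return future
```

With this version, server.py would call receive_messages(project_id, 'topic_a_sub') and receive_messages(project_id, 'topic_b_sub') without creating a module-level scheduler, and keep the existing sleep loop.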

Code example

server.py

import concurrent.futures.thread
import os
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler


def create_subscription(project_id, topic_name, subscription_name):
    """Create a new pull subscription on the given topic."""
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path)

    print('Subscription created: {}'.format(subscription))


def receive_messages(project_id, subscription_name, t_scheduler):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback, scheduler=t_scheduler)
    print('Listening for messages on {}'.format(subscription_path))


project_id = os.getenv("PUBSUB_PROJECT_ID")

publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project_id)

# Create both topics
try:
    topics = [topic.name.split('/')[-1] for topic in publisher.list_topics(project_path)]
    if 'topic_a' not in topics:
        publisher.create_topic(publisher.topic_path(project_id, 'topic_a'))
    if 'topic_b' not in topics:
        publisher.create_topic(publisher.topic_path(project_id, 'topic_b'))
except AlreadyExists:
    print('Topics already exist')

# Create subscriptions on both topics
sub_client = pubsub_v1.SubscriberClient()
project_path = sub_client.project_path(project_id)

try:
    subs = [sub.name.split('/')[-1] for sub in sub_client.list_subscriptions(project_path)]
    if 'topic_a_sub' not in subs:
        create_subscription(project_id, 'topic_a', 'topic_a_sub')
    if 'topic_b_sub' not in subs:
        create_subscription(project_id, 'topic_b', 'topic_b_sub')
except AlreadyExists:
    print('Subscriptions already exist')

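# One ThreadScheduler (backed by a 10-thread pool) shared by BOTH subscriptions;
# this sharing is what triggers the lost-ack behavior described above.
scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))

receive_messages(project_id, 'topic_a_sub', scheduler)
receive_messages(project_id, 'topic_b_sub', scheduler)

# Keep the main thread alive; messages are handled on background threads.
while True:
    time.sleep(60)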

client.py

import datetime
import os
import random
import sys
from time import sleep

from google.cloud import pubsub_v1


def publish_messages(pid, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(pid, topic_name)

    for n in range(1, 10):
        data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)
        sleep(random.randint(10, 50) / 10.0)


project_id = os.getenv("PUBSUB_PROJECT_ID")
publish_messages(project_id, sys.argv[1])

I first asked about this on Stack Overflow, then did my own testing.

I would expect sharing a scheduler to make no difference in behavior or, at the very least, to raise an exception or warning indicating that something is wrong.

It would also be a good idea to warn users somewhere in the library documentation not to share a scheduler between multiple subscriptions (even subscriptions to different topics). Because nothing obviously indicates a problem, this mistake can easily make it into production.

Metadata

Labels

api: pubsub (Issues related to the googleapis/python-pubsub API.), type: docs (Improvement to the documentation for an API.)