Skip to content

Conversation

@lukesneeringer
Copy link
Contributor

@lukesneeringer lukesneeringer commented Oct 26, 2017

This causes the .open method on subscriptions to return a future, which can be used to block the main thread or trap exceptions.

Fixes #3888.
Fixes #4019.

@jonparrott A couple items we should nail down and possibly tweak before merging this:

  • The current interface has a three-object pass to start consuming messages: instantiate a client, instantiate a subscription, then call .open on the subscription, which now returns a future. However, we currently have some shortcut logic that lets you pass a callback on step 2. You still get the subscription back, but .open is called implicitly; there's no good place for me to return the future in that case.
    • This is kind of a big deal; this future is basically the way to both block and handle errata.
    • On the other hand, maybe this is fine; .future is a public property.
      • At the very minimum, I should update all documentation to show the full baton pass and highlight the future and what it is for.
  • There is not really any good way to stop consuming automatically. I can not do it in on_exception because that runs in one of the threads that the helper_threads.stop_all() method is trying to burn down, so it raises RuntimeError.
    • I could have an .add_done_callback call in Future.__init__ that adds a callback that does the right thing, but this feels like an interface violation to me.
    • Maybe this is fine too; we (a) leave it up to users to decide how to handle errors, albeit (b) also fix bugs like Pub/Sub Subscriber does not catch & retry UNAVAILABLE errors #4234 so they are far less common.

Holding off on updating tests and docs until we make a decision.

This causes the `.open` method on subscriptions to return a future,
which can be used to block the main thread or trap exceptions.
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Oct 26, 2017
@theacodes
Copy link
Contributor

On the other hand, maybe this is fine; .future is a public property.

Seems fine:

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subcription = subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    subscription.future.result()

There is not really any good way to stop consuming automatically.

doesn't .close() cover this? Or am I missing something?

@lukesneeringer
Copy link
Contributor Author

doesn't .close() cover this? Or am I missing something?

It does, but if you call it in .on_exception, you get a RuntimeError because a thread tries to join itself.


# These objects are all communicated between threads; ensure that
# any writes to them are atomic.
# import pdb ; pdb.set_trace()

This comment was marked as spam.

This comment was marked as spam.

@theacodes
Copy link
Contributor

Can we have close() send a signal to exit if it's being invoked from the "wrong" thread?

@lukesneeringer
Copy link
Contributor Author

Maybe.

@lukesneeringer
Copy link
Contributor Author

@jonparrott This is ready for review. I am going to punt on .close() for now because I think it is more valuable to push this as is and spend cycles on a couple of the other bugs. I am also not 100% sure that .close() is even the right behavior.

@theacodes
Copy link
Contributor

@lukesneeringer totally fair

Copy link
Contributor

@theacodes theacodes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks mostly fine, but some concerns about making the future public.

return True

def done(self):
"""Return True if the publish has completed, False otherwise.

This comment was marked as spam.

return self._result
raise err

def exception(self, timeout=None, _wait=1):

This comment was marked as spam.

This comment was marked as spam.

when the future finishes running.
"""
if self.done():
fn(self)

This comment was marked as spam.

"""Set the result of the future to the provided result.
Args:
result (str): The message ID.

This comment was marked as spam.

@@ -0,0 +1,176 @@
# Copyright 2017, Google Inc. All rights reserved.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

# The publishing-side subclass does not need any special behavior
# at this time.
#
# However, there is still a subclass so that if someone attempts

This comment was marked as spam.

@lukesneeringer lukesneeringer merged commit f383b8c into googleapis:master Oct 26, 2017
@lukesneeringer lukesneeringer deleted the subscribe-future branch October 26, 2017 22:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cla: yes This human has signed the Contributor License Agreement.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Publish future blocks indefinitely if API raises exception Pub/Sub has no way to track errors from the subscriber thread.

4 participants