-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add futures for subscriptions. #4265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add futures for subscriptions. #4265
Conversation
This causes the `.open` method on subscriptions to return a future, which can be used to block the main thread or trap exceptions.
Seems fine: subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print('Received message: {}'.format(message))
message.ack()
subcription = subscriber.subscribe(subscription_path, callback=callback)
# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
subscription.future.result()
doesn't |
It does, but if you call it in |
|
|
||
| # These objects are all communicated between threads; ensure that | ||
| # any writes to them are atomic. | ||
| # import pdb ; pdb.set_trace() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
Can we have close() send a signal to exit if it's being invoked from the "wrong" thread? |
|
Maybe. |
|
@jonparrott This is ready for review. I am going to punt on |
|
@lukesneeringer totally fair |
theacodes
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks mostly fine, but some concerns about making the future public.
| return True | ||
|
|
||
| def done(self): | ||
| """Return True if the publish has completed, False otherwise. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| return self._result | ||
| raise err | ||
|
|
||
| def exception(self, timeout=None, _wait=1): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| when the future finishes running. | ||
| """ | ||
| if self.done(): | ||
| fn(self) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| """Set the result of the future to the provided result. | ||
| Args: | ||
| result (str): The message ID. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| @@ -0,0 +1,176 @@ | |||
| # Copyright 2017, Google Inc. All rights reserved. | |||
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # The publishing-side subclass does not need any special behavior | ||
| # at this time. | ||
| # | ||
| # However, there is still a subclass so that if someone attempts |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This causes the
.openmethod on subscriptions to return a future, which can be used to block the main thread or trap exceptions.Fixes #3888.
Fixes #4019.
@jonparrott A couple items we should nail down and possibly tweak before merging this:
.openon the subscription, which now returns a future. However, we currently have some shortcut logic that lets you pass acallbackon step 2. You still get the subscription back, but.openis called implicitly; there's no good place for me to return the future in that case..futureis a public property.on_exceptionbecause that runs in one of the threads that thehelper_threads.stop_all()method is trying to burn down, so it raisesRuntimeError..add_done_callbackcall inFuture.__init__that adds a callback that does the right thing, but this feels like an interface violation to me.Holding off on updating tests and docs until we make a decision.