-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Pubsub subscriber #3637
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pubsub subscriber #3637
Conversation
| Args: | ||
| ack_id (str): The ack ID. | ||
| """ | ||
| request = types.StreamingPullRequest(ack_ids=[ack_id]) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| time as there are). | ||
| """ | ||
| return self._client.api.streaming_pull(request_generator, | ||
| options=gax.CallOptions(timeout=600), |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| time.sleep(snooze) | ||
| self.maintain_leases() | ||
|
|
||
| def modify_ack_deadline(self, ack_id, seconds): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pubsub/.flake8
Outdated
| *.pyc, | ||
| conf.py | ||
|
|
||
| ignore = |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| flow_control (~.pubsub_v1.types.FlowControl): The flow control | ||
| settings. Use this to prevent situations where you are | ||
| inundated with too many messages at once. | ||
| policy_class (class): A class that describes how to handle |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| ~.pubsub_v1.subscriber.consumer.base.BaseConsumer: An instance | ||
| of the defined ``consumer_class`` on the client. | ||
| """ | ||
| subscr = self._policy_class(self, subscription) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| import six | ||
|
|
||
| __all__ = ( |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| are free to use a different formula. | ||
| The precision of data stored is to the nearest integer. Additionally, | ||
| values outside the range of ``10 <= x <= 600`` are stored as ``10`` or |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| publish_time (datetime): The time that this message was originally | ||
| published. | ||
| """ | ||
| def __init__(self, policy, ack_id, message): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # Actually start consuming messages. | ||
| self._consumer.start_consuming() | ||
|
|
||
| # Spawn a helper thread that maintains all of the leases for |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Policy(base.BasePolicy): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| @@ -0,0 +1,121 @@ | |||
| # Copyright 2017, Google Inc. All rights reserved. | |||
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| @@ -0,0 +1,155 @@ | |||
| # Copyright 2017, Google Inc. All rights reserved. | |||
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
It is still in nox.py in the cover session.
…ogle-cloud-python into pubsub-subscriber
| # This doesn't presently deal with exceptions that bubble up | ||
| # through the callback. If there is an error here, the thread will | ||
| # exit and no further queue items will be processed. We could | ||
| # potentially capture errors, log them, and then continue on. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| @@ -0,0 +1,266 @@ | |||
| # Copyright 2017, Google Inc. All rights reserved. | |||
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| abbv_data = abbv_data[0:50] + b'...' | ||
|
|
||
| # Return a useful representation. | ||
| answer = 'Message {\n' |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| receive any given message more than once. | ||
| """ | ||
| time_to_ack = math.ceil(time.time() - self._received_timestamp) | ||
| self._request_queue.put(('ack', { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| snooze = random.uniform(0.0, p99 * 0.9) | ||
| logger.debug('Snoozing lease management for %f seconds.' % snooze) | ||
| time.sleep(snooze) | ||
| self.maintain_leases() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # If we do, we need to stop the stream. | ||
| if self._load >= 1.0: | ||
| self._paused = True | ||
| self.close() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| # In order to not thrash too much, require us to have passed below | ||
| # the resume threshold (80% by default) of each flow control setting | ||
| # before restarting. | ||
| if self._paused and self._load < self.flow_control.resume_threshold: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
Merging this is blocked by CircleCI being hosed. Will try tomorrow morning. |
|
Nevermind -- Pub/Sub passed. It failed on Spanner. Merging now. |
Uses #3463 as a base.
This PR implements the Pub/Sub subscriber, which by default uses threading.
The following is yet to be implemented: