Skip to content

Conversation

@lukesneeringer
Copy link
Contributor

@lukesneeringer lukesneeringer commented Jul 20, 2017

Uses #3463 as a base.

This PR implements the Pub/Sub subscriber, which by default uses threading.

The following is yet to be implemented:

  • Automatic pause/resume based on flow control settings.
  • Tests of any kind (working on them in parallel with the Pub/Sub team's review)
  • API Documentation

@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Jul 20, 2017
Args:
ack_id (str): The ack ID.
"""
request = types.StreamingPullRequest(ack_ids=[ack_id])

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

time as there are).
"""
return self._client.api.streaming_pull(request_generator,
options=gax.CallOptions(timeout=600),

This comment was marked as spam.

This comment was marked as spam.

time.sleep(snooze)
self.maintain_leases()

def modify_ack_deadline(self, ack_id, seconds):

This comment was marked as spam.

pubsub/.flake8 Outdated
*.pyc,
conf.py

ignore =

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

flow_control (~.pubsub_v1.types.FlowControl): The flow control
settings. Use this to prevent situations where you are
inundated with too many messages at once.
policy_class (class): A class that describes how to handle

This comment was marked as spam.

~.pubsub_v1.subscriber.consumer.base.BaseConsumer: An instance
of the defined ``consumer_class`` on the client.
"""
subscr = self._policy_class(self, subscription)

This comment was marked as spam.

This comment was marked as spam.


import six

__all__ = (

This comment was marked as spam.

are free to use a different formula.
The precision of data stored is to the nearest integer. Additionally,
values outside the range of ``10 <= x <= 600`` are stored as ``10`` or

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

publish_time (datetime): The time that this message was originally
published.
"""
def __init__(self, policy, ack_id, message):

This comment was marked as spam.

# Actually start consuming messages.
self._consumer.start_consuming()

# Spawn a helper thread that maintains all of the leases for

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

logger = logging.getLogger(__name__)


class Policy(base.BasePolicy):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@@ -0,0 +1,121 @@
# Copyright 2017, Google Inc. All rights reserved.

This comment was marked as spam.

@@ -0,0 +1,155 @@
# Copyright 2017, Google Inc. All rights reserved.

This comment was marked as spam.

This was referenced Aug 21, 2017
# This doesn't presently deal with exceptions that bubble up
# through the callback. If there is an error here, the thread will
# exit and no further queue items will be processed. We could
# potentially capture errors, log them, and then continue on.

This comment was marked as spam.

This comment was marked as spam.

@@ -0,0 +1,266 @@
# Copyright 2017, Google Inc. All rights reserved.

This comment was marked as spam.

This comment was marked as spam.

abbv_data = abbv_data[0:50] + b'...'

# Return a useful representation.
answer = 'Message {\n'

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

receive any given message more than once.
"""
time_to_ack = math.ceil(time.time() - self._received_timestamp)
self._request_queue.put(('ack', {

This comment was marked as spam.

snooze = random.uniform(0.0, p99 * 0.9)
logger.debug('Snoozing lease management for %f seconds.' % snooze)
time.sleep(snooze)
self.maintain_leases()

This comment was marked as spam.

# If we do, we need to stop the stream.
if self._load >= 1.0:
self._paused = True
self.close()

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._paused and self._load < self.flow_control.resume_threshold:

This comment was marked as spam.

@lukesneeringer
Copy link
Contributor Author

Merging this is blocked by CircleCI being hosed. Will try tomorrow morning.

@lukesneeringer
Copy link
Contributor Author

Nevermind -- Pub/Sub passed. It failed on Spanner. Merging now.

@lukesneeringer lukesneeringer merged commit 6fe1309 into pubsub-publisher Aug 24, 2017
@lukesneeringer lukesneeringer deleted the pubsub-subscriber branch August 24, 2017 03:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cla: yes This human has signed the Contributor License Agreement.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants