Add async Writer to support pub&mpub#25
Conversation
nsq/Writer.py
Outdated
There was a problem hiding this comment.
i dont think we need this for writing
There was a problem hiding this comment.
Thanks for adding the tests, this stray line is the last item
There was a problem hiding this comment.
-
conn.id = conn_id -
conn.last_recv_timestamp = time.time() -
conn.last_msg_timestamp = time.time()
Do you mean check_last_recv_timestamps etc?
There was a problem hiding this comment.
I meant last_msg_timestamp, it's not used here.
There was a problem hiding this comment.
Updated, please review it, thanks!
|
I had started writing some code in #11, can you port over the low-level tests I added there to this pull request (and I'll close #11). I think eventually (not now) this can benefit from some re-structuring for the shared code in Appreciate the contribution, nice work. |
|
Yes, there are some duplicated code now, I just want to implement this functional at first and I will do my best to do more contribution later. |
|
Looks good, you mind squashing down to two commits (one for JSON fix, one for everything else)? |
|
It's ok for me,thanks! |
|
Squashing is done, it's ready for merging, thanks! |
nsq/Writer.py
Outdated
There was a problem hiding this comment.
I just noticed this... conn.last_pub(1) should be conn.last_pub[1], but I have an alternative suggestion.
Even though we have async heartbeat messages that the Writer has to handle... every PUB or MPUB command can expect exactly one FRAME_TYPE_RESPONSE or FRAME_TYPE_ERROR _in the order they were sent_.
So, if instead we maintained a callback_queue instead of just a response_callback_queue, we can actually respond to each specific PUB or MPUB command rather than a generic "this failed callback" like you've currently implemented.
This callback can be optional, but the API would look like this:
def handler(self):
self.writer.pub("log", "Hello world", callback=self._finish_pub)
def _finish_pub(self, data):
if data.frame_type_id == nsq.FRAME_TYPE_ERROR:
logging.error("blablabla")what do you think?
nsq/Writer.py
Outdated
There was a problem hiding this comment.
rename the nsqds variable to nsqd_tcp_addresses for consistency with Reader
|
Updated code follow your comments, and I also renamed variable data to msg in pub&mpub func because it is a little confused with response's data. Squashed. |
nsq/Writer.py
Outdated
There was a problem hiding this comment.
should we update the example w/ the new (optional) API?
|
Refined callback and updated example. |
nsq/Writer.py
Outdated
There was a problem hiding this comment.
this should send response when frame == nsq.FRAME_TYPE_RESPONSE xor error when frame == nsq.FRAME_TYPE_ERROR, in the latter case perhaps as an exception we define?
|
Committed this change and squashed :) |
|
thanks |
Add async Writer to support pub&mpub
Add async Writer to support pub&mpub and fix JSONDecodeError