Rewrites RedisChannelLayer.receive specific channel section
I had a mystery bug where the channel_layer would stop receiving on specific channels after a bunch of receives got canceled.
Unable to reproduce the bug in clean conditions and unable to figure out what exactly was wrong, I just rewrote the suspect (RedisChannelLayer.receive).
My bug is gone and the tests still pass, hence the pull request.
As a bonus, simultaneously receiving on '!' channels with different prefixes should now work (e.g. channel names produced by redisChannelLayer.new_channel(prefix='something')), which was not the case before.
Hi @augonis. Interesting...
So, tests...
What’s our new buffer meant to offer that Queue doesn’t? Can we add some tests showing that in action?
The ReceiveBuffer is much more than a Queue: it manages waiters and buffers messages for all specific channels under the same 'real channel', and it also runs the receive loop for that 'real channel'.
It does have some Queue-like bits in its implementation.
The only useful exposed members are .loop and .get(channel); the rest is implementation.
My goal was to simplify the lock-based control logic around receiving, as I suspect something there was behind the mystery bug.
Instead of locks, I use a worker Task/loop that calls .single_receive(real_channel) and puts each message into the ReceiveBuffer, which then either stores the message for later or fulfills a getter Future.
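For anyone following along, the worker-plus-buffer idea can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code in this PR: ReceiveBufferSketch, put, receive_loop and demo are hypothetical names, and single_receive is assumed to be a coroutine function that returns a (channel, message) pair.

```python
import asyncio
from collections import defaultdict, deque


class ReceiveBufferSketch:
    """Hypothetical stand-in for the buffer: one instance per 'real channel'."""

    def __init__(self):
        self.messages = defaultdict(deque)  # specific channel -> buffered messages
        self.getters = defaultdict(deque)   # specific channel -> waiting Futures

    def put(self, channel, message):
        # Hand the message to the first waiter that is still live,
        # skipping Futures whose receive was cancelled in the meantime.
        waiters = self.getters[channel]
        while waiters:
            future = waiters.popleft()
            if not future.done():
                future.set_result(message)
                return
        # Nobody is waiting: keep the message for a later get().
        self.messages[channel].append(message)

    def get(self, channel):
        # Return a Future that resolves to the next message for `channel`.
        future = asyncio.get_running_loop().create_future()
        if self.messages[channel]:
            future.set_result(self.messages[channel].popleft())
        else:
            self.getters[channel].append(future)
        return future


async def receive_loop(buffer, single_receive):
    # The single worker: the only place that pulls from the real channel.
    while True:
        channel, message = await single_receive()
        buffer.put(channel, message)


async def demo():
    source = asyncio.Queue()  # stands in for the Redis-backed real channel
    buffer = ReceiveBufferSketch()
    worker = asyncio.create_task(receive_loop(buffer, source.get))
    try:
        abandoned = buffer.get("real!a")
        abandoned.cancel()  # a receive that gets cancelled before delivery
        waiting = buffer.get("real!a")
        await source.put(("real!a", "first"))
        await source.put(("real!b", "second"))
        return await waiting, await buffer.get("real!b")
    finally:
        worker.cancel()
```

In this sketch, asyncio.run(demo()) returns ("first", "second"): the cancelled getter is skipped rather than swallowing a message, and no lock is needed because only the worker touches the source.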
I'll work on some tests. I'm not familiar with pytest, but I'll learn.
I added tests for the receive task handling, as basic receive is covered by existing tests.
P.S. I've put a limit on how many messages per specific channel the ReceiveBuffer will hold (hardcoded to 20), unlike the original .receive, which would buffer unlimited messages. Any thoughts?
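To make the question concrete, here is one way a per-specific-channel cap could behave. The comment above does not say what happens once the limit is reached, so this sketch assumes the oldest message is dropped; MAX_BUFFERED, buffers and buffer_message are hypothetical names, not taken from the PR.

```python
from collections import defaultdict, deque

MAX_BUFFERED = 20  # mirrors the hardcoded limit mentioned above

# One bounded deque per specific channel; a full deque with maxlen
# discards its oldest item when a new one is appended (an assumed policy).
buffers = defaultdict(lambda: deque(maxlen=MAX_BUFFERED))


def buffer_message(channel, message):
    buffers[channel].append(message)


for i in range(25):
    buffer_message("real!a", i)

print(len(buffers["real!a"]))  # 20
print(buffers["real!a"][0])    # 5, because messages 0-4 were dropped
```

Other policies (dropping the newest message, or blocking the worker) are equally possible and would change how backpressure shows up.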
> P.S. I've put a limit on how many messages per specific channel the ReceiveBuffer will hold (hardcoded to 20), unlike the original .receive, which would buffer unlimited messages. Any thoughts?

I'm wondering if we can tie this into capacity. (Then it's directly configurable...)
Overall this looks nice. It's my summer project to put together a set of decent releases for Daphne, Channels and here, so let's get this in. Thanks for the input! 🏅
Doable, but semantically and implementation-wise it would muddy the meaning of capacity (and its friends channel_capacity and get_capacity(channel)).
- Currently capacity is configured/enforced on the sending side; having it affect receiving would be something new.
- capacity also applies per real channel rather than per specific channel, so how the two relate would have to be resolved.
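For context, this is roughly how capacity and channel_capacity are set today in a Django settings module. The host and the numbers are placeholder values chosen for illustration; nothing here configures a receive-side limit, since no such option exists in this discussion.

```python
import re

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],  # placeholder host
            # Default limit applied when sending to a channel.
            "capacity": 100,
            # Per-channel overrides, matched by name, glob or regex.
            "channel_capacity": {
                "http.request": 200,
                "http.response!*": 10,
                re.compile(r"^websocket.send\!.+"): 20,
            },
        },
    },
}
```

Both settings describe how full a channel may get before sends are refused, which is why reusing them for the receive buffer would give them a second meaning.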
Yes... but it should be exposed, and does it merit another config option? (Not sure...)
Should I remove the limit to unblock this PR and open a separate issue?
Just read django/channels#134; it sounds similar to my problem, and this PR might address it. django/channels#151 may also be related, if that issue only occurs with specific channels.
What's the status with this?
It just needs some time to review properly. I have some issues around Channels' AsyncHttpConsumer at the top of my list, and then channels_redis after that.
self.receive_buffers is added to but never removed from. Memory leak. Probably want to dereference receive buffers that become empty.
There's also the previously-existing memory leak related to channels that are written to but never read from, in https://github.com/django/channels_redis/issues/384 , which I don't advocate fixing in this commit because it's already big enough.
I'd expect receive_buffers to be long lived and few in number.
In fact, for all existing applications there would be just one, as receiving on multiple real_channels was previously not supported, though it was implied by the ability to specify a prefix in redisChannelLayer.new_channel(prefix='something').