Skip to content

Commit cd9e22d

Browse files
authored
Merge pull request #2097 from gevent/issue1957
Mnkey-patching the ``queue`` module (done by default in ``patch_all``) now patches ``Queue``, ``PriorityQueue``, and ``LifoQueue`
2 parents 9fffd6a + a66c079 commit cd9e22d

6 files changed

Lines changed: 368 additions & 277 deletions

File tree

docs/changes/1957.bugfix.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Monkey-patching the ``queue`` module (done by default in
2+
``patch_all``) now patches ``Queue``, ``PriorityQueue``, and
3+
``LifoQueue``. In addition to the general benefits of making all those
4+
classes cooperative, this is known to solve a non-deterministic
5+
deadlock with ``urllib3``.
6+
7+
In addition, ``Queue`` was renamed to ``SimpleQueue``; previously
8+
``SimpleQueue`` was an alias for the undocumented
9+
``queue._PySimpleQueue``. This makes ``SimpleQueue`` cooperative even
10+
without monkey-patching.
11+
12+
Likewise, ``JoinableQueue`` was renamed to ``Queue``, providing the
13+
``join`` method to all ``Queue`` objects, thus matching the standard
14+
library. The old name remains for backwards compatibility.

src/gevent/_gevent_cqueue.pxd

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ cdef inline void greenlet_init():
3636
@cython.final
3737
cdef _safe_remove(deq, item)
3838

39-
cdef class Queue:
39+
cdef class SimpleQueue:
4040
cdef __weakref__
4141
cdef readonly hub
4242
cdef readonly queue
@@ -66,9 +66,18 @@ cdef class Queue:
6666
cpdef get_nowait(self)
6767
cpdef peek(self, block=*, timeout=*)
6868
cpdef peek_nowait(self)
69-
cpdef shutdown(self, immediate=*)
69+
7070

7171
cdef _schedule_unlock(self)
72+
73+
74+
cdef class Queue(SimpleQueue):
75+
cdef Event _cond
76+
cdef readonly int unfinished_tasks
77+
cdef _did_put_task(self)
78+
79+
80+
cpdef shutdown(self, immediate=*)
7281
cdef _drain_for_immediate_shutdown(self)
7382

7483

@@ -78,27 +87,14 @@ cdef class ItemWaiter(Waiter):
7887
cdef readonly item
7988
cdef readonly Queue queue
8089

81-
###
82-
# XXX: Disabling Cython.final here pending a release > Cython 3.0.11
83-
# because it breaks on GCC. See https://github.com/gevent/gevent/issues/2049#issuecomment-2404700280
84-
# Restore when new cython is released.
85-
#
86-
# @cython.final
87-
###
90+
8891
cdef class UnboundQueue(Queue):
8992
pass
9093

9194
cdef class PriorityQueue(Queue):
9295
pass
9396

94-
95-
cdef class JoinableQueue(Queue):
96-
cdef Event _cond
97-
cdef readonly int unfinished_tasks
98-
cdef _did_put_task(self)
99-
100-
101-
cdef class LifoQueue(JoinableQueue):
97+
cdef class LifoQueue(Queue):
10298
pass
10399

104100
cdef class Channel:

src/gevent/monkey/__init__.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,20 @@ def patch_queue():
265265
"""
266266
Patch objects in :mod:`queue`.
267267
268-
269-
Currently, this just replaces :class:`queue.SimpleQueue` (implemented
270-
in C) with its Python counterpart, but the details may change at any time.
268+
This replaces ``SimpleQueue``, ``PriorityQueue``, ``Queue``
269+
and ``LifoQueue`` with their gevent equivalents.
271270
272271
.. versionadded:: 1.3.5
272+
273+
.. versionchanged:: NEXT
274+
In addition to ``SimpleQueue``, now also patches
275+
``Queue``, ``PriorityQueue`` and ``LifoQueue``.`
273276
"""
274277
_patch_module('queue', items=[
275278
'SimpleQueue',
279+
'PriorityQueue',
280+
'LifoQueue',
281+
'Queue',
276282
])
277283

278284

src/gevent/queue.py

Lines changed: 98 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@
2626
.. versionchanged:: 1.0
2727
``Queue(0)`` now means queue of infinite size, not a channel. A :exc:`DeprecationWarning`
2828
will be issued with this argument.
29+
30+
.. versionchanged:: NEXT
31+
:class:`Queue` was renamed to :class:`SimpleQueue`, while :class:`JoinableQueue` was
32+
renamed to :class:`Queue` (`JoinableQueue` remains a backwards compatible alias).
33+
This adds the ability to ``join()`` all queues, like the standard library.
34+
35+
Previously ``SimpleQueue`` was an alias for the undocumented Python
36+
implementation ``queue._PySimpleQueue``; now it is gevent's own implementation.
37+
This ensures that it is cooperative even without monkey-patching.
2938
"""
3039

3140

@@ -47,16 +56,10 @@
4756
from gevent.exceptions import InvalidSwitchError
4857

4958
__all__ = []
50-
__implements__ = ['Queue', 'PriorityQueue', 'LifoQueue']
59+
__implements__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
5160
__extensions__ = ['JoinableQueue', 'Channel']
5261
__imports__ = ['Empty', 'Full']
5362

54-
55-
__all__.append('SimpleQueue')
56-
# SimpleQueue is implemented in C and directly allocates locks
57-
# unaffected by monkey patching. We need the Python version.
58-
SimpleQueue = __queue__._PySimpleQueue # pylint:disable=no-member
59-
6063
if hasattr(__queue__, 'ShutDown'): # New in 3.13
6164
ShutDown = __queue__.ShutDown
6265
__imports__.append('ShutDown')
@@ -105,7 +108,7 @@ def put_and_switch(self):
105108
self.item = None
106109
return self.switch(self)
107110

108-
class Queue(object):
111+
class SimpleQueue(object):
109112
"""
110113
Create a queue object with a given maximum size.
111114
@@ -126,6 +129,10 @@ class Queue(object):
126129
previously anyway, but that wasn't the case for PyPy.
127130
.. versionchanged:: 24.10.1
128131
Implement the ``shutdown`` methods from Python 3.13.
132+
.. versionchanged:: NEXT
133+
Renamed from ``Queue`` to ``SimpleQueue`` to better match the standard library.
134+
While this class no longer has a ``shutdown`` method, the new ``Queue`` class
135+
(previously ``JoinableQueue``) continues to have it.
129136
"""
130137

131138
__slots__ = (
@@ -449,81 +456,15 @@ def __next__(self):
449456
raise result
450457
return result
451458

452-
def shutdown(self, immediate=False):
453-
"""
454-
"Shut-down the queue, making queue gets and puts raise
455-
`ShutDown`.
456-
457-
By default, gets will only raise once the queue is empty. Set
458-
*immediate* to True to make gets raise immediately instead.
459-
460-
All blocked callers of `put` and `get` will be unblocked.
461-
462-
In joinable queues, if *immediate*, a task is marked as done
463-
for each item remaining in the queue, which may unblock
464-
callers of `join`.
465-
"""
466-
self.is_shutdown = True
467-
if immediate:
468-
self._drain_for_immediate_shutdown()
469-
getters = list(self.getters)
470-
putters = list(self.putters)
471-
self.getters.clear()
472-
self.putters.clear()
473-
for waiter in getters + putters:
474-
self.hub.loop.run_callback(waiter.throw, ShutDown)
475-
476-
def _drain_for_immediate_shutdown(self):
477-
while self.qsize():
478-
self.get()
479-
480-
class UnboundQueue(Queue):
481-
# A specialization of Queue that knows it can never
482-
# be bound. Changing its maxsize has no effect.
483-
484-
__slots__ = ()
485-
486-
def __init__(self, maxsize=None, items=()):
487-
if maxsize is not None:
488-
raise ValueError("UnboundQueue has no maxsize")
489-
Queue.__init__(self, maxsize, items)
490-
self.putters = None # Will never be used.
491-
492-
def put(self, item, block=True, timeout=None):
493-
self._put(item)
494-
if self.getters:
495-
self._schedule_unlock()
496-
497-
498-
class PriorityQueue(Queue):
499-
'''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
500-
501-
Entries are typically tuples of the form: ``(priority number, data)``.
502459

503-
.. versionchanged:: 1.2a1
504-
Any *items* given to the constructor will now be passed through
505-
:func:`heapq.heapify` to ensure the invariants of this class hold.
506-
Previously it was just assumed that they were already a heap.
507-
'''
508-
509-
__slots__ = ()
510-
511-
def _create_queue(self, items=()):
512-
q = list(items)
513-
_heapify(q)
514-
return q
515-
516-
def _put(self, item):
517-
_heappush(self.queue, item)
518-
519-
def _get(self):
520-
return _heappop(self.queue)
521-
522-
523-
class JoinableQueue(Queue):
460+
class Queue(SimpleQueue):
524461
"""
525-
A subclass of :class:`Queue` that additionally has
462+
A subclass of :class:`SimpleQueue` that additionally has
526463
:meth:`task_done` and :meth:`join` methods.
464+
465+
.. versionchanged:: NEXT
466+
Renamed from ``JoinablQueue`` to simply ``Queue`` to better
467+
match the capability of the standard library :class:`queue.Queue`.
527468
"""
528469

529470
__slots__ = (
@@ -539,7 +480,7 @@ def __init__(self, maxsize=None, items=(), unfinished_tasks=None):
539480
(if any) will be considered unfinished.
540481
541482
"""
542-
Queue.__init__(self, maxsize, items, _warn_depth=3)
483+
SimpleQueue.__init__(self, maxsize, items, _warn_depth=3)
543484

544485
from gevent.event import Event
545486
self._cond = Event()
@@ -559,13 +500,13 @@ def copy(self):
559500
return type(self)(self.maxsize, self.queue, self.unfinished_tasks)
560501

561502
def _format(self):
562-
result = Queue._format(self)
503+
result = SimpleQueue._format(self)
563504
if self.unfinished_tasks:
564505
result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
565506
return result
566507

567508
def _put(self, item):
568-
Queue._put(self, item)
509+
SimpleQueue._put(self, item)
569510
self._did_put_task()
570511

571512
def _did_put_task(self):
@@ -574,8 +515,8 @@ def _did_put_task(self):
574515

575516
def task_done(self):
576517
'''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
577-
For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue
578-
that the processing on the task is complete.
518+
For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to
519+
:meth:`task_done` tells the queue that the processing on the task is complete.
579520
580521
If a :meth:`join` is currently blocking, it will resume when all items have been processed
581522
(meaning that a :meth:`task_done` call was received for every item that had been
@@ -608,13 +549,84 @@ def join(self, timeout=None):
608549
'''
609550
return self._cond.wait(timeout=timeout)
610551

552+
def shutdown(self, immediate=False):
553+
"""
554+
"Shut-down the queue, making queue gets and puts raise
555+
`ShutDown`.
556+
557+
By default, gets will only raise once the queue is empty. Set
558+
*immediate* to True to make gets raise immediately instead.
559+
560+
All blocked callers of `put` and `get` will be unblocked.
561+
562+
In joinable queues, if *immediate*, a task is marked as done
563+
for each item remaining in the queue, which may unblock
564+
callers of `join`.
565+
"""
566+
self.is_shutdown = True
567+
if immediate:
568+
self._drain_for_immediate_shutdown()
569+
getters = list(self.getters)
570+
putters = list(self.putters)
571+
self.getters.clear()
572+
self.putters.clear()
573+
for waiter in getters + putters:
574+
self.hub.loop.run_callback(waiter.throw, ShutDown)
575+
611576
def _drain_for_immediate_shutdown(self):
612577
while self.qsize():
613578
self.get()
614579
self.task_done()
615580

581+
# .. versionchanged:: NEXT
582+
# Now a BWC alias
583+
JoinableQueue = Queue
584+
585+
class UnboundQueue(Queue):
586+
# A specialization of Queue that knows it can never
587+
# be bound. Changing its maxsize has no effect.
588+
589+
__slots__ = ()
590+
591+
def __init__(self, maxsize=None, items=()):
592+
if maxsize is not None:
593+
raise ValueError("UnboundQueue has no maxsize")
594+
Queue.__init__(self, maxsize, items)
595+
self.putters = None # Will never be used.
596+
597+
def put(self, item, block=True, timeout=None):
598+
self._put(item)
599+
if self.getters:
600+
self._schedule_unlock()
601+
602+
603+
class PriorityQueue(Queue):
604+
'''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
605+
606+
Entries are typically tuples of the form: ``(priority number, data)``.
607+
608+
.. versionchanged:: 1.2a1
609+
Any *items* given to the constructor will now be passed through
610+
:func:`heapq.heapify` to ensure the invariants of this class hold.
611+
Previously it was just assumed that they were already a heap.
612+
'''
613+
614+
__slots__ = ()
615+
616+
def _create_queue(self, items=()):
617+
q = list(items)
618+
_heapify(q)
619+
return q
620+
621+
def _put(self, item):
622+
_heappush(self.queue, item)
623+
self._did_put_task()
624+
625+
def _get(self):
626+
return _heappop(self.queue)
627+
616628

617-
class LifoQueue(JoinableQueue):
629+
class LifoQueue(Queue):
618630
"""
619631
A subclass of :class:`JoinableQueue` that retrieves most recently added entries first.
620632

src/gevent/tests/test__monkey.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,14 @@ def veto(event):
152152
and e.module_name == 'ssl')
153153

154154
def test_patch_queue(self):
155-
try:
156-
import queue
157-
except ImportError:
158-
# Python 2 called this Queue. Note that having
159-
# python-future installed gives us a queue module on
160-
# Python 2 as well.
161-
queue = None
162-
if not hasattr(queue, 'SimpleQueue'):
163-
raise unittest.SkipTest("Needs SimpleQueue")
155+
import queue
156+
import gevent.queue as gq
157+
164158
# pylint:disable=no-member
165-
self.assertIs(queue.SimpleQueue, queue._PySimpleQueue)
159+
self.assertIs(queue.SimpleQueue, gq.SimpleQueue)
160+
self.assertIs(queue.LifoQueue, gq.LifoQueue)
161+
self.assertIs(queue.Queue, gq.Queue)
162+
self.assertIs(queue.PriorityQueue, gq.PriorityQueue)
166163

167164
if __name__ == '__main__':
168165
unittest.main()

0 commit comments

Comments
 (0)