2626.. versionchanged:: 1.0
2727 ``Queue(0)`` now means queue of infinite size, not a channel. A :exc:`DeprecationWarning`
2828 will be issued with this argument.
29+
30+ .. versionchanged:: NEXT
31+ :class:`Queue` was renamed to :class:`SimpleQueue`, while :class:`JoinableQueue` was
32+ renamed to :class:`Queue` (`JoinableQueue` remains a backwards compatible alias).
33+ This adds the ability to ``join()`` all queues, like the standard library.
34+
35+ Previously ``SimpleQueue`` was an alias for the undocumented Python
36+ implementation ``queue._PySimpleQueue``; now it is gevent's own implementation.
37+ This ensures that it is cooperative even without monkey-patching.
2938"""
3039
3140
4756from gevent .exceptions import InvalidSwitchError
4857
4958__all__ = []
50- __implements__ = ['Queue' , 'PriorityQueue' , 'LifoQueue' ]
59+ __implements__ = ['Queue' , 'PriorityQueue' , 'LifoQueue' , 'SimpleQueue' ]
5160__extensions__ = ['JoinableQueue' , 'Channel' ]
5261__imports__ = ['Empty' , 'Full' ]
5362
54-
55- __all__ .append ('SimpleQueue' )
56- # SimpleQueue is implemented in C and directly allocates locks
57- # unaffected by monkey patching. We need the Python version.
58- SimpleQueue = __queue__ ._PySimpleQueue # pylint:disable=no-member
59-
6063if hasattr (__queue__ , 'ShutDown' ): # New in 3.13
6164 ShutDown = __queue__ .ShutDown
6265 __imports__ .append ('ShutDown' )
@@ -105,7 +108,7 @@ def put_and_switch(self):
105108 self .item = None
106109 return self .switch (self )
107110
108- class Queue (object ):
111+ class SimpleQueue (object ):
109112 """
110113 Create a queue object with a given maximum size.
111114
@@ -126,6 +129,10 @@ class Queue(object):
126129 previously anyway, but that wasn't the case for PyPy.
127130 .. versionchanged:: 24.10.1
128131 Implement the ``shutdown`` methods from Python 3.13.
132+ .. versionchanged:: NEXT
133+ Renamed from ``Queue`` to ``SimpleQueue`` to better match the standard library.
134+ While this class no longer has a ``shutdown`` method, the new ``Queue`` class
135+ (previously ``JoinableQueue``) continues to have it.
129136 """
130137
131138 __slots__ = (
@@ -449,81 +456,15 @@ def __next__(self):
449456 raise result
450457 return result
451458
452- def shutdown (self , immediate = False ):
453- """
454- "Shut-down the queue, making queue gets and puts raise
455- `ShutDown`.
456-
457- By default, gets will only raise once the queue is empty. Set
458- *immediate* to True to make gets raise immediately instead.
459-
460- All blocked callers of `put` and `get` will be unblocked.
461-
462- In joinable queues, if *immediate*, a task is marked as done
463- for each item remaining in the queue, which may unblock
464- callers of `join`.
465- """
466- self .is_shutdown = True
467- if immediate :
468- self ._drain_for_immediate_shutdown ()
469- getters = list (self .getters )
470- putters = list (self .putters )
471- self .getters .clear ()
472- self .putters .clear ()
473- for waiter in getters + putters :
474- self .hub .loop .run_callback (waiter .throw , ShutDown )
475-
476- def _drain_for_immediate_shutdown (self ):
477- while self .qsize ():
478- self .get ()
479-
480- class UnboundQueue (Queue ):
481- # A specialization of Queue that knows it can never
482- # be bound. Changing its maxsize has no effect.
483-
484- __slots__ = ()
485-
486- def __init__ (self , maxsize = None , items = ()):
487- if maxsize is not None :
488- raise ValueError ("UnboundQueue has no maxsize" )
489- Queue .__init__ (self , maxsize , items )
490- self .putters = None # Will never be used.
491-
492- def put (self , item , block = True , timeout = None ):
493- self ._put (item )
494- if self .getters :
495- self ._schedule_unlock ()
496-
497-
498- class PriorityQueue (Queue ):
499- '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
500-
501- Entries are typically tuples of the form: ``(priority number, data)``.
502459
503- .. versionchanged:: 1.2a1
504- Any *items* given to the constructor will now be passed through
505- :func:`heapq.heapify` to ensure the invariants of this class hold.
506- Previously it was just assumed that they were already a heap.
507- '''
508-
509- __slots__ = ()
510-
511- def _create_queue (self , items = ()):
512- q = list (items )
513- _heapify (q )
514- return q
515-
516- def _put (self , item ):
517- _heappush (self .queue , item )
518-
519- def _get (self ):
520- return _heappop (self .queue )
521-
522-
523- class JoinableQueue (Queue ):
460+ class Queue (SimpleQueue ):
524461 """
525- A subclass of :class:`Queue ` that additionally has
462+ A subclass of :class:`SimpleQueue ` that additionally has
526463 :meth:`task_done` and :meth:`join` methods.
464+
465+ .. versionchanged:: NEXT
466+ Renamed from ``JoinablQueue`` to simply ``Queue`` to better
467+ match the capability of the standard library :class:`queue.Queue`.
527468 """
528469
529470 __slots__ = (
@@ -539,7 +480,7 @@ def __init__(self, maxsize=None, items=(), unfinished_tasks=None):
539480 (if any) will be considered unfinished.
540481
541482 """
542- Queue .__init__ (self , maxsize , items , _warn_depth = 3 )
483+ SimpleQueue .__init__ (self , maxsize , items , _warn_depth = 3 )
543484
544485 from gevent .event import Event
545486 self ._cond = Event ()
@@ -559,13 +500,13 @@ def copy(self):
559500 return type (self )(self .maxsize , self .queue , self .unfinished_tasks )
560501
561502 def _format (self ):
562- result = Queue ._format (self )
503+ result = SimpleQueue ._format (self )
563504 if self .unfinished_tasks :
564505 result += ' tasks=%s _cond=%s' % (self .unfinished_tasks , self ._cond )
565506 return result
566507
567508 def _put (self , item ):
568- Queue ._put (self , item )
509+ SimpleQueue ._put (self , item )
569510 self ._did_put_task ()
570511
571512 def _did_put_task (self ):
@@ -574,8 +515,8 @@ def _did_put_task(self):
574515
575516 def task_done (self ):
576517 '''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
577- For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue
578- that the processing on the task is complete.
518+ For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to
519+ :meth:`task_done` tells the queue that the processing on the task is complete.
579520
580521 If a :meth:`join` is currently blocking, it will resume when all items have been processed
581522 (meaning that a :meth:`task_done` call was received for every item that had been
@@ -608,13 +549,84 @@ def join(self, timeout=None):
608549 '''
609550 return self ._cond .wait (timeout = timeout )
610551
552+ def shutdown (self , immediate = False ):
553+ """
554+ "Shut-down the queue, making queue gets and puts raise
555+ `ShutDown`.
556+
557+ By default, gets will only raise once the queue is empty. Set
558+ *immediate* to True to make gets raise immediately instead.
559+
560+ All blocked callers of `put` and `get` will be unblocked.
561+
562+ In joinable queues, if *immediate*, a task is marked as done
563+ for each item remaining in the queue, which may unblock
564+ callers of `join`.
565+ """
566+ self .is_shutdown = True
567+ if immediate :
568+ self ._drain_for_immediate_shutdown ()
569+ getters = list (self .getters )
570+ putters = list (self .putters )
571+ self .getters .clear ()
572+ self .putters .clear ()
573+ for waiter in getters + putters :
574+ self .hub .loop .run_callback (waiter .throw , ShutDown )
575+
611576 def _drain_for_immediate_shutdown (self ):
612577 while self .qsize ():
613578 self .get ()
614579 self .task_done ()
615580
581+ # .. versionchanged:: NEXT
582+ # Now a BWC alias
583+ JoinableQueue = Queue
584+
585+ class UnboundQueue (Queue ):
586+ # A specialization of Queue that knows it can never
587+ # be bound. Changing its maxsize has no effect.
588+
589+ __slots__ = ()
590+
591+ def __init__ (self , maxsize = None , items = ()):
592+ if maxsize is not None :
593+ raise ValueError ("UnboundQueue has no maxsize" )
594+ Queue .__init__ (self , maxsize , items )
595+ self .putters = None # Will never be used.
596+
597+ def put (self , item , block = True , timeout = None ):
598+ self ._put (item )
599+ if self .getters :
600+ self ._schedule_unlock ()
601+
602+
603+ class PriorityQueue (Queue ):
604+ '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
605+
606+ Entries are typically tuples of the form: ``(priority number, data)``.
607+
608+ .. versionchanged:: 1.2a1
609+ Any *items* given to the constructor will now be passed through
610+ :func:`heapq.heapify` to ensure the invariants of this class hold.
611+ Previously it was just assumed that they were already a heap.
612+ '''
613+
614+ __slots__ = ()
615+
616+ def _create_queue (self , items = ()):
617+ q = list (items )
618+ _heapify (q )
619+ return q
620+
621+ def _put (self , item ):
622+ _heappush (self .queue , item )
623+ self ._did_put_task ()
624+
625+ def _get (self ):
626+ return _heappop (self .queue )
627+
616628
617- class LifoQueue (JoinableQueue ):
629+ class LifoQueue (Queue ):
618630 """
619631 A subclass of :class:`JoinableQueue` that retrieves most recently added entries first.
620632
0 commit comments