Skip to content

Commit 1846483

Browse files
authored
sync: expose strong and weak counts of mpsc sender handles (#6405)
1 parent baad270 commit 1846483

File tree

4 files changed

+228
-0
lines changed

4 files changed

+228
-0
lines changed

tokio/src/sync/mpsc/bounded.rs

+28
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,16 @@ impl<T> Sender<T> {
14091409
pub fn max_capacity(&self) -> usize {
14101410
self.chan.semaphore().bound
14111411
}
1412+
1413+
/// Returns the number of [`Sender`] handles.
1414+
pub fn strong_count(&self) -> usize {
1415+
self.chan.strong_count()
1416+
}
1417+
1418+
/// Returns the number of [`WeakSender`] handles.
1419+
pub fn weak_count(&self) -> usize {
1420+
self.chan.weak_count()
1421+
}
14121422
}
14131423

14141424
impl<T> Clone for Sender<T> {
@@ -1429,19 +1439,37 @@ impl<T> fmt::Debug for Sender<T> {
14291439

14301440
impl<T> Clone for WeakSender<T> {
14311441
fn clone(&self) -> Self {
1442+
self.chan.increment_weak_count();
1443+
14321444
WeakSender {
14331445
chan: self.chan.clone(),
14341446
}
14351447
}
14361448
}
14371449

1450+
impl<T> Drop for WeakSender<T> {
1451+
fn drop(&mut self) {
1452+
self.chan.decrement_weak_count();
1453+
}
1454+
}
1455+
14381456
impl<T> WeakSender<T> {
14391457
/// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
14401458
/// if there are other `Sender` instances alive and the channel wasn't
14411459
/// previously dropped, otherwise `None` is returned.
14421460
pub fn upgrade(&self) -> Option<Sender<T>> {
14431461
chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
14441462
}
1463+
1464+
/// Returns the number of [`Sender`] handles.
1465+
pub fn strong_count(&self) -> usize {
1466+
self.chan.strong_count()
1467+
}
1468+
1469+
/// Returns the number of [`WeakSender`] handles.
1470+
pub fn weak_count(&self) -> usize {
1471+
self.chan.weak_count()
1472+
}
14451473
}
14461474

14471475
impl<T> fmt::Debug for WeakSender<T> {

tokio/src/sync/mpsc/chan.rs

+30
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ pub(super) struct Chan<T, S> {
6666
/// When this drops to zero, the send half of the channel is closed.
6767
tx_count: AtomicUsize,
6868

69+
/// Tracks the number of outstanding weak sender handles.
70+
tx_weak_count: AtomicUsize,
71+
6972
/// Only accessed by `Rx` handle.
7073
rx_fields: UnsafeCell<RxFields<T>>,
7174
}
@@ -115,6 +118,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
115118
semaphore,
116119
rx_waker: CachePadded::new(AtomicWaker::new()),
117120
tx_count: AtomicUsize::new(1),
121+
tx_weak_count: AtomicUsize::new(0),
118122
rx_fields: UnsafeCell::new(RxFields {
119123
list: rx,
120124
rx_closed: false,
@@ -131,7 +135,17 @@ impl<T, S> Tx<T, S> {
131135
Tx { inner: chan }
132136
}
133137

138+
pub(super) fn strong_count(&self) -> usize {
139+
self.inner.tx_count.load(Acquire)
140+
}
141+
142+
pub(super) fn weak_count(&self) -> usize {
143+
self.inner.tx_weak_count.load(Relaxed)
144+
}
145+
134146
pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
147+
self.inner.increment_weak_count();
148+
135149
self.inner.clone()
136150
}
137151

@@ -452,6 +466,22 @@ impl<T, S> Chan<T, S> {
452466
// Notify the rx task
453467
self.rx_waker.wake();
454468
}
469+
470+
pub(super) fn decrement_weak_count(&self) {
471+
self.tx_weak_count.fetch_sub(1, Relaxed);
472+
}
473+
474+
pub(super) fn increment_weak_count(&self) {
475+
self.tx_weak_count.fetch_add(1, Relaxed);
476+
}
477+
478+
pub(super) fn strong_count(&self) -> usize {
479+
self.tx_count.load(Acquire)
480+
}
481+
482+
pub(super) fn weak_count(&self) -> usize {
483+
self.tx_weak_count.load(Relaxed)
484+
}
455485
}
456486

457487
impl<T, S> Drop for Chan<T, S> {

tokio/src/sync/mpsc/unbounded.rs

+28
Original file line numberDiff line numberDiff line change
@@ -578,23 +578,51 @@ impl<T> UnboundedSender<T> {
578578
chan: self.chan.downgrade(),
579579
}
580580
}
581+
582+
/// Returns the number of [`UnboundedSender`] handles.
583+
pub fn strong_count(&self) -> usize {
584+
self.chan.strong_count()
585+
}
586+
587+
/// Returns the number of [`WeakUnboundedSender`] handles.
588+
pub fn weak_count(&self) -> usize {
589+
self.chan.weak_count()
590+
}
581591
}
582592

583593
impl<T> Clone for WeakUnboundedSender<T> {
584594
fn clone(&self) -> Self {
595+
self.chan.increment_weak_count();
596+
585597
WeakUnboundedSender {
586598
chan: self.chan.clone(),
587599
}
588600
}
589601
}
590602

603+
impl<T> Drop for WeakUnboundedSender<T> {
604+
fn drop(&mut self) {
605+
self.chan.decrement_weak_count();
606+
}
607+
}
608+
591609
impl<T> WeakUnboundedSender<T> {
592610
/// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
593611
/// This will return `Some` if there are other `Sender` instances alive and
594612
/// the channel wasn't previously dropped, otherwise `None` is returned.
595613
pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
596614
chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
597615
}
616+
617+
/// Returns the number of [`UnboundedSender`] handles.
618+
pub fn strong_count(&self) -> usize {
619+
self.chan.strong_count()
620+
}
621+
622+
/// Returns the number of [`WeakUnboundedSender`] handles.
623+
pub fn weak_count(&self) -> usize {
624+
self.chan.weak_count()
625+
}
598626
}
599627

600628
impl<T> fmt::Debug for WeakUnboundedSender<T> {

tokio/tests/sync_mpsc_weak.rs

+142
Original file line numberDiff line numberDiff line change
@@ -511,3 +511,145 @@ fn test_tx_count_weak_unbounded_sender() {
511511

512512
assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none());
513513
}
514+
515+
#[tokio::test]
516+
async fn sender_strong_count_when_cloned() {
517+
let (tx, _rx) = mpsc::channel::<()>(1);
518+
519+
let tx2 = tx.clone();
520+
521+
assert_eq!(tx.strong_count(), 2);
522+
assert_eq!(tx2.strong_count(), 2);
523+
}
524+
525+
#[tokio::test]
526+
async fn sender_weak_count_when_downgraded() {
527+
let (tx, _rx) = mpsc::channel::<()>(1);
528+
529+
let weak = tx.downgrade();
530+
531+
assert_eq!(tx.weak_count(), 1);
532+
assert_eq!(weak.weak_count(), 1);
533+
}
534+
535+
#[tokio::test]
536+
async fn sender_strong_count_when_dropped() {
537+
let (tx, _rx) = mpsc::channel::<()>(1);
538+
539+
let tx2 = tx.clone();
540+
541+
drop(tx2);
542+
543+
assert_eq!(tx.strong_count(), 1);
544+
}
545+
546+
#[tokio::test]
547+
async fn sender_weak_count_when_dropped() {
548+
let (tx, _rx) = mpsc::channel::<()>(1);
549+
550+
let weak = tx.downgrade();
551+
552+
drop(weak);
553+
554+
assert_eq!(tx.weak_count(), 0);
555+
}
556+
557+
#[tokio::test]
558+
async fn sender_strong_and_weak_conut() {
559+
let (tx, _rx) = mpsc::channel::<()>(1);
560+
561+
let tx2 = tx.clone();
562+
563+
let weak = tx.downgrade();
564+
let weak2 = tx2.downgrade();
565+
566+
assert_eq!(tx.strong_count(), 2);
567+
assert_eq!(tx2.strong_count(), 2);
568+
assert_eq!(weak.strong_count(), 2);
569+
assert_eq!(weak2.strong_count(), 2);
570+
571+
assert_eq!(tx.weak_count(), 2);
572+
assert_eq!(tx2.weak_count(), 2);
573+
assert_eq!(weak.weak_count(), 2);
574+
assert_eq!(weak2.weak_count(), 2);
575+
576+
drop(tx2);
577+
drop(weak2);
578+
579+
assert_eq!(tx.strong_count(), 1);
580+
assert_eq!(weak.strong_count(), 1);
581+
582+
assert_eq!(tx.weak_count(), 1);
583+
assert_eq!(weak.weak_count(), 1);
584+
}
585+
586+
#[tokio::test]
587+
async fn unbounded_sender_strong_count_when_cloned() {
588+
let (tx, _rx) = mpsc::unbounded_channel::<()>();
589+
590+
let tx2 = tx.clone();
591+
592+
assert_eq!(tx.strong_count(), 2);
593+
assert_eq!(tx2.strong_count(), 2);
594+
}
595+
596+
#[tokio::test]
597+
async fn unbounded_sender_weak_count_when_downgraded() {
598+
let (tx, _rx) = mpsc::unbounded_channel::<()>();
599+
600+
let weak = tx.downgrade();
601+
602+
assert_eq!(tx.weak_count(), 1);
603+
assert_eq!(weak.weak_count(), 1);
604+
}
605+
606+
#[tokio::test]
607+
async fn unbounded_sender_strong_count_when_dropped() {
608+
let (tx, _rx) = mpsc::unbounded_channel::<()>();
609+
610+
let tx2 = tx.clone();
611+
612+
drop(tx2);
613+
614+
assert_eq!(tx.strong_count(), 1);
615+
}
616+
617+
#[tokio::test]
618+
async fn unbounded_sender_weak_count_when_dropped() {
619+
let (tx, _rx) = mpsc::unbounded_channel::<()>();
620+
621+
let weak = tx.downgrade();
622+
623+
drop(weak);
624+
625+
assert_eq!(tx.weak_count(), 0);
626+
}
627+
628+
#[tokio::test]
629+
async fn unbounded_sender_strong_and_weak_conut() {
630+
let (tx, _rx) = mpsc::unbounded_channel::<()>();
631+
632+
let tx2 = tx.clone();
633+
634+
let weak = tx.downgrade();
635+
let weak2 = tx2.downgrade();
636+
637+
assert_eq!(tx.strong_count(), 2);
638+
assert_eq!(tx2.strong_count(), 2);
639+
assert_eq!(weak.strong_count(), 2);
640+
assert_eq!(weak2.strong_count(), 2);
641+
642+
assert_eq!(tx.weak_count(), 2);
643+
assert_eq!(tx2.weak_count(), 2);
644+
assert_eq!(weak.weak_count(), 2);
645+
assert_eq!(weak2.weak_count(), 2);
646+
647+
drop(tx2);
648+
drop(weak2);
649+
650+
assert_eq!(tx.strong_count(), 1);
651+
assert_eq!(weak.strong_count(), 1);
652+
653+
assert_eq!(tx.weak_count(), 1);
654+
assert_eq!(weak.weak_count(), 1);
655+
}

0 commit comments

Comments
 (0)