@lrytz
Member

@lrytz lrytz commented Nov 15, 2024

Future.firstCompletedOf registers an onComplete handler on each argument future. After the first future is completed, all remaining handlers become no-ops. Removing them from the futures can prevent memory leaks.
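
A minimal sketch of the usage pattern this description refers to, assuming a long-lived future (the hypothetical `never` below) that is raced repeatedly against short-lived ones. Each call adds one handler to `never`; without deregistration those handlers would stay reachable from it after the race is decided. The object and method names are illustrative only.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedOfLeakSketch {
  // Hypothetical long-lived future that never completes, e.g. a shutdown signal.
  val never: Future[Int] = Promise[Int]().future

  // Each call registers one onComplete handler on `never`. If that handler is
  // not removed once the race is decided, repeated calls accumulate stale
  // handlers on `never`.
  def race(i: Int): Future[Int] =
    Future.firstCompletedOf(List(never, Future.successful(i)))

  def main(args: Array[String]): Unit = {
    val results = (1 to 1000).map(i => Await.result(race(i), 5.seconds))
    println(results.sum)
  }
}
```

The sketch only shows the call pattern; observing the retained handlers would require a heap dump or similar tooling.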

Fixes scala/bug#13058

This PR was later amended by #10994 to avoid changing the public API of Future

@scala-jenkins scala-jenkins added this to the 2.13.16 milestone Nov 15, 2024
@lrytz
Member Author

lrytz commented Nov 15, 2024

I'm not experienced with concurrency code, so this really needs review and most probably changes. @som-snytt @axel22 @jchapuis @nmarton-da

@lrytz lrytz force-pushed the t13058 branch 2 times, most recently from 3bad4b7 to e67466f Compare November 15, 2024 15:48
}
while(i.hasNext && firstCompleteHandler.get != null) // exit early if possible
i.next().onComplete(firstCompleteHandler)
deregs += i.next().onCompleteDereg(firstCompleteHandler)
Contributor

@axel22 axel22 Nov 16, 2024

If I understand correctly, you're using the CountDownLatch here to avoid both a data race and a race condition, and the l.await() call will indeed prevent both of them.

However, l.await() is a blocking call, and it could potentially cause a deadlock if the thread pool runs out of threads. You could mitigate that by using a blocking block, to signal to the thread pool that it should create more threads, but it's not exactly the most efficient solution (and you'd be relying on the ExecutorService to do the right thing).
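
A minimal sketch of the `blocking` mitigation mentioned above, assuming the global execution context. It is not the code from the PR; the object and method names are hypothetical.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingLatchSketch {
  def awaitInsideFuture(): Future[String] = {
    val latch = new CountDownLatch(1)
    val waiter = Future {
      // `blocking` tells the execution context that this thread is about to
      // block. Pools that honor the hint (the global one does) may add
      // threads to compensate, so the task below can still run.
      blocking { latch.await() }
      "released"
    }
    // Releases the waiter from another task on the same pool.
    Future { latch.countDown() }
    waiter
  }

  def main(args: Array[String]): Unit =
    println(Await.result(awaitInsideFuture(), 5.seconds))
}
```

As the comment notes, this relies on the executor reacting to the hint, which is why a non-blocking protocol is preferable.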

Btw, I say above both a data race and a race condition because:

  • The moment you've shared firstCompleteHandler with a future with onCompleteDereg, it becomes possible that another thread will concurrently invoke firstCompleteHandler, and consequently invoke foreach on deregs.
    At the same time, the remainder of this loop is mutating deregs.
    Since ListBuffer is not a thread-safe class, there is no guarantee that the foreach call will traverse all the entries that are in a concurrently modified ListBuffer - i.e. without the latch, there would be a data-race here.

  • But, in addition to that, even if the thread-safety (i.e. data-race problem) were resolved by using some thread-safe collection, there is no guarantee that this loop will complete by the time some other thread invokes firstCompleteHandler. That is, without a latch, there would be a race-condition here.

Here is an alternative suggestion that would avoid all three problems (data race, race condition and the blocking).

You could store an immutable list (i.e. List) of deregistration handlers in a single atomic reference (deregsRef of type AtomicReference[List[() => Unit]]), and have a protocol in which:

  • a non-null value in the ref means that no future completed yet, i.e. no future took the responsibility to do the deregistrations
  • and a null value means that some future (which in fact completed first) took over the responsibility to call the deregistration handlers

Concretely:

  1. In the while loop, each iteration should:
val oldDeregs = deregsRef.get()
if (oldDeregs == null) {
  break
}
val newDeregs = i.next().onCompleteDereg(firstCompleteHandler) :: oldDeregs
if (!deregsRef.compareAndSet(oldDeregs, newDeregs)) {
  // Update failed, meaning that there is a null in deregsRef
  // you could do: assert(deregsRef.get() == null)
  
  // You need to call the last deregistration handler, because it was not added to the list
  newDeregs.head.apply()
  // Then, break.
  break
}
  2. In the firstCompletedOf handler:
val r = getAndSet(null)
if (r ne null) {
  while (true) {
    val deregs = deregsRef.get()
    if (deregsRef.compareAndSet(deregs, null)) {
      // Now, it's your responsibility to do the deregistrations.
      deregs.foreach(_.apply())
      break
    }
  }
}

In fact, you could simplify this further if you wanted, by keeping a single AtomicReference, but changing its type so that it holds both the promise and the thus-far created deregistration callbacks. I.e., in this line, declare this:

new AtomicReference[(Promise[T], () => Unit)](p) with (Try[T] => Unit) {

And then adjust the code after getAndSet accordingly.
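
The two-step protocol described in this comment can be sketched as a self-contained toy model. Everything here is an assumption for illustration: `Source` is a hypothetical stand-in for a future with removable callbacks, `register` plays the role of `onCompleteDereg`, and an `AtomicBoolean` replaces the `getAndSet(null)` on the handler itself.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.annotation.tailrec

object DeregProtocolSketch {
  type Dereg = () => Unit

  // Hypothetical stand-in for a future whose callbacks can be removed again.
  final class Source[T] {
    private val handlers = new AtomicReference[List[T => Unit]](Nil)

    def handlerCount: Int = handlers.get().size

    // Adds a handler and returns a function that removes it again.
    def register(h: T => Unit): Dereg = {
      update(h :: _)
      () => update(_.filterNot(_ eq h))
    }

    def complete(v: T): Unit = handlers.getAndSet(Nil).foreach(h => h(v))

    @tailrec private def update(f: List[T => Unit] => List[T => Unit]): Unit = {
      val old = handlers.get()
      if (!handlers.compareAndSet(old, f(old))) update(f)
    }
  }

  def firstOf[T](sources: List[Source[T]])(onFirst: T => Unit): Unit = {
    // Non-null list: nobody has won yet. null: the winner took over deregistration.
    val deregsRef = new AtomicReference[List[Dereg]](Nil)
    val won = new AtomicBoolean(false)

    val handler: T => Unit = { v =>
      if (won.compareAndSet(false, true)) {
        onFirst(v)
        // Only the winner ever writes null, so the value read here is non-null.
        deregsRef.getAndSet(null).foreach(_.apply())
      }
    }

    val it = sources.iterator
    var go = true
    while (go && it.hasNext) {
      val old = deregsRef.get()
      if (old == null) go = false // a source already completed: stop registering
      else {
        val dereg = it.next().register(handler)
        if (!deregsRef.compareAndSet(old, dereg :: old)) {
          // The winner swapped in null meanwhile; this registration is not in
          // its list, so it has to be removed here.
          dereg()
          go = false
        }
      }
    }
  }

  def demo(): (Int, Int, Int, Int) = {
    val a, b, c = new Source[Int]
    var result = 0
    firstOf(List(a, b, c))(v => result = v)
    b.complete(42)
    (result, a.handlerCount, b.handlerCount, c.handlerCount)
  }
}
```

In this single-threaded demo, completing `b` delivers the value and leaves no handler behind on `a` or `c`. The sketch does not model registering on an already completed source.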

Member Author

Thank you Alex :) I was trying to find a solution along the lines you suggest but missed a few ideas. At least I managed to spot the races, that's why I ended up using a CountDownLatch.

Now I think it looks good. It seems we can keep only the list of deregistration functions in the AtomicReference; the promise can be referenced by the closure. It would be great if you could take another look.

@SethTisue
Member

@adamw might you be interested in reviewing this...? and/or do you know someone...?


@adamw adamw left a comment

@SethTisue asked me to review this, not sure if by mistake or not, but I left a couple of comments, feel free to discard them of course :) The concurrent code in firstCompletedOf looks fine, as for the rest, I didn't work directly with Future/Promise internals before, so it's a bit of an unknown field for me.

@lrytz
Member Author

lrytz commented Nov 28, 2024

Not at all by mistake :-) I asked at the core meeting yesterday who could review this and your name came up. Thanks a lot for the review! Especially the concurrency parts. I will look again into the order of callbacks, but IIUC there's no ordering guarantee.

https://docs.scala-lang.org/overviews/core/futures.html

callbacks registered on the same future are unordered

For example, the existing internal method concatCallbacks(left: Callbacks[T], right: Callbacks[T]) reverses the callbacks in left (https://github.com/scala/scala/blob/v2.13.15/src/library/scala/concurrent/impl/Promise.scala#L323-L328).
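
A small sketch of what the missing ordering guarantee means for calling code, assuming the global execution context. The names are illustrative. Both callbacks run, but the code only inspects which callbacks ran, not the order in which they did.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
import scala.jdk.CollectionConverters._

object UnorderedCallbacksSketch {
  def run(): Set[String] = {
    val p = Promise[Int]()
    val seen = new ConcurrentLinkedQueue[String]()
    val latch = new CountDownLatch(2)

    p.future.onComplete { _ => seen.add("first registered"); latch.countDown() }
    p.future.onComplete { _ => seen.add("second registered"); latch.countDown() }

    p.success(1)
    latch.await(5, TimeUnit.SECONDS)

    // Compare as a set: the order of entries in `seen` is not guaranteed.
    seen.asScala.toSet
  }

  def main(args: Array[String]): Unit = println(run())
}
```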

@adamw

adamw commented Nov 28, 2024

@lrytz Ah ok, then this is not a problem

@lrytz lrytz force-pushed the t13058 branch 3 times, most recently from 41695ad to c6f2bd6 Compare December 3, 2024 12:36
@lrytz lrytz marked this pull request as ready for review December 3, 2024 12:36
@SethTisue SethTisue modified the milestones: 2.13.16, 2.13.17 Dec 10, 2024
@lrytz lrytz merged commit 099d469 into scala:2.13.x Jan 30, 2025
3 checks passed
@SethTisue SethTisue added library:concurrent Changes to the concurrency support in stdlib release-notes worth highlighting in next release notes labels Feb 1, 2025
@SethTisue
Member

@lrytz this is causing community build failures when people do extends Future[T], because then they get e.g.

[scala-commons] [error] Missing implementation for member of trait Future:
[scala-commons] [error]   private[package concurrent] def onCompleteWithUnregister[U](f: scala.util.Try[T] => U)(implicit executor: scala.concurrent.ExecutionContext): () => Unit = ???

Is it possible to provide a default implementation?
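
A hedged sketch of what a default implementation could look like, using a hypothetical `Source` trait rather than the real scala.concurrent.Future. It is not necessarily what the library ended up doing; the PR description notes that #10994 later avoided changing the public API of Future.

```scala
object DefaultMemberSketch {
  // Hypothetical trait standing in for Future; not the real scala.concurrent.Future.
  trait Source[T] {
    def onComplete(f: T => Unit): Unit

    // New member with a default body, so existing subclasses keep compiling.
    // The fallback registers the callback normally and returns a no-op:
    // nothing is deregistered unless a subclass overrides this method.
    def onCompleteWithUnregister(f: T => Unit): () => Unit = {
      onComplete(f)
      () => ()
    }
  }

  // A subclass written before the new member existed.
  final class Immediate[T](value: T) extends Source[T] {
    def onComplete(f: T => Unit): Unit = f(value)
  }

  def demo(): Int = {
    var seen = 0
    val unregister = new Immediate(7).onCompleteWithUnregister(v => seen = v)
    unregister() // no-op in the default implementation
    seen
  }
}
```

The trade-off in this sketch is that subclasses relying on the default keep the old behavior, including any stale callbacks.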

@lrytz lrytz changed the title Deregister stale callbacks in Future.firstCompletedOf Deregister stale callbacks in Future.firstCompletedOf (prevents memory leaks) Sep 19, 2025
dongjoon-hyun pushed a commit to apache/spark that referenced this pull request Oct 13, 2025
### What changes were proposed in this pull request?
Upgrade to scala 2.13.17

### Why are the changes needed?
To bring the latest bug fixes and improvements like JDK 25 support. Note that Scala community announces two breaking changes due to the bug fixes.

> Breaking changes
> - Mix in the productPrefix hash statically in case class hashCode
> - Improve scala.util.Using suppression order

- https://github.com/scala/scala/releases/tag/v2.13.17
  - scala/scala#11046
  - scala/scala#10937
  - scala/scala#10927
    - scala/bug#13058
  - scala/scala#11023
    - scala/bug#13033
  - scala/scala#11000

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
local and github builds

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52509 from vrozov/SPARK-53585.

Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025