Deregister stale callbacks in Future.firstCompletedOf (prevents memory leaks)
#10927
Conversation
I'm not experienced with concurrency code, so this really needs review and most probably changes. @som-snytt @axel22 @jchapuis @nmarton-da
Force-pushed from 3bad4b7 to e67466f
}
while (i.hasNext && firstCompleteHandler.get != null) // exit early if possible
-  i.next().onComplete(firstCompleteHandler)
+  deregs += i.next().onCompleteDereg(firstCompleteHandler)
If I understand correctly, you're using the `CountDownLatch` here to avoid both a data race and a race condition, and the `l.await()` call will indeed prevent both of them.
However, `l.await()` is a blocking call, and it could potentially cause a deadlock if the thread pool runs out of threads. You could mitigate that by using a `blocking` block, to signal to the thread pool that it should create more threads, but it's not exactly the most efficient solution (and you'd be relying on the `ExecutorService` to do the right thing).
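As an aside for readers unfamiliar with `blocking`: here is a minimal sketch (mine, not from the PR) of wrapping a blocking wait in `scala.concurrent.blocking`, using the latch name `l` from the comment above. The global execution context is backed by a `ForkJoinPool` that may compensate with extra threads when it sees the hint.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

val l = new CountDownLatch(1)

val waiter = Future {
  // `blocking` tells the execution context this thread is about to park,
  // so a ForkJoinPool-backed context may compensate with an extra thread.
  blocking { l.await() }
  "released"
}

val releaser = Future { l.countDown() } // some other task eventually releases the latch
```

Even with the hint, the wait still ties up a thread, which is why a non-blocking protocol, as suggested below, is preferable.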
Btw, I say above both a data race and a race condition because:

- The moment you've shared `firstCompleteHandler` with a future via `onCompleteDereg`, it becomes possible that another thread will concurrently invoke `firstCompleteHandler`, and consequently invoke `foreach` on `deregs`. At the same time, the remainder of this loop is mutating `deregs`. Since `ListBuffer` is not a thread-safe class, there is no guarantee that the `foreach` call will traverse all the entries that are in a concurrently modified `ListBuffer`, i.e. without the latch there would be a data race here (see the sketch after this list).
- But, in addition to that, even if the thread-safety (i.e. data-race) problem were resolved by using some thread-safe collection, there is no guarantee that this loop will complete by the time some other thread invokes `firstCompleteHandler`. That is, without a latch, there would be a race condition here.
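To make the data-race point concrete, here is a small self-contained sketch (mine, not from the PR) in which one thread appends deregistration callbacks to a `ListBuffer` while another traverses it, with no synchronization. Since `ListBuffer` gives no thread-safety guarantees, the traversal may skip entries, observe stale state, or fail outright.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val deregs = ListBuffer.empty[() => Unit]

// The registration loop: keeps adding deregistration callbacks.
val writer = Future {
  for (_ <- 1 to 100000) deregs += (() => ())
}

// The completion handler: traverses the buffer concurrently.
// Without the latch (or some other synchronization) this is a data race:
// there is no guarantee it sees all entries, and it may even throw.
val reader = Future {
  deregs.foreach(_.apply())
}

Await.ready(writer.zip(reader), 10.seconds)
```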
Here is an alternative suggestion that would avoid all three problems (the data race, the race condition, and the blocking).
The idea would be to store an immutable list (i.e. a `List`) of deregistration handlers in a single atomic reference (`deregsRef`, of type `AtomicReference[List[() => Unit]]`), and to have a protocol in which:
- a non-null value in the ref means that no future has completed yet, i.e. no future has taken the responsibility to call the deregistration handlers,
- and a null value means that some future (which in fact completed first) took over the responsibility to call the deregistration handlers.
Concretely:
- In the `while` loop, each iteration should:
val oldDeregs = deregsRef.get()
if (oldDeregs == null) {
  break // some future already completed; stop registering
}
val newDeregs = i.next().onCompleteDereg(firstCompleteHandler) :: oldDeregs
if (!deregsRef.compareAndSet(oldDeregs, newDeregs)) {
  // Update failed, meaning that there is a null in deregsRef
  // (you could do: assert(deregsRef.get() == null)).
  // You need to call the last deregistration handler, because it was not added to the list.
  newDeregs.head.apply()
  // Then, break.
  break
}
- In `firstCompletedOf`:
val r = getAndSet(null)
if (r ne null) {
  while (true) {
    val deregs = deregsRef.get()
    if (deregsRef.compareAndSet(deregs, null)) {
      // Now, it's your responsibility to do the deregistrations.
      deregs.foreach(_.apply())
      break
    }
  }
}
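For readers who want something that compiles, here is a minimal, self-contained sketch of the protocol above. It is not the PR's actual code: `onCompleteDereg` does not exist on the standard `Future` (it is the capability this PR explores, later reworked to avoid changing the public API), so the sketch models it with a hypothetical `DeregisterableFuture` trait, and the `break`s above become `continue = false` and a tail-recursive CAS loop.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

// Hypothetical capability: register a completion callback and get back
// a function that deregisters it again.
trait DeregisterableFuture[T] {
  def onCompleteDereg(f: Try[T] => Unit)(implicit ec: ExecutionContext): () => Unit
}

def firstCompletedOfSketch[T](futures: Iterable[DeregisterableFuture[T]])
                             (implicit ec: ExecutionContext): Future[T] = {
  val p = Promise[T]()

  // null means: some future completed and has taken over responsibility
  // for calling the deregistration handlers.
  val deregsRef = new AtomicReference[List[() => Unit]](Nil)

  val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
    def apply(result: Try[T]): Unit = {
      val promise = getAndSet(null)
      if (promise ne null) {              // only the first completed future gets here
        promise.tryComplete(result)
        @tailrec def takeAndDeregister(): Unit = {
          val deregs = deregsRef.get()
          if (!deregsRef.compareAndSet(deregs, null)) takeAndDeregister() // raced with the loop below; retry
          else deregs.foreach(_.apply())                                  // we own the list now; deregister all
        }
        takeAndDeregister()
      }
    }
  }

  // Registration loop: the two `break`s of the sketch above become `continue = false`.
  val i = futures.iterator
  var continue = true
  while (continue && i.hasNext) {
    val oldDeregs = deregsRef.get()
    if (oldDeregs == null) continue = false // a future already completed; stop registering
    else {
      val dereg = i.next().onCompleteDereg(firstCompleteHandler)
      if (!deregsRef.compareAndSet(oldDeregs, dereg :: oldDeregs)) {
        // The winner swapped in null before seeing this handler, so it won't
        // deregister it; do that here, then stop.
        dereg.apply()
        continue = false
      }
    }
  }
  p.future
}
```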
In fact, you could simplify this further if you wanted, by keeping a single `AtomicReference`, but changing its type so that it holds both the promise and the thus-far created deregistration callbacks. I.e., in this line, declare this:
new AtomicReference[(Promise[T], List[() => Unit])]((p, Nil)) with (Try[T] => Unit) {
And then adjust the code after `getAndSet` accordingly.
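A compilable sketch of that single-reference variant (again mine, reusing the hypothetical `DeregisterableFuture` trait from the sketch above) could look like this: the tuple's first element replaces the separate promise reference, and a null tuple means some future has completed and owns the deregistrations.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

// Reuses the hypothetical DeregisterableFuture trait from the previous sketch.
def firstCompletedOfSingleRef[T](futures: Iterable[DeregisterableFuture[T]])
                                (implicit ec: ExecutionContext): Future[T] = {
  val p = Promise[T]()

  // Single atomic state: (promise, deregistrations collected so far);
  // null means a future has completed and owns the deregistrations.
  val handler =
    new AtomicReference[(Promise[T], List[() => Unit])]((p, Nil)) with (Try[T] => Unit) {
      def apply(result: Try[T]): Unit = {
        @tailrec def complete(): Unit = {
          val s = get()
          if (s != null) {
            if (compareAndSet(s, null)) {   // we won: complete and clean up
              s._1.tryComplete(result)
              s._2.foreach(_.apply())
            } else complete()               // the registration loop raced us; retry
          }                                 // else: some other future already won
        }
        complete()
      }
    }

  @tailrec def register(i: Iterator[DeregisterableFuture[T]]): Unit =
    if (i.hasNext) {
      val s = handler.get()
      if (s != null) {
        val dereg = i.next().onCompleteDereg(handler)
        if (handler.compareAndSet(s, (s._1, dereg :: s._2))) register(i)
        else dereg.apply() // a future completed in between; deregister the last handler and stop
      }
    }

  register(futures.iterator)
  p.future
}
```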
Thank you Alex :) I was trying to find a solution along the lines you suggest, but missed a few ideas. At least I managed to spot the races; that's why I ended up using a `CountDownLatch`.
Now I think it looks good. It seems we can keep only the list of deregistration functions in the `AtomicReference`; the promise can be referenced by closure. It would be great if you could take another look.
@adamw might you be interested in reviewing this...? and/or do you know someone...?
adamw left a comment
@SethTisue asked me to review this, I'm not sure if by mistake or not, but I left a couple of comments; feel free to discard them, of course :) The concurrent code in `firstCompletedOf` looks fine; as for the rest, I haven't worked directly with `Future`/`Promise` internals before, so it's a bit of an unknown field for me.
Not at all by mistake :-) I asked at the core meeting yesterday who could review this, and your name came up. Thanks a lot for the review, especially the concurrency parts! I will look again into the order of callbacks, but IIUC there's no ordering guarantee.
For example, the existing internal method …
@lrytz Ah ok, then this is not a problem
Force-pushed from 41695ad to c6f2bd6
@lrytz this is causing community build failures when people do … is it possible to provide a default implementation?
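For context, one shape such a default could take (a sketch only, on a hypothetical extension trait rather than the real `scala.concurrent.Future`, and not what the PR ended up doing, see the note about #10994 below): the new method falls back to plain `onComplete` and returns a no-op deregistration, so existing `Future` implementations keep compiling, at the cost of not actually deregistering anything.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical sketch: a default onCompleteDereg in terms of onComplete.
trait FutureWithDereg[T] { self: Future[T] =>
  def onCompleteDereg(f: Try[T] => Unit)(implicit ec: ExecutionContext): () => Unit = {
    self.onComplete(f) // register as usual
    () => ()           // no-op deregistration: the callback stays attached until completion
  }
}
```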
### What changes were proposed in this pull request?
Upgrade to scala 2.13.17

### Why are the changes needed?
To bring the latest bug fixes and improvements like JDK 25 support. Note that Scala community announces two breaking changes due to the bug fixes.

> Breaking changes
> - Mix in the productPrefix hash statically in case class hashCode
> - Improve scala.util.Using suppression order

- https://github.com/scala/scala/releases/tag/v2.13.17
- scala/scala#11046
- scala/scala#10937
- scala/scala#10927
- scala/bug#13058
- scala/scala#11023
- scala/bug#13033
- scala/scala#11000

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
local and github builds

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52509 from vrozov/SPARK-53585.

Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
`Future.firstCompletedOf` registers an `onComplete` handler on each argument future. After the first future is completed, all remaining handlers become no-ops. Removing them from the futures can prevent memory leaks. Fixes scala/bug#13058.
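For illustration only (a hypothetical usage pattern, not code from the PR or the bug report): each iteration below registers a callback on `neverCompleting`; before this change those callbacks were never removed, so they, and anything they capture, accumulated for as long as `neverCompleting` stayed alive.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A future that never completes, e.g. a shutdown or cancellation signal.
val neverCompleting: Future[String] = Promise[String]().future

for (i <- 1 to 100000) {
  val work = Future { s"result $i" }
  // firstCompletedOf registers an onComplete handler on BOTH futures. The one
  // on `neverCompleting` used to stay attached forever after `work` won the
  // race; with this change it is deregistered once the first future completes.
  val first = Future.firstCompletedOf(Seq(neverCompleting, work))
  Await.result(first, 1.second)
}
```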
This PR was later amended by #10994 to avoid changing the public API of `Future`.