Use a ConcurrentHashMap for Supervisor state if F is Async #2876

Merged
djspiewak merged 1 commit into typelevel:series/3.3.x from wjoel:dispatch-supervisor-per-cpu on Mar 20, 2022

Conversation

@wjoel
Contributor

@wjoel wjoel commented Mar 14, 2022

Replaces #2875 (targeted for 3.3.x)

In programs that use the same dispatcher a lot, the stateRef in Supervisor ends up being a hotspot, and a significant amount of time is spent spinning while attempting to update it.
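For illustration, the contention pattern can be sketched outside cats-effect: a Ref over an immutable Map behaves (roughly) like an AtomicReference updated via a compare-and-swap loop, so every concurrent registration that loses the race must retry. A simplified, hypothetical stand-in, not the actual Supervisor code:

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-in for Ref[F, Map[Unique.Token, F[Unit]]]: every update
// CASes the whole immutable Map, so contended updates must retry ("spin").
final class RefLikeState[K, V] {
  private val ref = new AtomicReference(Map.empty[K, V])

  @annotation.tailrec
  def add(k: K, v: V): Unit = {
    val old = ref.get()
    if (!ref.compareAndSet(old, old + (k -> v)))
      add(k, v) // lost the race: spin and retry
  }

  def size: Int = ref.get().size
}
```

A ConcurrentHashMap.put, by contrast, only contends on a single bin, which is why it sidesteps most of this spinning.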

While the documentation says that dispatchers are cheap and should be constructed often, web servers like blaze-server, jetty-server, http4s-netty, and fs2-netty use one Dispatcher per server.
I don't think that's wrong, and I've also experienced problems with "dispatcher already shutdown" when trying to use shorter-lived ones, as reported in #2727.

Given that, we can fix the hotspot in Dispatcher by creating one Supervisor for each CPU/worker, which is what this PR does.

Results (from a Ryzen 5900X; I expect the improvement to scale with the number of cores) look good:

Baseline on series/3.x:

DispatcherBenchmark.scheduling    1000  thrpt   20  167.852 ± 1.801  ops/min

This PR:

DispatcherBenchmark.scheduling    1000  thrpt   20  600.486 ± 6.002  ops/min

I'm also doing some work with the TechEmpower HTTP benchmarks, and this change gives a 5% performance increase on plaintext using blaze (interestingly, it now gets its best result at 16384 concurrent requests, whereas it otherwise peaks at 4096), and an even greater increase when using a modified http4s-netty backend.

It's possible that there's a better fix for the underlying issue, but this one is simple, and a 3x improvement in (benchmark) performance makes it worth considering, in my opinion. Even if it does make constructing a Dispatcher a tiny bit less cheap.

@wjoel
Contributor Author

wjoel commented Mar 15, 2022

Using a ConcurrentHashMap instead (I hope it doesn't cause issues for Scala.js?) makes it even faster.

One Supervisor per worker, but with ConcurrentHashMap for state (currently this PR):

Benchmark                       (size)   Mode  Cnt    Score    Error    Units
DispatcherBenchmark.scheduling    1000  thrpt   20  715.391 ± 28.495  ops/min

One Supervisor only (old Dispatcher.scala), but with ConcurrentHashMap for state:

Benchmark                       (size)   Mode  Cnt    Score   Error    Units
DispatcherBenchmark.scheduling    1000  thrpt   20  709.094 ± 8.640  ops/min

So we get most of the benefits from just changing to ConcurrentHashMap, but one Supervisor per CPU/worker still helps a bit, it seems.

@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from 6c7b6ea to eda176a on March 15, 2022 16:55
@djspiewak
Member

The problem is that Supervisor can't use ConcurrentHashMap because it's fully pure. We could theoretically figure out whether the underlying F is secretly an Async and then have a different implementation (similar to what we're doing with Queue), but I'm not sure the performance gain (3% ish?) is worth that much effort.

Also Ref[ConcurrentHashMap] seems like a scary type. :-) You really want the contents of a Ref to be immutable.

@wjoel
Contributor Author

wjoel commented Mar 15, 2022

@djspiewak Yeah, sure, I was actually inspired to try ConcurrentHashMap from reading your gist at https://gist.github.com/djspiewak/464c11307cabc80171c90397d4ec34ef last night, figuring this might be below the line, but maybe not. ;)

I did notice it's not used anywhere in cats or cats-effect today, and it's interesting to see that using ConcurrentHashMap improves the performance by another 20%. I can remove that commit, but I might still need help with the test failure (I'll try figuring it out tonight, but I don't have much time).

@djspiewak
Member

Yeah I'm working on the test failure right now.

My thought on the ConcurrentHashMap is let's jump that shark after we land this first one, and when we do we should leverage the Async typecase trick.
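The "Async typecase trick" mentioned here relies on Async being a subtype of Concurrent, so a runtime match on the implicit instance can select an optimized implementation. A minimal sketch with hypothetical stand-in traits (not the real cats-effect hierarchy):

```scala
// Hypothetical stand-ins for the cats-effect typeclass hierarchy.
trait Concurrent[F[_]]
trait Async[F[_]] extends Concurrent[F]

// Select a state implementation based on the capability of F — the same
// shape as matching on the implicit Concurrent[F] in Supervisor.apply.
def stateImpl[F[_]](implicit F: Concurrent[F]): String =
  F match {
    case _: Async[_] => "ConcurrentHashMap-backed (impure but fast)"
    case _           => "Ref[Map]-backed (fully pure)"
  }
```

Callers holding only a Concurrent[F] get the pure fallback; anything whose instance happens to be an Async[F] transparently gets the faster path.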

@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from eda176a to 18b06eb on March 15, 2022 19:14
@djspiewak
Member

So the test failure is legitimate. The test in question is checking to make sure that finalizers in separate actions passed to the same Dispatcher are independent and invoked concurrently. This is true today with the existing singular Supervisor, but we lose this property between multiple Supervisors (who will all finalize sequentially). It's not entirely clear to me yet how to resolve this issue.
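The property in question can be illustrated outside cats-effect with plain threads: two finalizers that each wait for the other to have started will deadlock if released sequentially, but complete if released concurrently. A hypothetical sketch, not the actual test:

```scala
import java.util.concurrent.CountDownLatch

// Run two "finalizers" on separate threads and report whether both finished.
// Sequential release would block forever on whichever one runs first.
def releaseConcurrently(fa: () => Unit, fb: () => Unit): Boolean = {
  val ta = new Thread(() => fa())
  val tb = new Thread(() => fb())
  ta.start(); tb.start()
  ta.join(5000); tb.join(5000) // generous timeout instead of hanging
  !ta.isAlive && !tb.isAlive
}

val aStarted = new CountDownLatch(1)
val bStarted = new CountDownLatch(1)
// Each finalizer signals that it has started, then waits for the other.
val finalizerA = () => { aStarted.countDown(); bStarted.await() }
val finalizerB = () => { bStarted.countDown(); aStarted.await() }
```

With multiple Supervisors finalized one after another, the analogous Dispatcher scenario would behave like the sequential case and hang.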

@wjoel
Contributor Author

wjoel commented Mar 15, 2022

We'd need something like a more general version of Resource.both, then? I took a quick stab at it.

@djspiewak
Member

We'd need something like a more general version of Resource.both, then?

Resource.both does indeed run finalizers in parallel. I'm basically trying to make up my mind whether or not that's a thing we should be leaning on here. :-P

@wjoel
Contributor Author

wjoel commented Mar 16, 2022

Not sure why some of those CI builds failed, but the last commit passes the tests on my machine (for both JVM and JS). Thanks for figuring out why it failed; after that, the fix was easy. Let me know if you think it's the right one. ;)

@djspiewak
Member

The thing with this solution is it basically replicates what Supervisor is already doing. :-P Like, it does work, but I'm not sure it's the best way of doing this. Need to think about it a bit more today…

@wjoel
Contributor Author

wjoel commented Mar 16, 2022

Yeah, I'm not convinced either. The real issue is just the lock contention for that one shared Ref[Map], and there's more than one way to reduce/remove that, of course.

I see there's a StripedHashtable (backed by 4 ThreadSafeHashtable per CPU core) used only to store fiber error callbacks - I wonder if it would be reasonable to do the same (or almost the same) thing for fiber cancellation callbacks in Supervisor. Looks like it might have been created to solve a very similar problem. 🤔

Tangentially, even if it doesn't end up getting used here, something like Resource.all to do this type of concurrent acquire/release might be useful for other use cases, but maybe there's a reason why it doesn't already exist.

@vasilmkd
Member

There's no particular reason that Supervisor is not more sophisticated, other than that we weren't aware of it being a hotspot and no one has worked on optimizing it. Feel free to try things out.

@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from 7c9eb5d to 87cbd64 on March 19, 2022 21:30
     */
    def apply[F[_]](implicit F: Concurrent[F]): Resource[F, Supervisor[F]] = {
      F match {
        // case asyncF: Async[F] => applyForAsync(asyncF)
Contributor Author

@djspiewak leaving this commented out if you want to wait with jumping the shark, but I see you're about to do so in #2885 anyway ;)

Member

Jump the shark! Go for it! :-D

@wjoel
Contributor Author

wjoel commented Mar 19, 2022

This latest push no longer matches the title (I would of course update it if this approach is accepted): instead of one Supervisor per worker, it allows Dispatcher to use a Supervisor optimized for Async that uses a ConcurrentHashMap for keeping track of fibers.

Performance is still excellent, even better than the original PR:

DispatcherBenchmark.scheduling    1000  thrpt   20  703.589 ± 4.302  ops/min

The implementation of Supervisor is shared between the Concurrent and Async cases, but Dispatcher calls the async constructor directly, which lets us do this without typecase tricks; it would be trivial to enable the optimization for the general case in Supervisor as well.

Yes, it does use an impure ConcurrentHashMap in the async case (it outperforms TrieMap here by a significant margin), but the interface is still pure. I think.

In my opinion, it is preferable to the "one Supervisor per CPU" approach, as that one requires a custom parallel resource implementation; the approach here also requires fewer resources, as there's only one shared Supervisor instance.

PS. I was able to get to around 725 ops/min by essentially duplicating the Supervisor logic with ConcurrentHashMap/mutable.Map in Dispatcher itself using a similar pattern as for latches etc., but that certainly does not seem to be worth it compared to this alternative.

@djspiewak
Member

Ah dang, this is cool. I'll look into this more later today! Quick note on the compilation errors though: they're coming from the Scala 2.12 cross-build. I think it would be worth not using the collection shims and instead working against the Java API directly, if at all possible.


    new State[F] {
      override def remove(token: Unique.Token): F[Unit] =
        F.delay(state.remove(token)).void
      override def add(token: Unique.Token, cancel: F[Unit]): F[Unit] =
        F.delay(state.put(token, cancel)).void
      override def cancelAll(): F[Unit] =
        F.defer(state.values().asScala.toList.parUnorderedSequence.void)
Member

I think this is the only asScala here, so if we swap this for a manual iteration into a ListBuffer, it should resolve the compatibility issue!

Contributor Author

Done, I hope it will pass all builds and tests now.
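For reference, the asScala-free iteration suggested above might look something like this (a sketch under assumed names, not the merged code):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

// Collect the values of a Java ConcurrentHashMap without the scala-java
// collection converters, which differ between the 2.12 and 2.13 cross-builds.
def valuesToList[K, V](chm: ConcurrentHashMap[K, V]): List[V] = {
  val buf = ListBuffer.empty[V]
  val it = chm.values().iterator()
  while (it.hasNext) buf += it.next()
  buf.toList
}
```

Iterating the raw java.util.Iterator keeps the code identical on every Scala version the build targets.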

@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from 87cbd64 to 5e55e22 on March 20, 2022 11:37
@wjoel wjoel changed the title from "Give each Dispatcher worker its own Supervisor" to "Use a ConcurrentHashMap for Supervisor stateRef if F is Async" on Mar 20, 2022
@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from 5e55e22 to 7ce3b2d on March 20, 2022 11:44
@wjoel wjoel changed the title from "Use a ConcurrentHashMap for Supervisor stateRef if F is Async" to "Use a ConcurrentHashMap for Supervisor state if F is Async" on Mar 20, 2022
@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from 7ce3b2d to 2e8e632 on March 20, 2022 12:08
@wjoel wjoel force-pushed the dispatch-supervisor-per-cpu branch from 2e8e632 to 56d5bcc on March 20, 2022 12:14
@wjoel wjoel requested a review from djspiewak on March 20, 2022 12:58
@djspiewak djspiewak merged commit 02892e8 into typelevel:series/3.3.x Mar 20, 2022
@djspiewak
Member

Thanks for tackling this!!
