Blocking mechanism with cached threads #2687
djspiewak merged 13 commits into typelevel:series/3.3.x from vasilmkd:cached-blocking
Conversation
Some more benchmarks ran locally on my machine:

Benchmark                        (size)  Mode  Cnt      Score     Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20     12.032 ±   0.250  ops/s
BlockingBenchmark.coarse          10000 thrpt   20   9337.834 ±  67.816  ops/s
BlockingBenchmark.fine            10000 thrpt   20     12.635 ±   0.055  ops/s
BlockingBenchmark.nested          10000 thrpt   20     12.254 ±   0.356  ops/s

Benchmark                        (size)  Mode  Cnt      Score     Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20     23.029 ±   0.249  ops/s
BlockingBenchmark.coarse          10000 thrpt   20  10085.972 ± 159.529  ops/s
BlockingBenchmark.fine            10000 thrpt   20   1734.625 ±  30.182  ops/s
BlockingBenchmark.nested          10000 thrpt   20   1624.172 ±  90.719  ops/s
This PR does not affect

If anyone wants to take it for a spin, this snapshot works with
After the latest round of changes:

Benchmark                        (size)  Mode  Cnt      Score     Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20      6.229 ±   0.299  ops/s
BlockingBenchmark.coarse          10000 thrpt   20   3565.953 ± 105.111  ops/s
BlockingBenchmark.fine            10000 thrpt   20    758.839 ±  19.670  ops/s
BlockingBenchmark.nested          10000 thrpt   20    729.449 ±  12.753  ops/s

Local:

Benchmark                        (size)  Mode  Cnt      Score     Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20     21.879 ±   0.202  ops/s
BlockingBenchmark.coarse          10000 thrpt   20  10181.248 ±  18.676  ops/s
BlockingBenchmark.fine            10000 thrpt   20   1795.412 ±  16.884  ops/s
BlockingBenchmark.nested          10000 thrpt   20   1673.573 ±   6.557  ops/s
Latest snapshot: "io.vasilev" %% "cats-effect" % "3.3-33-d14798d"

Edit: works with
This is supremely amazing. I'm going to take a closer review over the next few days, but I really want to get this out soon!
djspiewak
left a comment
This is honestly extremely clever. I'm a little concerned about the index bit, but aside from that it's amazing
import WorkStealingThreadPoolConstants._

// Index assigned by the `WorkStealingThreadPool` for identification purposes.
private[this] var _index: Int = idx
Doesn't this mean that threads can get "lost" in the cachedThreads set? Since the index could change after the thread is inserted, meaning that the comparator would fail to find the old value.
private[unsafe] val cachedThreads: ConcurrentSkipListSet[WorkerThread] =
  new ConcurrentSkipListSet(Comparator.comparingInt[WorkerThread](_.nameIndex))

val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.incrementAndGet()

The threads in the cachedThreads skip list are ordered by this immutable val. The value of the nameIndex does not really matter; it could also be System.identityHashCode. But we need a stable and unique value for debugging (think inspecting threads in JDK Mission Control or in a debugger), so I just reused it for this purpose too. The ordering doesn't really matter either. I chose the skip list because it offers a log n remove operation instead of a linear one (the remove is necessary when a worker thread is woken up).
The only thing left to do is maybe rename nameIndex to something else.
index is mutable and only has meaning inside our WSTP. It is an index for the threads in the core pool (only the N threads, where N = Runtime#availableProcessors()) and is used to keep track of their data structures (local queues and synchronization primitives), while nameIndex is just used for keeping track of threads in debuggers (to avoid multiple physical threads sharing the same name and confusing users).
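The distinction above is exactly why the concern about "lost" threads matters: a ConcurrentSkipListSet navigates by its comparator, so ordering by a field that mutates after insertion can make removal miss the element, while ordering by an immutable key stays safe. A minimal Java sketch (the `Worker` class and its fields are hypothetical stand-ins for WorkerThread's `nameIndex` and mutable `index`, not the PR's code):

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

public class SkipListKeyDemo {
    // Hypothetical stand-in for WorkerThread: `key` mirrors the immutable
    // nameIndex, `index` mirrors the mutable pool index.
    static final class Worker {
        final int key;  // stable: safe to order the skip list by
        int index;      // mutable: unsafe as an ordering key
        Worker(int key, int index) { this.key = key; this.index = index; }
    }

    public static void main(String[] args) {
        // Ordered by the immutable key, as in the PR.
        ConcurrentSkipListSet<Worker> cached =
            new ConcurrentSkipListSet<>(Comparator.comparingInt((Worker w) -> w.key));
        Worker w = new Worker(42, 0);
        cached.add(w);
        w.index = 7; // mutation does not affect the ordering key
        System.out.println(cached.remove(w)); // true: removal still finds it

        // Ordered by the mutable field: the element gets "lost" after mutation.
        ConcurrentSkipListSet<Worker> broken =
            new ConcurrentSkipListSet<>(Comparator.comparingInt((Worker v) -> v.index));
        Worker a = new Worker(1, 0);
        Worker b = new Worker(2, 10);
        broken.add(a);
        broken.add(b);    // stored order: a, then b
        b.index = -5;     // b now compares below a, but is stored after it
        System.out.println(broken.remove(b)); // false: the search walks the wrong path
        System.out.println(broken.size());    // 2: b is still in the set, unreachable by key
    }
}
```

Because `nameIndex` is a `val` assigned once per thread, the first (safe) case is the one the PR relies on.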
I only force-pushed to rebase the PR; I did not make any changes.
Wait a second, I messed up signing my commits. I will force-push again.
Done, sorry for the inconvenience.
Up to now, every thread that was spawned in order to respond to scala.concurrent.blocking simply replaced the blocked worker thread. The worker thread that got blocked died after running the blocking code.

This PR adds a mechanism for caching threads, similar to Executors.newCachedThreadPool, where the threads are kept around for one minute after they become inactive, in case more blocking work arrives. This saves the expensive process of spawning a new thread.

For now, the retention period is fixed and non-configurable, but I am open to making it user-configurable in 3.4.0.

Furthermore, this PR makes a change in the IOFiber blocking section, where fibers can determine if they are running on the WSTP and, instead of making an expensive shift to the blocking executor and another expensive shift back to the compute pool, the blocking actions are wrapped in scala.concurrent.blocking and executed in place, taking advantage of cached worker threads or spawning new ones if necessary.

I also added benchmarks and the improvements are pretty sweet.

series/3.3.x:

This PR:
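The one-minute retention idea described above resembles the keep-alive behaviour of Executors.newCachedThreadPool. As a rough illustration only (this is a hypothetical Java sketch with an invented `CachedHelper` class, not the PR's implementation inside the work-stealing pool, and it deliberately ignores some races a real pool must handle), a cached thread can be modelled as one that parks on a handoff queue for the keep-alive period after each task and retires if no new work arrives:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Sketch of a cached helper thread: after finishing a task it waits on a
// handoff queue for up to a keep-alive period; if no new blocking work
// arrives in time, the thread exits instead of lingering forever.
public class CachedHelper {
    private final SynchronousQueue<Runnable> handoff = new SynchronousQueue<>();
    private final long keepAliveMillis;
    private volatile Thread worker; // null when no cached thread is alive

    public CachedHelper(long keepAliveMillis) {
        this.keepAliveMillis = keepAliveMillis;
    }

    public synchronized void submit(Runnable task) {
        // Fast path: hand the task to the cached thread if it is parked in poll.
        if (worker != null && handoff.offer(task)) {
            return;
        }
        // Slow path: pay the cost of spawning a fresh thread.
        worker = new Thread(() -> {
            Runnable next = task;
            try {
                while (next != null) {
                    next.run();
                    // Park for more work, but only for the keep-alive period.
                    next = handoff.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException ignored) {
            } finally {
                worker = null; // timed out: the cached thread retires
            }
        });
        worker.start();
    }
}
```

In the PR itself the cached threads live in the `cachedThreads` skip list keyed by `nameIndex`; this sketch only conveys the spawn-versus-reuse trade-off that the benchmarks above measure.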