Significant performance improvements to Futures #7663
Head ref: wip-additional-perf-futures-√
Changes from all commits: a23907f, cd466de, 5654813, 24a5713, bbda70d, 6514830, 50c6f94
@@ -12,10 +12,10 @@
 package scala.concurrent

-import java.util.ArrayDeque
 import java.util.concurrent.Executor
-import scala.annotation.{ switch, tailrec }
+import java.util.Objects
 import scala.util.control.NonFatal
+import scala.annotation.{switch, tailrec}

 /**
  * Marker trait to indicate that a Runnable is Batchable by BatchingExecutors
@@ -26,6 +26,17 @@ trait Batchable {

 private[concurrent] object BatchingExecutorStatics {
   final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0)
+
+  // Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion)
+  final val syncPreBatchDepth = 16
Contributor (Author): 16 seems like a sweet-spot in terms of tradeoffs, but who knows?

Contributor: Making them configurable would be nice. In the best case per dispatcher, but if that's not possible, maybe via a JVM property?

Contributor (Author): I contemplated having it as a parameter to submitSyncBatched, but it would be weird if there are multiple invocations which use different numbers in the same implementation. And putting it as a method on a trait means an extra invokeinterface per call, which is costly. Having it as a JVM parameter means that it is set for all implementations, which may or may not be desirable either. I'm not sure what the best option is in this case, but it is a good question: do you have any preference, and why?
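A minimal sketch of the JVM-property idea floated above (the property names are hypothetical and not part of this PR; values are read once at class load, so there is no per-call cost, but they are JVM-wide rather than per dispatcher):

    // Hypothetical sketch: property names are illustrative only, not part of this PR.
    // Reading once at class-load time avoids per-call overhead, at the cost of the
    // values applying JVM-wide rather than per dispatcher.
    private[concurrent] object BatchingExecutorStatics {
      final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0)

      // java.lang.Integer.getInteger returns the default when the property is unset
      final val syncPreBatchDepth: Int =
        java.lang.Integer.getInteger("scala.concurrent.context.syncPreBatchDepth", 16)

      final val runLimit: Int =
        java.lang.Integer.getInteger("scala.concurrent.context.runLimit", 1024)
    }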
+
+  // Max number of Runnables processed in one go (to prevent starvation of other tasks on the pool)
+  final val runLimit = 1024
Contributor (Author): These limits are up for debate; I'm not sure that there is any optimal single number at all. We could lower it to 64 or 128 or whatnot.
+
+  final object MissingParentBlockContext extends BlockContext {
+    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T =
+      try thunk finally throw new IllegalStateException("BUG in BatchingExecutor.Batch: parentBlockContext is null")
+  }
 }

 /**
@@ -38,154 +49,219 @@ private[concurrent] object BatchingExecutorStatics {
  * thread which may improve CPU affinity. However,
  * if tasks passed to the Executor are blocking
  * or expensive, this optimization can prevent work-stealing
- * and make performance worse. Also, some ExecutionContext
- * may be fast enough natively that this optimization just
- * adds overhead.
- * The default ExecutionContext.global is already batching
- * or fast enough not to benefit from it; while
- * `fromExecutor` and `fromExecutorService` do NOT add
- * this optimization since they don't know whether the underlying
- * executor will benefit from it.
+ * and make performance worse.
+ * A batching executor can create deadlocks if code does
+ * not use `scala.concurrent.blocking` when it should,
+ * because tasks created within other tasks will block
+ * on the outer task completing.
+ * This executor may run tasks in any order, including LIFO order.
+ * There are no ordering guarantees.
+ *
+ * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
+ * in the calling thread synchronously. It must enqueue/handoff the Runnable.
+ * WARNING: Only use *EITHER* `submitAsyncBatched` OR `submitSyncBatched`!!
+ *
+ * When you implement this trait for async executors like thread pools,
+ * you're going to need to implement it something like the following:
+ *
+ * {{{
+ *  final override def submitAsync(runnable: Runnable): Unit =
+ *    super[SuperClass].execute(runnable) // To prevent reentrancy into `execute`
+ *
+ *  final override def execute(runnable: Runnable): Unit =
+ *    if (runnable.isInstanceOf[Batchable]) // Or other logic
+ *      submitAsyncBatched(runnable)
+ *    else
+ *      submitAsync(runnable)
+ *
+ *  final override def reportFailure(cause: Throwable): Unit = …
+ * }}}
+ *
+ * And if you want to implement it for a sync, trampolining, executor you're
+ * going to implement it something like this:
+ *
+ * {{{
+ *  final override def submitAsync(runnable: Runnable): Unit = ()
+ *
+ *  final override def execute(runnable: Runnable): Unit =
+ *    submitSyncBatched(runnable) // You typically will want to batch everything
+ *
+ *  final override def reportFailure(cause: Throwable): Unit =
+ *    ExecutionContext.defaultReporter(cause) // Or choose something more fitting
+ * }}}
+ *
  */
-private[concurrent] trait BatchingExecutor extends Executor {
-  private[this] final val _tasksLocal = new ThreadLocal[Batch]()
-
-  private[this] final class Batch extends Runnable with BlockContext with (BlockContext => Throwable) {
-    private[this] final var parentBlockContext: BlockContext = _
-    private[this] final var first: Runnable = _
-    private[this] final var size: Int = _
-    private[this] final var other: Array[Runnable] = BatchingExecutorStatics.emptyBatchArray
-
-    def this(r: Runnable) = {
-      this()
-      first = r
-      size = 1
-    }
+private[concurrent] trait BatchingExecutor extends Executor {
+  private[this] final val _tasksLocal = new ThreadLocal[AnyRef]()
Contributor (Author): Made this AnyRef so we can use a cheap sentinel (small cached Integers) to trigger Batch inflation and avoid allocating a batch for the simple case of adding a single runnable and subsequently just executing it, going from 0-1-0.
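A compact illustration of the states that single AnyRef slot moves through (hypothetical demo code, not from the PR):

    // Hypothetical demo of the protocol described above. The one ThreadLocal slot encodes:
    //   null                  -> idle, nothing batched on this thread
    //   java.lang.Integer(n)  -> n nested submissions so far, still running directly on the stack
    //   a Batch instance      -> a heap-allocated batch was "inflated" and collects further tasks
    val slot = new ThreadLocal[AnyRef]()

    def state(): String = slot.get match {
      case null                 => "idle"
      case n: java.lang.Integer => s"on-stack, nesting depth $n"
      case batch                => s"inflated: ${batch.getClass.getSimpleName}"
    }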
-    private def this(first: Runnable, other: Array[Runnable], size: Int) = {
-      this()
-      this.first = first
-      this.other = other
-      this.size = size
-    }
+  /*
+   * Batch implements a LIFO queue (stack) and is used as a trampolining Runnable.
+   * In order to conserve allocations, the first element in the batch is stored "unboxed" in
+   * the `first` field. Subsequent Runnables are stored in the array called `other`.
+   */
+  private[this] sealed abstract class AbstractBatch protected (protected final var first: Runnable, protected final var other: Array[Runnable], protected final var size: Int) {

-    private[this] final def cloneAndClear(): Batch = {
-      val newBatch = new Batch(first, other, size)
-      this.first = null
-      this.other = BatchingExecutorStatics.emptyBatchArray
-      this.size = 0
-      newBatch
-    }
+    private[this] final def ensureCapacity(curSize: Int): Array[Runnable] = {
+      val curOther = this.other
+      val curLen = curOther.length
+      if (curSize <= curLen) curOther
+      else {
+        val newLen = if (curLen == 0) 4 else curLen << 1
Contributor (Author): I investigated different growth strategies, but here's the thing: if tasks are added incrementally, they tend to be added faster than they can be consumed, which means that we need to grow fast anyway.
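To make the doubling schedule concrete, a small illustration (demo code only, mirroring the `if (curLen == 0) 4 else curLen << 1` rule above); copying work at each growth step is amortized over the pushes since the previous step, so pushes stay amortized O(1):

    // Illustration only: capacity progression 4, 8, 16, 32, 64, ...
    object GrowthDemo extends App {
      var capacity = 0
      for (pushes <- 1 to 40)
        if (pushes > capacity) {
          capacity = if (capacity == 0) 4 else capacity << 1
          println(s"push #$pushes grows capacity to $capacity")
        }
      // Prints growth at pushes 1, 5, 9, 17, 33 -> capacities 4, 8, 16, 32, 64
    }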
-    private[this] final def grow(): Unit = {
-      val len = other.length
-      other =
-        if (len == 0) new Array[Runnable](4)
-        else {
-          val newOther = new Array[Runnable](len << 1)
-          System.arraycopy(other, 0, newOther, 0, len)
-          newOther
-        }
+        if (newLen <= curLen) throw new StackOverflowError("Space limit of asynchronous stack reached: " + curLen)
Contributor (Author): A bit uncertain on this. On one hand we could throw a "normal" exception; on the other hand, we'll get an OOME if we are unable to allocate a large enough array, which is a fatal exception. I'm leaning towards this as an acceptable solution.

Contributor: It makes sense to me to throw a VMError, but SOE doesn't feel like quite the right one to me.

Contributor: Unless I'm misunderstanding, is the batch only grown from within itself? In that case, SOE makes more sense to me, but not if you've just added too many.

Contributor (Author): You can think of it as tasks added to the EC within the execution of a task of that EC (recursive task generation): instead of running them directly on the stack, they are added to the Batch, on the heap, and if that batch grows too large, it really is a bit of a StackOverflowError. And the reason it is a serious problem is that the program will not know how to deal with running out of resources.
+        val newOther = new Array[Runnable](newLen)
+        System.arraycopy(curOther, 0, newOther, 0, curLen)
+        this.other = newOther
+        newOther
+      }
+    }
     final def push(r: Runnable): Unit = {
-      val sz = size
-      if(sz > 0) {
-        if (sz > other.length)
-          grow()
-        other(sz - 1) = r
-      } else first = r
-      size = sz + 1
+      val sz = this.size
+      if(sz == 0)
+        this.first = r
+      else
+        ensureCapacity(sz)(sz - 1) = r
+      this.size = sz + 1
     }
-    final def pop(): Runnable =
-      (size: @switch) match {
-        case 0 => null
-        case 1 =>
-          val ret = first
-          first = null
-          size = 0
-          ret
-        case n =>
-          val ret = other(n - 2)
-          other(n - 2) = null
-          size = n - 1
-          ret
-      }
+    @tailrec protected final def runN(n: Int): Unit =
+      if (n > 0)
+        (this.size: @switch) match {
+          case 0 =>
+          case 1 =>
+            val next = this.first
+            this.first = null
+            this.size = 0
+            next.run()
+            runN(n - 1)
+          case sz =>
+            val o = this.other
+            val next = o(sz - 2)
+            o(sz - 2) = null
+            this.size = sz - 1
+            next.run()
+            runN(n - 1)
+        }
+  }
+  private[this] final class AsyncBatch private(_first: Runnable, _other: Array[Runnable], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable) {
+    private[this] final var parentBlockContext: BlockContext = BatchingExecutorStatics.MissingParentBlockContext
+
+    final def this(runnable: Runnable) = this(runnable, BatchingExecutorStatics.emptyBatchArray, 1)
+
     // this method runs in the delegate ExecutionContext's thread
     override final def run(): Unit = {
-      //This invariant needs to hold: require(_tasksLocal.get eq null)
-      _tasksLocal.set(this)
-      val failure = BlockContext.usingBlockContext(this)(this)
-      _tasksLocal.remove()
-      if (failure ne null)
-        throw handleRunFailure(failure)
+      _tasksLocal.set(this) // This is later cleared in `apply` or `runWithoutResubmit`
+
+      val f = resubmit(BlockContext.usingBlockContext(this)(this))
+
+      if (f != null)
+        throw f
     }
-    override final def apply(prevBlockContext: BlockContext): Throwable = {
+    /* LOGIC FOR ASYNCHRONOUS BATCHES */
+    override final def apply(prevBlockContext: BlockContext): Throwable = try {
       parentBlockContext = prevBlockContext
-      var failure: Throwable = null
-      try {
-        var r = pop()
-        while(r ne null) {
-          r.run()
-          r = pop()
-        }
-      } catch {
-        case t: Throwable => failure = t
-      }
-      parentBlockContext = null
-      failure
+      runN(BatchingExecutorStatics.runLimit)
+      null
+    } catch {
+      case t: Throwable => t // We are handling exceptions on the outside of this method
+    } finally {
+      parentBlockContext = BatchingExecutorStatics.MissingParentBlockContext
+      _tasksLocal.remove()
+    }
-    private[this] final def handleRunFailure(cause: Throwable): Throwable =
-      if (size > 0 && (NonFatal(cause) || cause.isInstanceOf[InterruptedException])) {
-        try { unbatchedExecute(this); cause } catch {
+    /* Attempts to resubmit this Batch to the underlying ExecutionContext,
+     * this only happens for Batches where `resubmitOnBlock` is `true`.
+     * Only attempt to resubmit when there are `Runnables` left to process.
+     * Note that `cause` can be `null`.
+     */
+    private[this] final def resubmit(cause: Throwable): Throwable =
+      if (this.size > 0) {
+        try { submitForExecution(this); cause } catch {
           case inner: Throwable =>
             if (NonFatal(inner)) {
               val e = new ExecutionException("Non-fatal error occurred and resubmission failed, see suppressed exception.", cause)
               e.addSuppressed(inner)
               e
             } else inner
         }
-      } else cause
+      } else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown
-    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
-      val pbc = parentBlockContext
-      if(size > 0) // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
-        unbatchedExecute(cloneAndClear())
-
-      if (pbc ne null) pbc.blockOn(thunk) // now delegate the blocking to the previous BC
-      else {
-        try thunk finally throw new IllegalStateException("BUG in BatchingExecutor.Batch: parentBlockContext is null")
-      }
-    }
+    private[this] final def cloneAndClear(): AsyncBatch = {
+      val newBatch = new AsyncBatch(this.first, this.other, this.size)
+      this.first = null
+      this.parentBlockContext = BatchingExecutorStatics.MissingParentBlockContext
+      this.other = BatchingExecutorStatics.emptyBatchArray
+      this.size = 0
+      newBatch
+    }
+
+    override final def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
+      val pbc = parentBlockContext // Store this for later since `cloneAndClear()` will reset it
+
+      // If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
+      if(this.size > 0)
+        submitForExecution(cloneAndClear()) // If this throws then we have bigger problems
+
+      pbc.blockOn(thunk) // Now delegate the blocking to the previous BC
+    }
+  }
+  private[this] final class SyncBatch(runnable: Runnable) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable {

Contributor (Author): @tarsa check the "fie" pool benches with this commit. :D
+    @tailrec override final def run(): Unit = {
+      try runN(BatchingExecutorStatics.runLimit) catch {
+        case ie: InterruptedException =>
+          reportFailure(ie) // TODO: Handle InterruptedException differently?
+        case f if NonFatal(f) =>
+          reportFailure(f)
+      }
+
+      if (this.size > 0)
+        run()
+    }
+  }
-  protected def unbatchedExecute(r: Runnable): Unit
+  /** MUST throw a NullPointerException when `runnable` is null
+   * When implementing a sync BatchingExecutor, it is RECOMMENDED
+   * to implement this method as `runnable.run()`
+   */
+  protected def submitForExecution(runnable: Runnable): Unit
+
+  /** Reports that an asynchronous computation failed.
+   * See `ExecutionContext.reportFailure(throwable: Throwable)`
+   */
+  protected def reportFailure(throwable: Throwable): Unit
-  private[this] final def batchedExecute(runnable: Runnable): Unit = {
+  /**
+   * WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
+   * implementation of `BatchingExecutor`
+   */
+  protected final def submitAsyncBatched(runnable: Runnable): Unit = {
     val b = _tasksLocal.get
-    if (b ne null) b.push(runnable)
-    else unbatchedExecute(new Batch(runnable))
+    if (b.isInstanceOf[AsyncBatch]) b.asInstanceOf[AsyncBatch].push(runnable)
+    else submitForExecution(new AsyncBatch(runnable))
   }
-  override def execute(runnable: Runnable): Unit =
-    if(batchable(runnable)) batchedExecute(runnable)
-    else unbatchedExecute(runnable)
-
-  /** Override this to define which runnables will be batched.
-   * By default it tests the Runnable for being an instance of [Batchable].
-   **/
-  protected def batchable(runnable: Runnable): Boolean = runnable.isInstanceOf[Batchable]
+  /**
+   * WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
+   * implementation of `BatchingExecutor`
+   */
+  protected final def submitSyncBatched(runnable: Runnable): Unit = {
+    Objects.requireNonNull(runnable, "runnable is null")
+    val tl = _tasksLocal
+    val b = tl.get
+    if (b.isInstanceOf[SyncBatch]) b.asInstanceOf[SyncBatch].push(runnable)
+    else {
+      val i = if (b ne null) b.asInstanceOf[java.lang.Integer].intValue else 0
+      if (i < BatchingExecutorStatics.syncPreBatchDepth) {
+        tl.set(java.lang.Integer.valueOf(i + 1))
Contributor (Author): We use java.lang.Integer here to control the bytecode generated: the Integers we will be using here are all internally cached inside Integer, so there are no allocations on this path.
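For background, this relies on a JDK guarantee rather than anything in this PR: Integer.valueOf caches at least the values -128..127, which comfortably covers depths up to syncPreBatchDepth = 16.

    // Integer.valueOf serves small values from a shared cache, so boxing the
    // nesting depths used here returns the same instance every time and never allocates.
    val a = java.lang.Integer.valueOf(16)
    val b = java.lang.Integer.valueOf(16)
    assert(a eq b) // reference-equal: both come from the Integer cache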
+        try submitForExecution(runnable) // User code so needs to be try-finally guarded here
+        finally tl.set(b)
+      } else {
+        val batch = new SyncBatch(runnable)
+        tl.set(batch)
+        submitForExecution(batch)
+        tl.set(b) // Batch only throws fatals so no need for try-finally here
+      }
+    }
+  }
 }
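The doc comment above warns about deadlocks when code fails to use scala.concurrent.blocking. A minimal sketch of the correct pattern, using the real blocking API (the sleep stands in for any blocking call; the scenario is illustrative):

    import scala.concurrent.{Future, blocking}
    import scala.concurrent.ExecutionContext.Implicits.global

    // `blocking { ... }` notifies the current BlockContext. For a batching executor
    // that means queued sibling tasks can be handed off to the underlying pool
    // before this thread parks, instead of deadlocking behind the outer task.
    val f = Future {
      blocking {
        Thread.sleep(100) // stands in for blocking IO
      }
      "done"
    }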
Contributor (Author): @NthPortal @tarsa @SethTisue
This commit is a significant improvement in fairness and a simplification of the code, with some dramatic performance improvements for synchronous/trampolining implementations, since it allows nested on-stack Runnable invocations (up to a limit of 16).
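For reference, a sync trampolining implementation along the lines the trait's doc comment recommends might look like this (a sketch only; the object name is hypothetical):

    import scala.concurrent.ExecutionContext

    // Sketch of a synchronous, trampolining BatchingExecutor: every task goes
    // through submitSyncBatched, and submitForExecution runs it on the calling
    // thread; nesting beyond syncPreBatchDepth inflates a SyncBatch.
    private[concurrent] object TrampolineEC extends ExecutionContext with BatchingExecutor {
      final override def submitForExecution(runnable: Runnable): Unit = runnable.run()
      final override def execute(runnable: Runnable): Unit = submitSyncBatched(runnable)
      final override def reportFailure(t: Throwable): Unit = ExecutionContext.defaultReporter(t)
    }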
Contributor (Author): @anatse @mkeskells @hepin1989 Please also have a look at this if you have time/interest! :)
Contributor: I recently tried out a similar optimization in the akka-streams GraphInterpreter. The main cost of trampolining is often the megamorphic dispatch to the thunks to execute next. I hoped that running them on the stack would enable the JIT to inline concrete implementations (with all the potentially positive ripple-on effects). It turned out that this didn't work for the GraphInterpreter because inlining didn't happen. I came to the conclusion that the JIT would have to reevaluate some previous inlining decisions to recognize that some call-sites would now be monomorphic, which it didn't do.

I think this may be a general result (but it needs more investigation) that the JIT cannot optimize away the overhead of "sufficiently generic" code. The only way to remove those megamorphic call-sites (e.g. inside any higher-order function that calls a user-supplied function, like map or flatMap, that cannot be inlined from the outside) is to provide more concrete single-use call-sites, which is only possible by generating more code in the first place. I tried that out manually for the main akka-http Graph and found some great speed improvements (but how would you generate those classes automatically? Tricky stuff...).

So, one piece of advice for benchmarking generic things like Future is to make sure to have complex enough test cases. If you only test future.map(f).flatMap(g), then invoking those on the stack might lead to complete inlining in the best case. So the test needs to use at least three different arguments to map etc. to make those call-sites megamorphic.
Contributor (Author): Well, that would not necessarily be anything else but a worst-case benchmark, would it? (Not all call-sites will be megamorphic.)
Contributor (Author): @jrudolph Hehe, that code is pretty wild. :-)
Contributor: Not sure if I'd call a code-base that uses Future.map more than once or twice worst-case... ;)
Contributor (Author): @jrudolph But it's per call-site, not per method invocation.
Contributor: Not sure what you mean. What I meant is that if you use Future.map(f) with different arguments, then the call-site that calls f will be megamorphic in the map implementation (i.e. in https://github.com/viktorklang/scala/blob/50c6f945f16926e1aab8ddd618a71aec68438156/src/library/scala/concurrent/impl/Promise.scala#L431). If you just measure Future infrastructure cost, I suspect the cost of this dispatch will be significant, and more significant than switching from trampolining (BatchingExecutor) to running on the stack with this change. I can't say for sure without running the benchmarks myself, just saying that adding a more complex benchmark (several Future.map calls with different arguments) will show more realistic results.
Contributor (Author): Add a bench to FutureBenchmark which illustrates what you mean and then let's see how it performs :)
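A sketch of what such a bench could look like (hypothetical method, shaped loosely like the JMH benchmarks in FutureBenchmark; the three distinct lambdas are what force the mapped-function call-site megamorphic):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Three structurally different functions flowing through map force megamorphic
    // dispatch to the mapped function inside Future's internals, so the measurement
    // includes the dispatch cost discussed above instead of a best-case fully-inlined run.
    def megamorphicMap(): Int = {
      val f = Future(1)
        .map(x => x + 1) // lambda #1
        .map(x => x * 2) // lambda #2
        .map(x => x - 3) // lambda #3
      Await.result(f, 1.minute)
    }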