Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@
/project/project/target/
/project/project/project/target/
/test/macro-annot/target/
/test/files/target/
/test/target/
/build-sbt/
local.sbt
300 changes: 188 additions & 112 deletions src/library/scala/concurrent/BatchingExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@

package scala.concurrent

import java.util.ArrayDeque
import java.util.concurrent.Executor
import scala.annotation.{ switch, tailrec }
import java.util.Objects
import scala.util.control.NonFatal
import scala.annotation.{switch, tailrec}

/**
* Marker trait to indicate that a Runnable is Batchable by BatchingExecutors
Expand All @@ -26,6 +26,17 @@ trait Batchable {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal @tarsa @SethTisue

This commit is a significant improvement in fairness and simplification of the code, with some dramatic performance improvements for synchronous/trampolining implementations since it allows for nested on-stack Runnable invocations (up to a limit of 16).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anatse @mkeskells @hepin1989 Please also have a look at this if you have time/interest! :)

Copy link
Contributor

@jrudolph jrudolph Feb 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with some dramatic performance improvements for synchronous/trampolining implementations since it allows for nested on-stack Runnable invocations (up to a limit of 16).

I recently tried out a similar optimization in the akka-streams GraphInterpreter. The main cost of trampolining is often the multimorphic dispatching to the thunks to execute next. I hoped that running it on the stack would enable the JIT to inline concrete implementations (with all the potentially positive ripple-on effects). Turned out that it didn't work out for the GraphInterpreter because inlining didn't happen. I came to the conclusion that the JIT would have to reevaluate some previous inlining decisions to recognize that some call-sites would now be monomorphic which it didn't do.

I think this may be a general result (but needs more investigation) that the JIT cannot optimize away the overhead of "sufficiently generic" code. The only way to remove those megamorphic call-sites (e.g. inside of any higher-order function that calls a user-supplied function like map or flatMap that cannot be inlined from the outside), is to provide more concrete single-use call-sites which is only possible by generating more code in the first place. I tried that out manually for the main akka-http Graph here and found some great speed improvements (but how would you generate those classes automatically? Tricky stuff...).

So, one advice for benchmarking generic things like Future is too make sure to have complex enough test cases. If you only test future.map(f).flatMap(g) then invoking those on the stack might lead to complete inlining in the best case. So, the test needs at least use three different arguments to map etc to make those megamorphic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, one advice for benchmarking generic things like Future is too make sure to have complex enough test cases. If you only test future.map(f).flatMap(g) then invoking those on the stack might lead to complete inlining in the best case. So, the test needs at least use three different arguments to map etc to make those megamorphic.

Well, that would not necessarily be anything else but a worst-case benchmark, wouldn't it? (Not all callsites will be megamorphic).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrudolph

I think this may be a general result (but needs more investigation) that the JIT cannot optimize away the overhead of "sufficiently generic" code. The only way to remove those megamorphic call-sites (e.g. inside of any higher-order function that calls a user-supplied function like map or flatMap that cannot be inlined from the outside), is to provide more concrete single-use call-sites which is only possible by generating more code in the first place. I tried that out manually for the main akka-http Graph here and found some great speed improvements (but how would you generate those classes automatically? Tricky stuff...).

Hehe, that code is pretty wild. :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that would not necessarily be anything else but a worst-case benchmark, wouldn't it? (Not all callsites will be megamorphic).

Not sure if I'd call a code-base that uses Future.map more than once or twice worst-case... ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrudolph But it's per call-site not per method invocation.

Copy link
Contributor

@jrudolph jrudolph Feb 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean.

What I meant is that if you use Future.map(f) with different arguments then the call-site to call f will be megamorphic in the map implementation (i.e. in https://github.com/viktorklang/scala/blob/50c6f945f16926e1aab8ddd618a71aec68438156/src/library/scala/concurrent/impl/Promise.scala#L431). If you just measure Future infrastructure cost, I suspect the cost of this dispatch will be significant. I suspect more significant than switching from trampolining (BatchingExecutor) to running it on the stack with this change here. I can't say for sure without running the benchmarks myself, just saying that adding a benchmark that is more complex (several Future.map calls with different arguments) will show more realistic results.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a bench to FutureBenchmark which illustrates what you mean and then let's see how it performs :)

private[concurrent] object BatchingExecutorStatics {
final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0)

// Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion)
final val syncPreBatchDepth = 16
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

16 seems like a sweet-spot in terms of tradeoffs, but who knows?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making them configurable would be nice. In the best case, per dispatcher but if that's not possible maybe by JVM property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I contemplated having it as a parameter to submitSyncBatched, but it would be weird if there are multiple invocations which use different numbers in the same implementation. And putting it as a method on a trait means an invokeinterface extra per call, which is costly. Having it as a JVM-parameter means that it is set for all implementations which may or may not be desirable either. I'm not sure what the best option is in this case, but it is a good question—do you have any preference and why?


// Max number of Runnables processed in one go (to prevent starvation of other tasks on the pool)
final val runLimit = 1024
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These limits are up for debate, I'm not sure that there is any optimal single number at all. Could lower to 64 or 128 or whatnot.


final object MissingParentBlockContext extends BlockContext {
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T =
try thunk finally throw new IllegalStateException("BUG in BatchingExecutor.Batch: parentBlockContext is null")
}
}

/**
Expand All @@ -38,154 +49,219 @@ private[concurrent] object BatchingExecutorStatics {
* thread which may improve CPU affinity. However,
* if tasks passed to the Executor are blocking
* or expensive, this optimization can prevent work-stealing
* and make performance worse. Also, some ExecutionContext
* may be fast enough natively that this optimization just
* adds overhead.
* The default ExecutionContext.global is already batching
* or fast enough not to benefit from it; while
* `fromExecutor` and `fromExecutorService` do NOT add
* this optimization since they don't know whether the underlying
* executor will benefit from it.
* and make performance worse.
* A batching executor can create deadlocks if code does
* not use `scala.concurrent.blocking` when it should,
* because tasks created within other tasks will block
* on the outer task completing.
* This executor may run tasks in any order, including LIFO order.
* There are no ordering guarantees.
*
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
* WARNING: Only use *EITHER* `submitAsyncBatched` OR `submitSyncBatched`!!
*
* When you implement this trait for async executors like thread pools,
* you're going to need to implement it something like the following:
*
* {{{
* final override def submitAsync(runnable: Runnable): Unit =
* super[SuperClass].execute(runnable) // To prevent reentrancy into `execute`
*
* final override def execute(runnable: Runnable): Unit =
* if (runnable.isInstanceOf[Batchable]) // Or other logic
* submitAsyncBatched(runnable)
* else
* submitAsync(runnable)
*
* final override def reportFailure(cause: Throwable): Unit = …
* }}}
*
* And if you want to implement if for a sync, trampolining, executor you're
* going to implement it something like this:
*
* {{{
* final override def submitAsync(runnable: Runnable): Unit = ()
*
* final override def execute(runnable: Runnable): Unit =
* submitSyncBatched(runnable) // You typically will want to batch everything
*
* final override def reportFailure(cause: Throwable): Unit =
* ExecutionContext.defaultReporter(cause) // Or choose something more fitting
* }}}
*
*/
private[concurrent] trait BatchingExecutor extends Executor {
private[this] final val _tasksLocal = new ThreadLocal[Batch]()

private[this] final class Batch extends Runnable with BlockContext with (BlockContext => Throwable) {
private[this] final var parentBlockContext: BlockContext = _
private[this] final var first: Runnable = _
private[this] final var size: Int = _
private[this] final var other: Array[Runnable] = BatchingExecutorStatics.emptyBatchArray

def this(r: Runnable) = {
this()
first = r
size = 1
}
private[concurrent] trait BatchingExecutor extends Executor {
private[this] final val _tasksLocal = new ThreadLocal[AnyRef]()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this AnyVal so we can use a cheap sentinel (empty string) to trigger Batch inflation and avoid allocating a batch for the simple case of adding a single runnable and subsequently just executing it, going from 0-1-0.


private def this(first: Runnable, other: Array[Runnable], size: Int) = {
this()
this.first = first
this.other = other
this.size = size
}
/*
* Batch implements a LIFO queue (stack) and is used as a trampolining Runnable.
* In order to conserve allocations, the first element in the batch is stored "unboxed" in
* the `first` field. Subsequent Runnables are stored in the array called `other`.
*/
private[this] sealed abstract class AbstractBatch protected (protected final var first: Runnable, protected final var other: Array[Runnable], protected final var size: Int) {

private[this] final def cloneAndClear(): Batch = {
val newBatch = new Batch(first, other, size)
this.first = null
this.other = BatchingExecutorStatics.emptyBatchArray
this.size = 0
newBatch
}
private[this] final def ensureCapacity(curSize: Int): Array[Runnable] = {
val curOther = this.other
val curLen = curOther.length
if (curSize <= curLen) curOther
else {
val newLen = if (curLen == 0) 4 else curLen << 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I investigated different growth strategies, but here's the thing—if tasks are added incrementally they tend to be added faster than they can be consumed, which means that we need to grow fast anyway.


private[this] final def grow(): Unit = {
val len = other.length
other =
if (len == 0) new Array[Runnable](4)
else {
val newOther = new Array[Runnable](len << 1)
System.arraycopy(other, 0, newOther, 0, len)
newOther
}
if (newLen <= curLen) throw new StackOverflowError("Space limit of asynchronous stack reached: " + curLen)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit uncertain on this, on one hand we could throw a "normal" exception, on the other hand, we'll get an OOME if we are unable to allocate a large enough array—which is a fatal exception. I'm leaning towards this as an acceptable solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes sense to me to throw a VMError, but SOE doesn't feel like quite the right one to me. VirtualMachineError doesn't appear to be abstract - what do you think of throwing that instead?
also, I'm wondering if the max size should go closer to the limit, rather than stopping at Int.MaxValue / 2 + 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless I'm misunderstanding. is the batch only grown from within itself? in that case, SOE makes more sense to me, but not if you've just added too many Runnables from anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can think of it as tasks added to the EC within executing a Task of that EC (recursive task generation) instead of running them directly on the stack, they are added to the Batch—on the heap—and if that batch grows too large, it is really a bit of a StackOverflowError. And the reason it is a serious problem is that the program will not know how to deal with running out of resources.

val newOther = new Array[Runnable](newLen)
System.arraycopy(curOther, 0, newOther, 0, curLen)
this.other = newOther
newOther
}
}

final def push(r: Runnable): Unit = {
val sz = size
if(sz > 0) {
if (sz > other.length)
grow()
other(sz - 1) = r
} else first = r
size = sz + 1
val sz = this.size
if(sz == 0)
this.first = r
else
ensureCapacity(sz)(sz - 1) = r
this.size = sz + 1
}

final def pop(): Runnable =
(size: @switch) match {
case 0 => null
case 1 =>
val ret = first
first = null
size = 0
ret
case n =>
val ret = other(n - 2)
other(n - 2) = null
size = n - 1
ret
}
@tailrec protected final def runN(n: Int): Unit =
if (n > 0)
(this.size: @switch) match {
case 0 =>
case 1 =>
val next = this.first
this.first = null
this.size = 0
next.run()
runN(n - 1)
case sz =>
val o = this.other
val next = o(sz - 2)
o(sz - 2) = null
this.size = sz - 1
next.run()
runN(n - 1)
}
}

private[this] final class AsyncBatch private(_first: Runnable, _other: Array[Runnable], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable) {
private[this] final var parentBlockContext: BlockContext = BatchingExecutorStatics.MissingParentBlockContext

final def this(runnable: Runnable) = this(runnable, BatchingExecutorStatics.emptyBatchArray, 1)

// this method runs in the delegate ExecutionContext's thread
override final def run(): Unit = {
//This invariant needs to hold: require(_tasksLocal.get eq null)
_tasksLocal.set(this)
val failure = BlockContext.usingBlockContext(this)(this)
_tasksLocal.remove()
if (failure ne null)
throw handleRunFailure(failure)
_tasksLocal.set(this) // This is later cleared in `apply` or `runWithoutResubmit`

val f = resubmit(BlockContext.usingBlockContext(this)(this))

if (f != null)
throw f
}

override final def apply(prevBlockContext: BlockContext): Throwable = {
/* LOGIC FOR ASYNCHRONOUS BATCHES */
override final def apply(prevBlockContext: BlockContext): Throwable = try {
parentBlockContext = prevBlockContext
var failure: Throwable = null
try {
var r = pop()
while(r ne null) {
r.run()
r = pop()
}
} catch {
case t: Throwable => failure = t
}
parentBlockContext = null
failure
runN(BatchingExecutorStatics.runLimit)
null
} catch {
case t: Throwable => t // We are handling exceptions on the outside of this method
} finally {
parentBlockContext = BatchingExecutorStatics.MissingParentBlockContext
_tasksLocal.remove()
}

private[this] final def handleRunFailure(cause: Throwable): Throwable =
if (size > 0 && (NonFatal(cause) || cause.isInstanceOf[InterruptedException])) {
try { unbatchedExecute(this); cause } catch {
/* Attempts to resubmit this Batch to the underlying ExecutionContext,
* this only happens for Batches where `resubmitOnBlock` is `true`.
* Only attempt to resubmit when there are `Runnables` left to process.
* Note that `cause` can be `null`.
*/
private[this] final def resubmit(cause: Throwable): Throwable =
if (this.size > 0) {
try { submitForExecution(this); cause } catch {
case inner: Throwable =>
if (NonFatal(inner)) {
val e = new ExecutionException("Non-fatal error occurred and resubmission failed, see suppressed exception.", cause)
e.addSuppressed(inner)
e
} else inner
}
} else cause
} else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown

override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
val pbc = parentBlockContext
if(size > 0) // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
unbatchedExecute(cloneAndClear())
private[this] final def cloneAndClear(): AsyncBatch = {
val newBatch = new AsyncBatch(this.first, this.other, this.size)
this.first = null
this.parentBlockContext = BatchingExecutorStatics.MissingParentBlockContext
this.other = BatchingExecutorStatics.emptyBatchArray
this.size = 0
newBatch
}

if (pbc ne null) pbc.blockOn(thunk) // now delegate the blocking to the previous BC
else {
try thunk finally throw new IllegalStateException("BUG in BatchingExecutor.Batch: parentBlockContext is null")
override final def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
val pbc = parentBlockContext // Store this for later since `cloneAndClear()` will reset it

// If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
if(this.size > 0)
submitForExecution(cloneAndClear()) // If this throws then we have bigger problems

pbc.blockOn(thunk) // Now delegate the blocking to the previous BC
}
}

private[this] final class SyncBatch(runnable: Runnable) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tarsa check the "fie" pool benches with this commit. :D

@tailrec override final def run(): Unit = {
try runN(BatchingExecutorStatics.runLimit) catch {
case ie: InterruptedException =>
reportFailure(ie) // TODO: Handle InterruptedException differently?
case f if NonFatal(f) =>
reportFailure(f)
}

if (this.size > 0)
run()
}
}

protected def unbatchedExecute(r: Runnable): Unit
/** MUST throw a NullPointerException when `runnable` is null
* When implementing a sync BatchingExecutor, it is RECOMMENDED
* to implement this method as `runnable.run()`
*/
protected def submitForExecution(runnable: Runnable): Unit

/** Reports that an asynchronous computation failed.
* See `ExecutionContext.reportFailure(throwable: Throwable)`
*/
protected def reportFailure(throwable: Throwable): Unit

private[this] final def batchedExecute(runnable: Runnable): Unit = {
/**
* WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
* implementation of `BatchingExecutor`
*/
protected final def submitAsyncBatched(runnable: Runnable): Unit = {
val b = _tasksLocal.get
if (b ne null) b.push(runnable)
else unbatchedExecute(new Batch(runnable))
if (b.isInstanceOf[AsyncBatch]) b.asInstanceOf[AsyncBatch].push(runnable)
else submitForExecution(new AsyncBatch(runnable))
}

override def execute(runnable: Runnable): Unit =
if(batchable(runnable)) batchedExecute(runnable)
else unbatchedExecute(runnable)

/** Override this to define which runnables will be batched.
* By default it tests the Runnable for being an instance of [Batchable].
**/
protected def batchable(runnable: Runnable): Boolean = runnable.isInstanceOf[Batchable]
/**
* WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
* implementation of `BatchingExecutor`
*/
protected final def submitSyncBatched(runnable: Runnable): Unit = {
Objects.requireNonNull(runnable, "runnable is null")
val tl = _tasksLocal
val b = tl.get
if (b.isInstanceOf[SyncBatch]) b.asInstanceOf[SyncBatch].push(runnable)
else {
val i = if (b ne null) b.asInstanceOf[java.lang.Integer].intValue else 0
if (i < BatchingExecutorStatics.syncPreBatchDepth) {
tl.set(java.lang.Integer.valueOf(i + 1))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use j.l.Integer here to control the bytecode generated—the Integers we will be using here are all internally cached inside Integer, so no allocations on this path.

try submitForExecution(runnable) // User code so needs to be try-finally guarded here
finally tl.set(b)
} else {
val batch = new SyncBatch(runnable)
tl.set(batch)
submitForExecution(batch)
tl.set(b) // Batch only throws fatals so no need for try-finally here
}
}
}
}
14 changes: 8 additions & 6 deletions src/library/scala/concurrent/BlockContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ object BlockContext {
**/
final def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
val old = contextLocal.get // can be null
try {
if (old eq blockContext) body
else {
contextLocal.set(blockContext)
body
} finally {
contextLocal.set(old)
try body finally contextLocal.set(old)
}
}

Expand All @@ -102,7 +101,10 @@ object BlockContext {
**/
final def usingBlockContext[I, T](blockContext: BlockContext)(f: BlockContext => T): T = {
val old = contextLocal.get // can be null
contextLocal.set(blockContext)
try f(prefer(old)) finally contextLocal.set(old)
if (old eq blockContext) f(prefer(old))
else {
contextLocal.set(blockContext)
try f(prefer(old)) finally contextLocal.set(old)
}
}
}
Loading