
Conversation

@viktorklang
Contributor

  • Improvements to Batching Executor by separating async vs sync
  • Performance improvements to installation of BlockContext
  • Reusing EC if Batchable for some of the Future utilities
  • Using Semaphore instead of bespoke permit counter
  • Switching to batching only select Transformations which benefit
  • Updating the FutureBenchmark to accommodate incompatibilities

@scala-jenkins scala-jenkins added this to the 2.13.0-RC1 milestone Jan 17, 2019
else {
val newLen = if (curLen == 0) 4 else curLen << 1

if (newLen <= curLen) throw new StackOverflowError("Space limit of asynchronous stack reached: " + curLen)
Contributor Author

I'm a bit uncertain about this: on one hand we could throw a "normal" exception; on the other hand, we'll get an OOME (which is a fatal exception) if we are unable to allocate a large enough array. I'm leaning towards this as an acceptable solution.

Contributor

it makes sense to me to throw a VMError, but SOE doesn't feel like quite the right one to me. VirtualMachineError doesn't appear to be abstract - what do you think of throwing that instead?
also, I'm wondering if the max size should go closer to the limit, rather than stopping at Int.MaxValue / 2 + 1

Contributor

Unless I'm misunderstanding, is the batch only grown from within itself? In that case, SOE makes more sense to me, but not if you've just added too many Runnables from anywhere.

Contributor Author

You can think of it as tasks being added to the EC while executing a task of that EC (recursive task generation): instead of running them directly on the stack, they are added to the Batch, on the heap, and if that batch grows too large, it really is a kind of StackOverflowError. And the reason it is a serious problem is that the program will not know how to deal with running out of resources.

val curLen = curOther.length
if (curSize <= curLen) curOther
else {
val newLen = if (curLen == 0) 4 else curLen << 1
Contributor Author

I investigated different growth strategies, but here's the thing: if tasks are added incrementally they tend to be added faster than they can be consumed, which means that we need to grow fast anyway.
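As a standalone sketch (illustrative names, not the exact patch), the doubling strategy with its overflow guard looks like:

```scala
import java.util.Arrays

// Sketch of grow-by-doubling with an overflow guard (hypothetical helper,
// not the actual code in the patch).
def ensureCapacity(cur: Array[Runnable], needed: Int): Array[Runnable] =
  if (needed <= cur.length) cur
  else {
    val curLen = cur.length
    // Doubling keeps up with producers that submit faster than we consume.
    val newLen = if (curLen == 0) 4 else curLen << 1
    // Once curLen reaches 2^30 (Int.MaxValue / 2 + 1), curLen << 1 wraps to a
    // non-positive Int, so newLen <= curLen detects we cannot grow further.
    if (newLen <= curLen)
      throw new StackOverflowError("Space limit of asynchronous stack reached: " + curLen)
    Arrays.copyOf(cur, newLen)
  }
```

The guard explains the stopping point debated above: doubling past 2^30 elements overflows Int, so that is the natural ceiling for this strategy.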

private[concurrent] trait BatchingExecutor extends Executor {
private[this] final val _tasksLocal = new ThreadLocal[Batch]()
private[concurrent] trait BatchingExecutor extends Executor {
private[this] final val _tasksLocal = new ThreadLocal[AnyRef]()
Contributor Author

Made this AnyRef so we can use a cheap sentinel (the empty string) to trigger Batch inflation and avoid allocating a batch for the simple case of adding a single runnable and subsequently just executing it, going from 0 to 1 to 0.
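A minimal illustration of the sentinel idea (hypothetical names, greatly simplified relative to the real BatchingExecutor):

```scala
// A ThreadLocal[AnyRef] can hold either a cheap marker or a real Batch, so the
// common 0 -> 1 -> 0 case of a single runnable never allocates a Batch.
object SentinelSketch {
  final val Marker: AnyRef = "" // interned literal: allocation-free sentinel

  private val tasksLocal = new ThreadLocal[AnyRef]()

  def runSingle(r: Runnable): Unit = {
    tasksLocal.set(Marker)   // mark that we are executing synchronously
    try r.run()
    finally tasksLocal.remove()
  }

  def executingSynchronously: Boolean = tasksLocal.get() ne null
}
```

Only if a second runnable arrives while the marker is set would a real Batch need to be inflated.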

failure
}

@tailrec private[this] final def runWithoutResubmit(failure: Throwable): Throwable =
Contributor Author

We run this for sync/trampolining BatchingExecutors.

private[this] final def handleRunFailure(cause: Throwable): Throwable =
if (size > 0 && (NonFatal(cause) || cause.isInstanceOf[InterruptedException])) {
try { unbatchedExecute(this); cause } catch {
if (resubmitOnBlock && size > 0) {
Contributor Author

We only resubmit async BatchingExecutors, and only if there's anything to resubmit.

if (b.isInstanceOf[Batch]) b.asInstanceOf[Batch].push(runnable)
else submitAsync(new Batch(runnable, resubmitOnBlock = true))
} else submitAsync(runnable)
} else {
Contributor Author

This is the delineation between the async and sync implementations. We batch all sync Runnables, since not doing so would grow the actual thread stack (possibly infinitely).
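The reason sync Runnables must be batched can be seen with a toy trampoline (hypothetical, far simpler than the real code): recursive submissions are queued on the heap and drained by a loop instead of recursing on the thread stack.

```scala
import java.util.ArrayDeque
import java.util.concurrent.Executor

// Toy trampolining executor: nested execute() calls append to a thread-local
// queue that the outermost call drains, keeping the thread stack flat.
final class TrampolineSketch extends Executor {
  private val local = new ThreadLocal[ArrayDeque[Runnable]]()

  def execute(r: Runnable): Unit = {
    val queue = local.get()
    if (queue ne null) queue.addLast(r) // nested submission: batch it
    else {
      val q = new ArrayDeque[Runnable]()
      q.addLast(r)
      local.set(q)
      try {
        var next = q.pollFirst()
        while (next ne null) { next.run(); next = q.pollFirst() }
      } finally local.remove()
    }
  }
}
```

Submitting 100,000 mutually recursive tasks through this executor completes without a StackOverflowError, whereas direct recursive run() calls would blow the stack.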

@viktorklang
Contributor Author

Ping @SethTisue @NthPortal

else submitAsync(new Batch(runnable, resubmitOnBlock = true))
} else submitAsync(runnable)
} else {
Objects.requireNonNull(runnable) // Make sure we're not trying to add nulls
Contributor

error messages

Contributor Author

@hepin1989 You mean like "runnable is null"?

val b = _tasksLocal.get
if (b.isInstanceOf[Batch]) b.asInstanceOf[Batch].push(runnable)
else if (b ne null) {
_tasksLocal.set("") // Set a marker to indicate that we are submitting synchronously
Contributor

change the "" to a constant?

Contributor

yeah, the meaning might be clearer if it was a constant called marker or sentinel

Contributor Author

"" is a constant in the constant pool. I wanted something which is allocation-free and belongs to the root class loader in case it isn't cleared. (Since that is benign and does not leak memory)

in.iterator.foldLeft(successful(bf.newBuilder(in))) {
(fr, fa) => fr.zipWith(fa)(Future.addToBuilderFun)
}.map(_.result())(InternalCallbackExecutor)
}.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else InternalCallbackExecutor)
Contributor

extract to a method

Contributor Author

@hepin1989 I might do that!

*
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
*/
Contributor

is this doc comment no longer accurate? or something else?

Contributor Author

There are several inaccuracies in there; let me see if I can correct it.

}

@tailrec private[this] final def runWithoutResubmit(failure: Throwable): Throwable =
if ((failure ne null) && (failure.isInstanceOf[InterruptedException] || NonFatal(failure))) {
Contributor

if we're being consistent with your above change,

Suggested change
if ((failure ne null) && (failure.isInstanceOf[InterruptedException] || NonFatal(failure))) {
if ((failure != null) && (failure.isInstanceOf[InterruptedException] || NonFatal(failure))) {

Contributor Author

Agreed!

else submitAsync(new Batch(runnable, resubmitOnBlock = true))
} else submitAsync(runnable)
} else {
Objects.requireNonNull(runnable) // Make sure we're not trying to add nulls
Contributor

why is there only a null check if !isAsync?

Contributor Author

Great question!

batchable would return false for a null parameter, which means that it will be submitted to submitAsync instead, which typically will call super.execute(runnable) and that call is specced to throw an exception if the Runnable is null: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html#execute(java.lang.Runnable)

I'll add a comment.
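The cited Executor contract can be demonstrated with any stock executor (hedged illustration, not the PR's code path):

```scala
import java.util.concurrent.Executors

// Executor.execute is specified to throw NullPointerException for a null
// command, so code that delegates null (non-batchable) submissions to the
// underlying executor gets its null check for free.
val pool = Executors.newSingleThreadExecutor()
val threw =
  try { pool.execute(null); false }
  catch { case _: NullPointerException => true }
pool.shutdown()
assert(threw)
```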

@viktorklang viktorklang force-pushed the wip-additional-perf-futures-√ branch from cdee3bd to fce2dd3 Compare January 20, 2019 19:15
@adriaanm
Contributor

/nothingtoseehere

@tarsa

tarsa commented Jan 25, 2019

What about removing Promise linking altogether? It could be a significant performance win (though I don't know that it would be). OTOH, Promise linking is an undocumented (except for some blog posts) heuristic that is an implementation detail, so removing it won't break any guarantees.

@viktorklang
Contributor Author

viktorklang commented Jan 26, 2019 via email

@tarsa

tarsa commented Jan 26, 2019

Hi @viktorklang

I've removed promise linking and nothing broke, except of course a test that specifically tests linking (it stopped compiling because it was accessing non-public methods).

Here's my attempt: https://github.com/tarsa/scala/commits/promise-linking-removal

I've run all tests mentioned in main readme:

  • sbt test
  • sbt partest
  • sbt scalacheck/test

Also I've run the benchmark from scala.concurrent.FutureBenchmark.scala, but I reduced the iteration count severely (I didn't want to spend 17 days benchmarking), so the results aren't fully reliable. From my quick benchmark results it seems that removing promise linking brings big wins (about 10x) in two benchmarks:

  • FirstCompletedOfFutureBenchmark.pre fjp
  • VariousFutureBenchmark.pre fie

In other tests our implementations were more or less trading blows. Maybe all of these differences are due to noise in the results (too short benchmarking).

@viktorklang
Contributor Author

viktorklang commented Jan 27, 2019 via email

@tarsa

tarsa commented Jan 28, 2019

> Thanks! May I ask—have you had a look at memory usage (pressure and retention)?

How would I measure that automatically? I've reduced memory from 1G to 100M in the benchmark and increased the recursion level from 1024 to 8192. That didn't change much in relative performance.

I've found that DefaultPromise.completeWith is rather unnecessarily slow because of the extra dispatch to the executor service. I've optimized it by making DefaultPromise.completeWith synchronous, just like DefaultPromise.complete: when doing DefaultPromise.complete, the callbacks are traversed on the same thread that called it. My optimization is here: tarsa/scala@bfb96ea...tarsa:18e5676871928dafc5d2c13d3b36f09da32cf032, but this solution is probably prone to stack overflow. I'll come up with something stack-safe that should be the foundation for the main change, which is promise linking removal.

@viktorklang
Contributor Author

viktorklang commented Jan 28, 2019 via email

@tarsa

tarsa commented Jan 28, 2019

other.onComplete(this)(InternalCallbackExecutor)

Well, that still goes through executor, which IIUC imposes some overhead of managing task queue and thread synchronization. I'm not sure how that InternalCallbackExecutor works, though.

Promise linking has the advantage that it reduces the number of completeWith operations. If completeWith operations are slow then Promise linking can be a performance win. Therefore I'm working on some stack-safe optimization of completeWith.

Update:
Bypassing InternalCallbackExecutor (using an extra if in Transformation.submitWithValue) leads to worse performance on FlatMapFutureBenchmark, surprisingly. Hmm....

@tarsa

tarsa commented Feb 2, 2019

I have done some more changes ( tarsa/scala@bfb96ea...tarsa:promise-linking-removal ), benchmarks and tests, but the overall conclusion is that there's no significant performance increase from removal of promise linking, at least not if stacked on top of this pull request. In fact there seems to be some performance regression (I tested mainly with recursion depth = 8192 instead of 1024).

Various benign small changes had a big impact on performance, for better or worse (depending on the particular benchmark). It seems that Java's JIT is somewhat unpredictable. I didn't want to spend time analyzing the assembly code produced by the JIT and overtuning the source code to it, so I'll stop the experiment. There are no significant performance improvements (which I had hoped for initially) to warrant further effort investment.

OTOH, straight removal of promise linking hasn't caused any correctness or stability problems. Only when I tried stack-unsafe complete/completeWith did things start to break (i.e. I got stack overflows), but stack-unsafe complete/completeWith is not tied to promise linking removal.

Overall, only in the (unlikely?) case where promise linking blocks other optimizations would I consider promise linking removal.

@NthPortal
Contributor

@tarsa for unbatched execution, InternalCallbackExecutor just immediately invokes Runnable#run(); for batched execution, it acts like a normal BatchingExecutor
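A sketch of the unbatched behaviour described (hypothetical; the real InternalCallbackExecutor additionally supports batching):

```scala
import java.util.concurrent.Executor

// Calling-thread executor: execute simply invokes run() inline, with no task
// queue, no thread handoff, and no synchronization overhead.
object RunNowSketch extends Executor {
  def execute(r: Runnable): Unit = r.run()
}
```

With such an executor, callbacks run on whichever thread completes the promise, which is why it adds so little overhead compared to a pooled dispatch.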


/**
* Returns the associaed `Future` with this `Promise`
* Returns the associated `Future` with this `Promise`
Contributor

should this say 'Future associated with'?

Suggested change
* Returns the associated `Future` with this `Promise`
* Returns the `Future` associated with this `Promise`

size = n - 1
ret
final def pop(): Runnable = {
val sz = this.size
@anatse Feb 3, 2019

I don't quite understand; can you explain where the improvement is? It seems like additional instructions appear in the case when size equals 0.

Contributor Author

The rationale is that pop() is always invoked after a size > 0 check, which means that the 0-case is never triggered. This change gives the method a single branch instead of potentially two, since we cannot know whether the JVM will generate anything more efficient.

Contributor

The JVM would only have one branch (because of @switch), ensuring that the bytecode was a tableswitch or lookupswitch etc. (which generates faster machine code on x64 than a series of if statements), but an if is still probably faster here :-)

It may be worth adding a doc comment on the method here to explain that size > 0 should be checked by the caller

If size cannot be < 1, should the test be size == 1?

Contributor Author

@mkeskells the size > 0 check is not mandatory; the logic will still work. AFAIK HotSpot passes both tableswitch and lookupswitch through the same codepath.

Contributor

@viktorklang they are different constraints that allow generation of different machine code

Contributor Author

@mkeskells Agreed. I've changed the code to not do the size-check before and instead do the tableswitch.
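For reference, the tableswitch pattern being discussed looks like this in Scala (illustrative names, not the actual pop()):

```scala
import scala.annotation.switch

// With contiguous literal cases, @switch asks scalac to emit a single
// tableswitch (and fail compilation if it cannot) instead of a chain of ifs,
// so no separate size > 0 pre-check is needed before dispatching.
def popSketch(size: Int, first: Runnable, other: Array[Runnable]): Runnable =
  (size: @switch) match {
    case 0 => null
    case 1 => first
    case 2 => other(0)
    case _ => other(size - 2)
  }
```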

@viktorklang viktorklang force-pushed the wip-additional-perf-futures-√ branch 2 times, most recently from bf96184 to 2abdbcd Compare February 4, 2019 17:50
else {
val i = if (b ne null) b.asInstanceOf[java.lang.Integer].intValue else 0
if (i < BatchingExecutorStatics.syncPreBatchDepth) {
tl.set(java.lang.Integer.valueOf(i + 1))
Contributor Author

We use j.l.Integer here to control the bytecode generated—the Integers we will be using here are all internally cached inside Integer, so no allocations on this path.
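The caching being relied on here is standard java.lang.Integer behaviour and can be demonstrated directly:

```scala
// Integer.valueOf is required to cache instances for values in -128..127, so
// boxing the small nesting counter (capped at 16 in this patch) never
// allocates a fresh Integer on this path.
val a = java.lang.Integer.valueOf(16)
val b = java.lang.Integer.valueOf(16)
assert(a eq b) // cached: the same instance every time
```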

}
}

private[this] final class SyncBatch(runnable: Runnable) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable {
Contributor Author

@tarsa check the "fie" pool benches with this commit. :D

@viktorklang viktorklang force-pushed the wip-additional-perf-futures-√ branch from 5cf0d16 to 909d125 Compare February 12, 2019 15:18
@SethTisue
Member

SethTisue commented Feb 13, 2019

Contributor

@jrudolph left a comment

Had only a quick look. It's looking good, looking forward to the performance improvements. :)

Generally, it would be good to break these kinds of changes up into smaller ones and accompany them with benchmarks, to make it easier to verify whether the added code complexity (in some parts) is actually worth it.

final val marker = ""

// Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion)
final val syncPreBatchDepth = 16
Contributor

Making them configurable would be nice. Ideally per dispatcher, but if that's not possible, maybe via a JVM property?
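If it were made overridable via a JVM property, it might look like this (hypothetical property name; the patch itself uses compile-time constants):

```scala
// Hypothetical: read the pre-batch depth from a system property, falling back
// to the constant 16 when the property is absent or malformed.
val syncPreBatchDepth: Int =
  try Integer.parseInt(System.getProperty("scala.concurrent.syncPreBatchDepth", "16"))
  catch { case _: NumberFormatException => 16 }
```

The trade-off is that a compile-time final constant can be inlined by the JIT, whereas a property-derived value cannot, which may matter on a path this hot.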

}
if ((this ne p) && compareAndSet(state, l)) {
if (state ne Noop) p.dispatchOrAddCallbacks(p.get(), state.asInstanceOf[Callbacks[T]]) // Noop-check is important here
} else linkRootOf(p, l)
Contributor

Previously, for !(p ne this), linkRootOf was not called. Was that change intentional?


@viktorklang
Contributor Author

@jrudolph I agree that it is generally better to split it up into multiple small PRs, but this is a massive overhaul, and some optimizations are invalidated by other optimizations or turn out to introduce design constraints for future progress, so I anticipate further optimizations to this design to be much smaller and self-contained. I do believe that I'm already up against the boundaries of what this design can perform like, so a future design might look completely different. :S

@viktorklang
Contributor Author

viktorklang commented Feb 14, 2019 via email

@viktorklang
Contributor Author

@SethTisue
Member

> If only scala-xml failed then that should be fine, right?

scala-xml failing takes out many downstream projects. I'll have to figure out what happened there, and try again. It doesn't seem like it could be related to your PR.

@SethTisue
Member

@viktorklang could you rebase this onto current 2.13.x? otherwise I don't think we'll get usable community build results

  * Improvements to Batching Executor by separating async vs sync
  * Performance improvements to installation of BlockContext
  * Reusing EC if Batchable for some of the Future utilities
  * Using Semaphore instead of bespoke permit counter
  * Switching to batching only select Transformations which benefit
  * Updating the FutureBenchmark to accommodate incompatibilities
  * A regression was introduced since one of the tests in
    FutureSpec was comparing the wrong things; not noticing
    that regression led to design considerations which are,
    in hindsight, invalid. For that reason scala.concurrent.
    Transformation now stores its ExecutionContext for longer,
    in order to make sure that we can use ec.reportFailure
    on NonFatal Throwables thrown when executing Future.foreach
    and Future.onComplete. This lives up to their contract and
    facilitates easier debugging of user code which uses Future.

  * I noticed that a typo had been introduced when changing between
    successful and failed branches when submitting Runnables to
    synchronous BatchingExecutors. This typo led to not activating
    the fast path of that code, which yields a significant performance
    benefit by delaying the inflation (allocation) of a Batch.

  * Execution of batched Runnables for synchronous BatchingExecutors
    has now been slightly improved due to reducing reads.

  * Submitting Runnables to BatchingExecutor now null-checks all
    Runnables; previously there were cases where checks were not performed.

  * Improves performance of completeWith by avoiding to allocate the
    function which would attempt to do the actual completion.
  * Splits the BatchingExecutor into Sync and Async Batches
  * Inlining logic in Promise.Transformation, and switching to Int
    for _xform ordinal
  * Allows for max 16 Runnables to execute nested on stack
  * Allows for max 1024 Runnables to be executed before resubmit
  * Makes synchronous BatchingExecutors also use submitForExecution
    since this makes things easier to instrument.
  * Complete with regression test verifying that they all throw
    NotSerializableException
@viktorklang viktorklang force-pushed the wip-additional-perf-futures-√ branch from 909d125 to 50c6f94 Compare February 14, 2019 20:20
@viktorklang
Contributor Author

@SethTisue Rebased and pushed!

@SethTisue
Member

SethTisue commented Feb 14, 2019

go, run 1791! do the best you can! https://scala-ci.typesafe.com/job/scala-2.13.x-integrate-community-build/1791/

events in France in 1791 included the Day of Daggers, the Champ de Mars Massacre, and the Massacres of La Glacière, so hopefully this goes at least that well

@tarsa

tarsa commented Feb 15, 2019

I've benchmarked and compared (eyeballed) results for "Further improves performance of Futures via BatchingExecutor" (Viktor Klang, 06.02.19 19:21, c83124acb5cbd2d37cf35ecfb700f344f320fe7e) and "Makes Promise.Transformation's special fields not serialized" (Viktor Klang, 11.02.19 15:01, 5cf0d1626725391754bbb0d2480a0b5e0b1d7985).
"fie" indeed gained strongly, while for other executors results were mixed (not sure if better or worse). Aggregate scores for each executor would make comparison easier.

@viktorklang
Contributor Author

@tarsa Executors other than fie would not benefit from the changes I made to sync execution. The changes made to fairness could impact performance slightly for the worse, but overall I think it's a good tradeoff to make sure that there is eventual progress for other tasks on the same pool.

@viktorklang
Contributor Author

@SethTisue I'm unsure how to parse/interpret the results of that community build. How do I know what is related to my changes?

@SethTisue
Member

SethTisue commented Feb 15, 2019

> @SethTisue I'm unsure as how to parse/interpret the results of that community build. How do I know what is related to my changes?

we have to compare it to another recent run such as https://scala-ci.typesafe.com/job/scala-2.13.x-integrate-community-build/1789/ , which shows the same failures. no failures appear attributable to this PR

I intend to address those failures soon. we needn't hold up this PR over it. there will be many further community build runs between now and 2.13.0 final

@SethTisue SethTisue added the release-notes worth highlighting in next release notes label Feb 15, 2019
@SethTisue SethTisue merged commit 1775dba into scala:2.13.x Feb 15, 2019
@NthPortal
Contributor

@viktorklang sorry I never got around to fully reviewing this - what I did get to looked great though!

@viktorklang
Contributor Author

viktorklang commented Feb 16, 2019 via email

@diesalbla diesalbla added the library:concurrent Changes to the concurrency support in stdlib label Mar 15, 2019