
[Spark] Instrument YARN application end to not drop traces#5267

Merged
paul-laffon-dd merged 3 commits into master from paul.laffon/spark-yarn-finish on May 31, 2023
Conversation

Contributor

@paul-laffon-dd paul-laffon-dd commented May 25, 2023

What Does This Do

Instrument the spark yarn.ApplicationMaster.finish() function in order to:

  • catch the exit code and exit message of the application
  • make sure the spark.application span is finished before the trace processor exits

Capturing exit code

An application can fail during spark computations (when performing operations on the data) or in the user code (like trying to select a column that does not exist)

Failures during spark computations are caught by the SparkListener, but errors in the user code are not, so instrumenting the finish function allows those to be captured as well

Finish the spark.application span

The spark.application span is the local root span and is finished when the onApplicationEnd event is received. However, if the sparkContext is not stopped explicitly by the user (using spark.stop()), spark is stopped in a shutdown hook. In this case, the onApplicationEnd event can be received after the trace processor has exited, leading to the trace being dropped, with the following logs:

23/05/17 13:11:23 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
23/05/17 13:11:23 INFO SparkContext: Invoking stop() from shutdown hook
[dd.trace 2023-05-17 13:11:23:205 +0000] [dd-trace-processor] DEBUG datadog.trace.agent.common.writer.TraceProcessingWorker - Datadog trace processor exited. Publishing traces stopped
[dd.trace 2023-05-17 13:11:23:212 +0000] [spark-listener-group-shared] DEBUG datadog.trace.agent.common.writer.RemoteWriter - Trace written after shutdown.. Counted but dropping trace

By manually calling the function behind onApplicationEnd, we can make sure that the trace is always finished before the tracer exits
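The finish-once behavior described above can be sketched as follows. This is an illustrative stand-in, not the actual instrumentation code: the class and field names (SparkApplicationTracker, applicationEnded, finishCount) are hypothetical, and a simple counter stands in for finishing the spark.application span.

```java
// Illustrative sketch only: names are hypothetical and do not mirror the
// actual instrumentation classes.
class SparkApplicationTracker {
  private boolean applicationEnded = false;
  private int finishCount = 0; // stands in for "spark.application span finished"

  // Called both from the SparkListener thread (onApplicationEnd) and from the
  // advice on yarn.ApplicationMaster.finish(), hence synchronized.
  synchronized void finishApplication(int exitCode, String exitMessage) {
    if (applicationEnded) {
      return; // the span was already finished by the other caller
    }
    applicationEnded = true;
    // real code would tag the span with exitCode/exitMessage and finish it
    finishCount++;
  }

  synchronized int finishCount() {
    return finishCount;
  }
}
```

Whichever path runs first finishes the span; the second call becomes a no-op, so the trace is written exactly once regardless of shutdown ordering.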

@paul-laffon-dd paul-laffon-dd force-pushed the paul.laffon/spark-yarn-finish branch 2 times, most recently from 3004bfe to 556c6b0 Compare May 25, 2023 12:20

pr-commenter Bot commented May 25, 2023

Benchmarks

Parameters

|        | Baseline                  | Candidate                 |
|--------|---------------------------|---------------------------|
| commit | 1.15.0-SNAPSHOT~eeb83b84d5 | 1.15.0-SNAPSHOT~a2467b991c |
| config | baseline                  | candidate                 |
| module | Agent                     | Agent                     |
| parent | None                      | None                      |

Summary

Found 0 performance improvements and 0 performance regressions! Performance is the same for 22 cases.

| scenario                        | Δ mean execution_time |
|---------------------------------|-----------------------|
| Startup-base-Agent              | same                  |
| Startup-base-Agent.start        | same                  |
| Startup-base-BytebuddyAgent     | same                  |
| Startup-base-GlobalTracer       | same                  |
| Startup-base-AppSec             | same                  |
| Startup-base-Remote Config      | same                  |
| Startup-base-Telemetry          | same                  |
| Startup-iast-Agent              | same                  |
| Startup-iast-Agent.start        | same                  |
| Startup-iast-BytebuddyAgent     | same                  |
| Startup-iast-GlobalTracer       | same                  |
| Startup-iast-AppSec             | same                  |
| Startup-iast-IAST               | same                  |
| Startup-iast-Remote Config      | same                  |
| Startup-iast-Telemetry          | same                  |
| Startup-waf-Agent               | same                  |
| Startup-waf-Agent.start         | same                  |
| Startup-waf-BytebuddyAgent      | same                  |
| Startup-waf-GlobalTracer        | same                  |
| Startup-waf-AppSec              | same                  |
| Startup-waf-Remote Config       | same                  |
| Startup-waf-Telemetry           | same                  |

@paul-laffon-dd paul-laffon-dd force-pushed the paul.laffon/spark-yarn-finish branch from 556c6b0 to 597a83a Compare May 25, 2023 13:15
@paul-laffon-dd paul-laffon-dd marked this pull request as ready for review May 25, 2023 13:57
@paul-laffon-dd paul-laffon-dd requested a review from a team as a code owner May 25, 2023 13:57
@paul-laffon-dd paul-laffon-dd changed the title [Spark] Handle spark on YARN application end in a special way [Spark] Instrument YARN application end to not drop traces May 25, 2023
Member

Is YarnFinishAdvice run by a separate thread than the SparkListener? If so, finishApplication needs to be thread-safe

Contributor Author

Yes, it will run on the main thread of the application while the SparkListener runs on another thread, so this method has to be thread-safe

Contributor

All methods are now synchronized, which could lead to performance issues.
What are you protecting against in the first place?
If only finishApplication() can be called from another thread, it might be worth handling the thread safety at that level only, for instance using a volatile or atomic boolean for applicationEnded. The AgentSpan instance should be fine.

Contributor Author

Yes, only the finishApplication method can be called from another thread, but it references multiple variables (applicationEnded, lastJobFailed, liveExecutors, applicationMetrics) from the class, so synchronized abstracts away the complexity of making each variable thread-safe

Knowing that all methods are called from the same thread, except finishApplication() which can be called once per spark application from another thread, the performance impact of synchronized should be minimal, right?

Contributor

Biased locking (which I think you are alluding to) has been removed from newer versions of the JDK, since the maintenance overhead outweighed the performance gains. That being said, from what I gather, these synchronized blocks are not in a performance-critical path, so I think it should be fine for the sake of simplicity.

Contributor

I don’t think this will do what you expect:

  • We already flush the traces in the shutdown hook
  • You don’t know whether your hook will run before or after the tracer’s, so the tracer might already be dead when you call sleep.

Moreover, please avoid using Thread.sleep(). It should be added to the forbidden APIs, by the way.

Contributor Author

I removed the sleep since as you said it was not needed.

The issue with the dropped trace was that onApplicationEnd could be triggered after the processor had exited, with the log Datadog trace processor exited. Publishing traces stopped.

By manually triggering the event before the finish function runs, we can already be sure that the trace is written before the tracer exits

Contributor

> I removed the sleep since as you said it was not needed.

I did not say it was not needed, rather that it might not do what you expect.
Hence, we would like some tests to back the feature you are introducing.
That would ensure it works properly and that we can maintain it in the future.

Contributor Author

The issue with this change is that it involves interactions between Spark and YARN, which makes it very complicated to cover in tests

In the current tests, spark is created with config("spark.master", "local") so that all spark components are launched in the same JVM. To test this change, it would have to be launched with config("spark.master", "yarn"), where spark would expect to connect to a YARN cluster: https://spark.apache.org/docs/latest/running-on-yarn.html#launching-spark-on-yarn

To validate this change, I manually launched multiple SparkPi applications on an EMR cluster with a build of this branch

Contributor

I agree with @PerfectSlayer that it would be good to have at least one test. I know that orchestrating a spark test with yarn is out of the question, but a test that checks that the method gets instrumented and the tags added should be possible anyway. I quickly looked at the ApplicationMaster and I think it would be possible, even if it's a horrible hack, to use it without having it orchestrate the whole spark application. Basically just create a SparkSession, then create an ApplicationMaster and call finish(...) on it. The spark/yarn conf supplied to it would need to be faked to make sure it doesn't try to do anything.
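The test shape suggested here can be sketched with a stand-in. FakeApplicationMaster is purely illustrative; the real test would instantiate spark's ApplicationMaster with a faked spark/yarn conf and rely on the instrumentation to finish the span:

```java
// Illustrative stand-in for yarn's ApplicationMaster: calling finish(...)
// directly should end the application span, without running a real
// spark-on-YARN application.
class FakeApplicationMaster {
  private final Runnable onApplicationEnd;

  FakeApplicationMaster(Runnable onApplicationEnd) {
    this.onApplicationEnd = onApplicationEnd;
  }

  void finish(int exitCode, String exitMessage) {
    // the instrumentation advice would finish spark.application here
    onApplicationEnd.run();
  }
}
```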

Contributor Author

You are right, an ApplicationMaster can be instantiated without orchestrating the whole spark application (and it is definitely easier than I would have expected)

I added a test calling finish(...) to end the application span

@PerfectSlayer PerfectSlayer added inst: others All other instrumentations inst: apache spark Apache Spark instrumentation labels May 25, 2023
@paul-laffon-dd paul-laffon-dd force-pushed the paul.laffon/spark-yarn-finish branch from 33e8587 to 37fdecc Compare May 26, 2023 08:38
@paul-laffon-dd paul-laffon-dd force-pushed the paul.laffon/spark-yarn-finish branch from 37fdecc to 07d63db Compare May 30, 2023 11:44
@paul-laffon-dd paul-laffon-dd force-pushed the paul.laffon/spark-yarn-finish branch from eebbc8e to a2467b9 Compare May 30, 2023 16:30
@paul-laffon-dd paul-laffon-dd requested a review from bantonsson May 31, 2023 08:09
@bantonsson bantonsson dismissed PerfectSlayer’s stale review May 31, 2023 09:24

A test has been added

@paul-laffon-dd paul-laffon-dd merged commit 4dd9306 into master May 31, 2023
@paul-laffon-dd paul-laffon-dd deleted the paul.laffon/spark-yarn-finish branch May 31, 2023 09:29
@github-actions github-actions Bot added this to the 1.15.0 milestone May 31, 2023
paul-laffon-dd added a commit that referenced this pull request Jul 3, 2023
…ener interface (#5505)

Instrument the SparkSubmit.runMain(...) function to capture all errors occurring in a spark application

This SparkSubmit class is used by spark to launch the application in non-YARN/Mesos environments. The YARN case is already covered by instrumenting the ApplicationMaster.finish() function (from this PR #5267)

## Motivation

We are currently using the SparkListener interface to receive events from spark. However, this interface only captures errors that occur during spark computations, not errors thrown in arbitrary user code

```scala
def main(): Unit = {
  val spark = SparkSession.builder.getOrCreate()
  val df = spark.read.parquet("...")

  // Exception will be captured because it is thrown during spark computations
  df.foreach(_ => throw new RuntimeException("Some exception"))

  // Exception currently not captured because it is thrown outside of spark computations
  throw new RuntimeException("Some exception")
}
```
