|
41 | 41 | import org.apache.spark.sql.streaming.StateOperatorProgress; |
42 | 42 | import org.apache.spark.sql.streaming.StreamingQueryListener; |
43 | 43 | import org.apache.spark.sql.streaming.StreamingQueryProgress; |
| 44 | +import org.slf4j.Logger; |
| 45 | +import org.slf4j.LoggerFactory; |
44 | 46 | import scala.Tuple2; |
45 | 47 | import scala.collection.JavaConverters; |
46 | 48 |
|
|
53 | 55 | * still needed |
54 | 56 | */ |
55 | 57 | public abstract class AbstractDatadogSparkListener extends SparkListener { |
| 58 | + private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class); |
56 | 59 | public static volatile AbstractDatadogSparkListener listener = null; |
57 | 60 | public static volatile boolean finishTraceOnApplicationEnd = true; |
58 | 61 | public static volatile boolean isPysparkShell = false; |
@@ -121,6 +124,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp |
121 | 124 | databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null); |
122 | 125 | databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName); |
123 | 126 | sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks); |
| 127 | + |
| 128 | + log.info("Created datadog spark listener: {}", this.getClass().getSimpleName()); |
124 | 129 | } |
125 | 130 |
|
126 | 131 | /** Resource name of the spark job. Provide an implementation based on a specific scala version */ |
@@ -173,13 +178,19 @@ private void initApplicationSpanIfNotInitialized() { |
173 | 178 |
|
174 | 179 | @Override |
175 | 180 | public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { |
| 181 | + log.info( |
| 182 | + "Received spark application end event, finish trace on this event: {}", |
| 183 | + finishTraceOnApplicationEnd); |
| 184 | + |
176 | 185 | if (finishTraceOnApplicationEnd) { |
177 | 186 | finishApplication(applicationEnd.time(), null, 0, null); |
178 | 187 | } |
179 | 188 | } |
180 | 189 |
|
181 | 190 | public synchronized void finishApplication( |
182 | 191 | long time, Throwable throwable, int exitCode, String msg) { |
| 192 | + log.info("Finishing spark application trace"); |
| 193 | + |
183 | 194 | if (applicationEnded) { |
184 | 195 | return; |
185 | 196 | } |
|
0 commit comments