
Commit 0bfaa06

Centralize logic in a common AbstractSparkPlanUtils class
1 parent e82dd20 commit 0bfaa06

5 files changed: 93 additions & 89 deletions

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 3 additions & 1 deletion
@@ -24,6 +24,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".AbstractSparkPlanSerializer",
+      packageName + ".AbstractSparkPlanUtils",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark212Listener",
@@ -109,7 +110,8 @@ public static void exit(
           JavaConverters.mapAsScalaMap(planSerializer.extractFormattedProduct(plan))
               .toMap(Predef.$conforms());

-      SparkPlanInfo newPlanInfo = Spark212PlanUtils.upsertSparkPlanInfoMetadata(planInfo, meta);
+      SparkPlanInfo newPlanInfo =
+          new Spark212PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
       if (newPlanInfo != null) {
         planInfo = newPlanInfo;
       }
dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java

Lines changed: 15 additions & 47 deletions
@@ -1,20 +1,11 @@
 package datadog.trace.instrumentation.spark;

-import datadog.trace.util.MethodHandles;
 import java.lang.invoke.MethodHandle;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.spark.sql.execution.SparkPlanInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.immutable.Map;

-public class Spark212PlanUtils {
-  private static final Logger log = LoggerFactory.getLogger(Spark212PlanUtils.class);
-
-  private static final MethodHandles methodLoader =
-      new MethodHandles(ClassLoader.getSystemClassLoader());
+public class Spark212PlanUtils extends AbstractSparkPlanUtils {
   private static final MethodHandle constructor =
       methodLoader.constructor(
           SparkPlanInfo.class,
@@ -35,43 +26,20 @@ public class Spark212PlanUtils {
           String.class,
           Option.class);

-  public static SparkPlanInfo upsertSparkPlanInfoMetadata(
-      SparkPlanInfo planInfo, scala.collection.immutable.Map<String, String> meta) {
-    // Attempt to create a new SparkPlanInfo with additional metadata replaced
-    // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this
-
-    Object[] standardArgs =
-        new Object[] {
-          planInfo.nodeName(),
-          planInfo.simpleString(),
-          planInfo.children(),
-          meta,
-          planInfo.metrics()
-        };
-
-    if (databricksConstructor != null) {
-      List<Object> databricksArgs = new ArrayList<>(Arrays.asList(standardArgs));
-      try {
-        databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo));
-        databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo));
-        databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo));
-      } catch (Throwable t) {
-        log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t);
-      }
-
-      SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray());
-      if (newPlan != null) {
-        return newPlan;
-      }
-    }
+  @Override
+  protected MethodHandle getConstructor() {
+    return constructor;
+  }

-    if (constructor != null) {
-      SparkPlanInfo newPlan = methodLoader.invoke(constructor, standardArgs);
-      if (newPlan != null) {
-        return newPlan;
-      }
-    }
+  @Override
+  protected MethodHandle getDatabricksConstructor() {
+    return databricksConstructor;
+  }

-    return null;
+  @Override
+  protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) {
+    return new Object[] {
+      planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics()
+    };
   }
 }
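
With the shared upsert logic moved into AbstractSparkPlanUtils (added later in this commit), each version-specific class shrinks to three overrides: the two constructor handles and the argument array, which differ because Seq changed between Scala 2.12 and 2.13. For readers unfamiliar with the shape of the refactor, here is a minimal, self-contained template-method sketch with hypothetical names; it mirrors the structure above but is not code from this repository.

import java.util.Arrays;

// Hypothetical sketch of the template-method split used by this commit:
// the base class owns the algorithm; subclasses only supply version-specific pieces.
abstract class AbstractUpsert {
  public final String upsert(String node, String meta) {
    Object[] args = standardArgs(node, meta); // hook overridden per version
    return "rebuilt" + Arrays.toString(args);
  }

  protected abstract Object[] standardArgs(String node, String meta);
}

class Version212Upsert extends AbstractUpsert {
  @Override
  protected Object[] standardArgs(String node, String meta) {
    return new Object[] {node, meta}; // version-specific argument shape
  }
}

class UpsertDemo {
  public static void main(String[] args) {
    // Matches the new call-site style in the instrumentation: instantiate, then upsert.
    System.out.println(new Version212Upsert().upsert("Exchange", "k=v"));
  }
}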

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 3 additions & 1 deletion
@@ -24,6 +24,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".AbstractSparkPlanSerializer",
+      packageName + ".AbstractSparkPlanUtils",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark213Listener",
@@ -109,7 +110,8 @@ public static void exit(
       Map<String, String> meta =
           HashMap.from(JavaConverters.asScala(planSerializer.extractFormattedProduct(plan)));

-      SparkPlanInfo newPlanInfo = Spark213PlanUtils.upsertSparkPlanInfoMetadata(planInfo, meta);
+      SparkPlanInfo newPlanInfo =
+          new Spark213PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
       if (newPlanInfo != null) {
         planInfo = newPlanInfo;
       }
dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java

Lines changed: 15 additions & 40 deletions
@@ -1,20 +1,11 @@
 package datadog.trace.instrumentation.spark;

-import datadog.trace.util.MethodHandles;
 import java.lang.invoke.MethodHandle;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.spark.sql.execution.SparkPlanInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.immutable.Map;

-public class Spark213PlanUtils {
-  private static final Logger log = LoggerFactory.getLogger(Spark213PlanUtils.class);
-
-  private static final MethodHandles methodLoader =
-      new MethodHandles(ClassLoader.getSystemClassLoader());
+public class Spark213PlanUtils extends AbstractSparkPlanUtils {
   private static final MethodHandle constructor =
       methodLoader.constructor(
           SparkPlanInfo.class,
@@ -35,36 +26,20 @@ public class Spark213PlanUtils {
           String.class,
           Option.class);

-  public static SparkPlanInfo upsertSparkPlanInfoMetadata(
-      SparkPlanInfo planInfo, scala.collection.immutable.Map<String, String> meta) {
-    // Attempt to create a new SparkPlanInfo with additional metadata replaced
-    // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this
-
-    Object[] standardArgs =
-        new Object[] {
-          planInfo.nodeName(),
-          planInfo.simpleString(),
-          planInfo.children(),
-          meta,
-          planInfo.metrics()
-        };
-
-    if (databricksConstructor != null) {
-      List<Object> databricksArgs = new ArrayList<>(Arrays.asList(standardArgs));
-      try {
-        databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo));
-        databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo));
-        databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo));
-      } catch (Throwable t) {
-        log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t);
-      }
+  @Override
+  protected MethodHandle getConstructor() {
+    return constructor;
+  }

-      SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray());
-      if (newPlan != null) {
-        return newPlan;
-      }
-    }
+  @Override
+  protected MethodHandle getDatabricksConstructor() {
+    return databricksConstructor;
+  }

-    return null;
+  @Override
+  protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) {
+    return new Object[] {
+      planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics()
+    };
   }
 }
AbstractSparkPlanUtils.java (new file)

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.util.MethodHandles;
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Map;
+
+abstract class AbstractSparkPlanUtils {
+  private static final Logger log = LoggerFactory.getLogger(AbstractSparkPlanUtils.class);
+
+  protected static final MethodHandles methodLoader =
+      new MethodHandles(ClassLoader.getSystemClassLoader());
+
+  protected abstract MethodHandle getConstructor();
+
+  protected abstract MethodHandle getDatabricksConstructor();
+
+  // Deals with Seq which changed from Scala 2.12 to 2.13, so delegate to version-specific classes
+  protected abstract Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta);
+
+  // Attempt to create a new SparkPlanInfo with additional metadata replaced
+  // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this
+  public SparkPlanInfo upsertSparkPlanInfoMetadata(
+      SparkPlanInfo planInfo, scala.collection.immutable.Map<String, String> meta) {
+    if (getDatabricksConstructor() != null) {
+      List<Object> databricksArgs = new ArrayList<>(Arrays.asList(getStandardArgs(planInfo, meta)));
+      try {
+        databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo));
+      } catch (Throwable t) {
+        log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t);
+      }
+
+      SparkPlanInfo newPlan =
+          methodLoader.invoke(getDatabricksConstructor(), databricksArgs.toArray());
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    if (getConstructor() != null) {
+      SparkPlanInfo newPlan =
+          methodLoader.invoke(getConstructor(), getStandardArgs(planInfo, meta));
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    return null;
+  }
+}
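
The base class tries a Databricks-specific constructor first because Databricks runtimes expose extra SparkPlanInfo fields (estRowCount, rddScopeId, explainId), then falls back to the standard constructor, and finally returns null so the caller keeps the original planInfo. The commit resolves and invokes these constructors through datadog.trace.util.MethodHandles; below is a rough, standalone illustration of the same probe-then-fall-back idea using only the JDK's java.lang.invoke API (hypothetical helper, not code from this commit).

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Hypothetical helper: look up a constructor reflectively and return null when the
// expected shape is not present on the current runtime, so callers can fall back.
final class ConstructorProbe {
  static MethodHandle probe(Class<?> owner, Class<?>... paramTypes) {
    try {
      return MethodHandles.lookup()
          .findConstructor(owner, MethodType.methodType(void.class, paramTypes));
    } catch (NoSuchMethodException | IllegalAccessException e) {
      return null; // this constructor shape does not exist here
    }
  }

  static Object newInstanceOrNull(MethodHandle ctor, Object... args) {
    if (ctor == null) {
      return null;
    }
    try {
      return ctor.invokeWithArguments(args);
    } catch (Throwable t) {
      return null; // invocation failed; caller keeps the original object
    }
  }
}

If neither constructor produces a new SparkPlanInfo, upsertSparkPlanInfoMetadata returns null and the instrumentation advice leaves planInfo unchanged.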
