Skip to content

Commit 67ad13b

Browse files
authored
Add telemetry app product change message (#7348)
What Does This Do Add new app_product_change request type Update request boy writeProducts method to be able to build the json making each product optional as the schema defines Add new Product change telemetry event Add more tests Motivation ASM enablement (appsec.enabled) in telemetry product action should reflect remote config changes. Digging into this we noticed that the app_product_change telemetry request type is not implemented in java Additional Notes Only implemented to detect appsec product changes
1 parent 23ce5ef commit 67ad13b

19 files changed

Lines changed: 377 additions & 28 deletions

File tree

dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import datadog.trace.api.Config;
1919
import datadog.trace.api.ProductActivation;
2020
import datadog.trace.api.gateway.SubscriptionService;
21+
import datadog.trace.api.telemetry.ProductChange;
22+
import datadog.trace.api.telemetry.ProductChangeCollector;
2123
import datadog.trace.bootstrap.ActiveSubsystems;
2224
import datadog.trace.util.Strings;
2325
import java.util.Collections;
@@ -112,6 +114,10 @@ public static boolean isActive() {
112114

113115
public static void setActive(boolean status) {
114116
ActiveSubsystems.APPSEC_ACTIVE = status;
117+
// Report to the product change via telemetry
118+
log.debug("AppSec is now {}", status ? "active" : "inactive");
119+
ProductChangeCollector.get()
120+
.update(new ProductChange().productType(ProductChange.ProductType.APPSEC).enabled(status));
115121
}
116122

117123
public static void stop() {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package datadog.trace.api.telemetry;
2+
3+
public class ProductChange {
4+
5+
private ProductType productType;
6+
private boolean enabled;
7+
8+
public ProductType getProductType() {
9+
return productType;
10+
}
11+
12+
public boolean isEnabled() {
13+
return enabled;
14+
}
15+
16+
public ProductChange productType(ProductType productType) {
17+
this.productType = productType;
18+
return this;
19+
}
20+
21+
public ProductChange enabled(boolean enabled) {
22+
this.enabled = enabled;
23+
return this;
24+
}
25+
26+
public enum ProductType {
27+
APPSEC("appsec"),
28+
PROFILER("profiler"),
29+
DYNAMIC_INSTRUMENTATION("dynamic_instrumentation");
30+
31+
private final String name;
32+
33+
ProductType(String name) {
34+
this.name = name;
35+
}
36+
37+
public String getName() {
38+
return name;
39+
}
40+
}
41+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package datadog.trace.api.telemetry;
2+
3+
import java.util.Collections;
4+
import java.util.LinkedList;
5+
import java.util.List;
6+
import java.util.Queue;
7+
import java.util.concurrent.LinkedBlockingQueue;
8+
9+
public class ProductChangeCollector {
10+
11+
private static final ProductChangeCollector INSTANCE = new ProductChangeCollector();
12+
private final Queue<ProductChange> productChanges = new LinkedBlockingQueue<>();
13+
14+
private ProductChangeCollector() {}
15+
16+
public static ProductChangeCollector get() {
17+
return INSTANCE;
18+
}
19+
20+
public synchronized void update(final ProductChange productChange) {
21+
productChanges.offer(productChange);
22+
}
23+
24+
public synchronized List<ProductChange> drain() {
25+
if (productChanges.isEmpty()) {
26+
return Collections.emptyList();
27+
}
28+
29+
List<ProductChange> list = new LinkedList<>();
30+
31+
ProductChange productChange;
32+
while ((productChange = productChanges.poll()) != null) {
33+
list.add(productChange);
34+
}
35+
36+
return list;
37+
}
38+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package datadog.trace.api.telemetry
2+
3+
import datadog.trace.test.util.DDSpecification
4+
5+
import static datadog.trace.api.telemetry.ProductChange.ProductType.APPSEC
6+
import static datadog.trace.api.telemetry.ProductChange.ProductType.PROFILER
7+
8+
class ProductChangeCollectorTest extends DDSpecification {
9+
10+
def "update-drain product changes"() {
11+
setup:
12+
def product1 = new ProductChange().productType(APPSEC).enabled(true)
13+
def product2 = new ProductChange().productType(PROFILER).enabled(false)
14+
def product3 = new ProductChange().productType(APPSEC).enabled(false)
15+
16+
ProductChangeCollector.get().productChanges.offer(product1)
17+
ProductChangeCollector.get().productChanges.offer(product2)
18+
19+
when:
20+
ProductChangeCollector.get().update(product3)
21+
22+
then:
23+
ProductChangeCollector.get().drain() == [product1, product2, product3]
24+
ProductChangeCollector.get().drain() == []
25+
}
26+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package datadog.trace.api.telemetry
2+
3+
import datadog.trace.test.util.DDSpecification
4+
5+
import static datadog.trace.api.telemetry.ProductChange.ProductType.APPSEC
6+
7+
class ProductChangeTest extends DDSpecification {
8+
9+
def "Test ProductChange"() {
10+
when:
11+
def productChange = new ProductChange().productType(APPSEC).enabled(true)
12+
13+
then:
14+
productChange.getProductType() == APPSEC
15+
productChange.isEnabled()
16+
}
17+
}

telemetry/src/main/java/datadog/telemetry/BufferedEvents.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.telemetry.api.Metric;
77
import datadog.telemetry.dependency.Dependency;
88
import datadog.trace.api.ConfigSetting;
9+
import datadog.trace.api.telemetry.ProductChange;
910
import java.util.ArrayList;
1011

1112
/**
@@ -26,6 +27,8 @@ public final class BufferedEvents implements EventSource, EventSink {
2627
private int distributionSeriesIndex;
2728
private ArrayList<LogMessage> logMessageEvents;
2829
private int logMessageIndex;
30+
private ArrayList<ProductChange> productChangeEvents;
31+
private int productChangeIndex;
2932

3033
public void addConfigChangeEvent(ConfigSetting event) {
3134
if (configChangeEvents == null) {
@@ -74,6 +77,14 @@ public void addLogMessageEvent(LogMessage event) {
7477
logMessageEvents.add(event);
7578
}
7679

80+
@Override
81+
public void addProductChangeEvent(ProductChange event) {
82+
if (productChangeEvents == null) {
83+
productChangeEvents = new ArrayList<>(INITIAL_CAPACITY);
84+
}
85+
productChangeEvents.add(event);
86+
}
87+
7788
@Override
7889
public boolean hasConfigChangeEvent() {
7990
return configChangeEvents != null && configChangeIndex < configChangeEvents.size();
@@ -134,4 +145,14 @@ public boolean hasLogMessageEvent() {
134145
public LogMessage nextLogMessageEvent() {
135146
return logMessageEvents.get(logMessageIndex++);
136147
}
148+
149+
@Override
150+
public boolean hasProductChangeEvent() {
151+
return productChangeEvents != null && productChangeIndex < productChangeEvents.size();
152+
}
153+
154+
@Override
155+
public ProductChange nextProductChangeEvent() {
156+
return productChangeEvents.get(productChangeIndex++);
157+
}
137158
}

telemetry/src/main/java/datadog/telemetry/EventSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.telemetry.api.Metric;
77
import datadog.telemetry.dependency.Dependency;
88
import datadog.trace.api.ConfigSetting;
9+
import datadog.trace.api.telemetry.ProductChange;
910

1011
/**
1112
* A unified interface for telemetry event sink. It is used to buffer events polled from the queues
@@ -25,6 +26,8 @@ interface EventSink {
2526

2627
void addLogMessageEvent(LogMessage event);
2728

29+
void addProductChangeEvent(ProductChange event);
30+
2831
EventSink NOOP = new Noop();
2932

3033
class Noop implements EventSink {
@@ -47,5 +50,8 @@ public void addDistributionSeriesEvent(DistributionSeries event) {}
4750

4851
@Override
4952
public void addLogMessageEvent(LogMessage event) {}
53+
54+
@Override
55+
public void addProductChangeEvent(ProductChange event) {}
5056
}
5157
}

telemetry/src/main/java/datadog/telemetry/EventSource.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.telemetry.api.Metric;
77
import datadog.telemetry.dependency.Dependency;
88
import datadog.trace.api.ConfigSetting;
9+
import datadog.trace.api.telemetry.ProductChange;
910
import java.util.Queue;
1011

1112
/**
@@ -38,6 +39,10 @@ interface EventSource {
3839

3940
LogMessage nextLogMessageEvent();
4041

42+
boolean hasProductChangeEvent();
43+
44+
ProductChange nextProductChangeEvent();
45+
4146
default boolean isEmpty() {
4247
return !hasConfigChangeEvent()
4348
&& !hasIntegrationEvent()
@@ -54,20 +59,23 @@ final class Queued implements EventSource {
5459
private final Queue<Metric> metricQueue;
5560
private final Queue<DistributionSeries> distributionSeriesQueue;
5661
private final Queue<LogMessage> logMessageQueue;
62+
private final Queue<ProductChange> productChanges;
5763

5864
Queued(
5965
Queue<ConfigSetting> configChangeQueue,
6066
Queue<Integration> integrationQueue,
6167
Queue<Dependency> dependencyQueue,
6268
Queue<Metric> metricQueue,
6369
Queue<DistributionSeries> distributionSeriesQueue,
64-
Queue<LogMessage> logMessageQueue) {
70+
Queue<LogMessage> logMessageQueue,
71+
Queue<ProductChange> productChanges) {
6572
this.configChangeQueue = configChangeQueue;
6673
this.integrationQueue = integrationQueue;
6774
this.dependencyQueue = dependencyQueue;
6875
this.metricQueue = metricQueue;
6976
this.distributionSeriesQueue = distributionSeriesQueue;
7077
this.logMessageQueue = logMessageQueue;
78+
this.productChanges = productChanges;
7179
}
7280

7381
@Override
@@ -129,5 +137,15 @@ public boolean hasLogMessageEvent() {
129137
public LogMessage nextLogMessageEvent() {
130138
return logMessageQueue.poll();
131139
}
140+
141+
@Override
142+
public boolean hasProductChangeEvent() {
143+
return !productChanges.isEmpty();
144+
}
145+
146+
@Override
147+
public ProductChange nextProductChangeEvent() {
148+
return productChanges.poll();
149+
}
132150
}
133151
}

telemetry/src/main/java/datadog/telemetry/ExtendedHeartbeatData.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.telemetry.api.Metric;
77
import datadog.telemetry.dependency.Dependency;
88
import datadog.trace.api.ConfigSetting;
9+
import datadog.trace.api.telemetry.ProductChange;
910
import java.util.ArrayList;
1011

1112
public class ExtendedHeartbeatData {
@@ -110,5 +111,15 @@ public boolean hasLogMessageEvent() {
110111
public LogMessage nextLogMessageEvent() {
111112
return null;
112113
}
114+
115+
@Override
116+
public boolean hasProductChangeEvent() {
117+
return false;
118+
}
119+
120+
@Override
121+
public ProductChange nextProductChangeEvent() {
122+
return null;
123+
}
113124
}
114125
}

telemetry/src/main/java/datadog/telemetry/TelemetryRequest.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,20 @@
1313
import datadog.trace.api.DDTags;
1414
import datadog.trace.api.InstrumenterConfig;
1515
import datadog.trace.api.ProductActivation;
16+
import datadog.trace.api.telemetry.ProductChange;
17+
import datadog.trace.api.telemetry.ProductChange.ProductType;
1618
import java.io.IOException;
19+
import java.util.EnumMap;
20+
import java.util.Map;
1721
import okhttp3.MediaType;
1822
import okhttp3.Request;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
1925

2026
public class TelemetryRequest {
27+
28+
private static final Logger log = LoggerFactory.getLogger(TelemetryRequest.class);
29+
2130
static final String API_VERSION = "v2";
2231
static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
2332

@@ -90,14 +99,11 @@ public void writeConfigurations() {
9099
}
91100

92101
public void writeProducts() {
93-
InstrumenterConfig instrumenterConfig = InstrumenterConfig.get();
94-
Config config = Config.get();
95102
try {
96-
boolean appsecEnabled =
97-
instrumenterConfig.getAppSecActivation() != ProductActivation.FULLY_DISABLED;
98-
boolean profilerEnabled = instrumenterConfig.isProfilingEnabled();
99-
boolean dynamicInstrumentationEnabled = config.isDebuggerEnabled();
100-
requestBody.writeProducts(appsecEnabled, profilerEnabled, dynamicInstrumentationEnabled);
103+
requestBody.writeProducts(
104+
InstrumenterConfig.get().getAppSecActivation() != ProductActivation.FULLY_DISABLED,
105+
InstrumenterConfig.get().isProfilingEnabled(),
106+
Config.get().isDebuggerEnabled());
101107
} catch (IOException e) {
102108
throw new TelemetryRequestBody.SerializationException("products", e);
103109
}
@@ -200,6 +206,26 @@ public void writeLogs() {
200206
}
201207
}
202208

209+
public void writeChangedProducts() {
210+
if (!isWithinSizeLimits() || !eventSource.hasProductChangeEvent()) {
211+
return;
212+
}
213+
try {
214+
log.debug("Writing changed products");
215+
requestBody.beginProducts();
216+
Map<ProductType, Boolean> products = new EnumMap<>(ProductType.class);
217+
while (eventSource.hasProductChangeEvent() && isWithinSizeLimits()) {
218+
ProductChange event = eventSource.nextProductChangeEvent();
219+
products.put(event.getProductType(), event.isEnabled());
220+
eventSink.addProductChangeEvent(event);
221+
}
222+
requestBody.writeProducts(products);
223+
requestBody.endProducts();
224+
} catch (IOException e) {
225+
throw new TelemetryRequestBody.SerializationException("changed-products", e);
226+
}
227+
}
228+
203229
public void writeHeartbeat() {
204230
requestBody.writeHeartbeatEvent();
205231
}

0 commit comments

Comments
 (0)