Skip to content

Commit fdf8c7f

Browse files
committed
Fix KafkaStreamsTest
1 parent 9c333ab commit fdf8c7f

1 file changed

Lines changed: 25 additions & 9 deletions

File tree

dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import spock.lang.Shared
2727

2828
import java.util.concurrent.LinkedBlockingQueue
2929
import java.util.concurrent.TimeUnit
30+
import java.util.regex.Pattern
3031

3132
@Flaky("https://github.com/DataDog/dd-trace-java/issues/3865")
3233
class KafkaStreamsTest extends AgentTestRunner {
@@ -39,6 +40,11 @@ class KafkaStreamsTest extends AgentTestRunner {
3940
@Shared
4041
EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka
4142

43+
def setup() {
44+
// Filter out additional traces for kafka.poll operation, otherwise, there will be more traces than expected.
45+
TEST_WRITER.setFilter { trace -> trace[0].operationName.toString() != 'kafka.poll' }
46+
}
47+
4248
@Override
4349
protected boolean isDataStreamsEnabled() {
4450
return true
@@ -117,8 +123,7 @@ class KafkaStreamsTest extends AgentTestRunner {
117123
received.value() == greeting.toLowerCase()
118124
received.key() == null
119125

120-
// We set ignoreAdditionalTraces to true, since there are additional traces for kafka.poll operation.
121-
assertTraces(3, true) {
126+
assertTraces(3) {
122127
trace(1) {
123128
// PRODUCER span 0
124129
span {
@@ -136,6 +141,7 @@ class KafkaStreamsTest extends AgentTestRunner {
136141
if ({ isDataStreamsEnabled()}) {
137142
"$DDTags.PATHWAY_HASH" { String }
138143
}
144+
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
139145
defaultTagsNoPeerService()
140146
}
141147
}
@@ -160,7 +166,6 @@ class KafkaStreamsTest extends AgentTestRunner {
160166
"$InstrumentationTags.PARTITION" { it >= 0 }
161167
"$InstrumentationTags.OFFSET" 0
162168
"$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000"
163-
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING"
164169
"asdf" "testing"
165170
if ({ isDataStreamsEnabled()}) {
166171
"$DDTags.PATHWAY_HASH" { String }
@@ -187,6 +192,7 @@ class KafkaStreamsTest extends AgentTestRunner {
187192
if ({ isDataStreamsEnabled()}) {
188193
"$DDTags.PATHWAY_HASH" { String }
189194
}
195+
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
190196
defaultTagsNoPeerService()
191197
}
192198
}
@@ -213,6 +219,7 @@ class KafkaStreamsTest extends AgentTestRunner {
213219
if ({ isDataStreamsEnabled()}) {
214220
"$DDTags.PATHWAY_HASH" { String }
215221
}
222+
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
216223
defaultTags(true)
217224
}
218225
}
@@ -227,8 +234,11 @@ class KafkaStreamsTest extends AgentTestRunner {
227234
if (isDataStreamsEnabled()) {
228235
StatsGroup originProducerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
229236
verifyAll(originProducerPoint) {
230-
edgeTags == ["direction:out", "topic:$STREAM_PENDING", "type:kafka"]
231-
edgeTags.size() == 3
237+
edgeTags.any { it.startsWith("kafka_cluster_id:") }
238+
for (String tag : ["direction:out", "topic:$STREAM_PENDING", "type:kafka"]) {
239+
assert edgeTags.contains(tag)
240+
}
241+
edgeTags.size() == 4
232242
}
233243

234244
StatsGroup kafkaStreamsConsumerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == originProducerPoint.hash }
@@ -244,14 +254,20 @@ class KafkaStreamsTest extends AgentTestRunner {
244254

245255
StatsGroup kafkaStreamsProducerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == kafkaStreamsConsumerPoint.hash }
246256
verifyAll(kafkaStreamsProducerPoint) {
247-
edgeTags == ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"]
248-
edgeTags.size() == 3
257+
edgeTags.any { it.startsWith("kafka_cluster_id:") }
258+
for (String tag : ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"]) {
259+
assert edgeTags.contains(tag)
260+
}
261+
edgeTags.size() == 4
249262
}
250263

251264
StatsGroup finalConsumerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == kafkaStreamsProducerPoint.hash }
252265
verifyAll(finalConsumerPoint) {
253-
edgeTags == ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"]
254-
edgeTags.size() == 4
266+
edgeTags.any { it.startsWith("kafka_cluster_id:") }
267+
for (String tag : ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"]) {
268+
assert edgeTags.contains(tag)
269+
}
270+
edgeTags.size() == 5
255271
}
256272
}
257273

0 commit comments

Comments
 (0)