@@ -27,6 +27,7 @@ import spock.lang.Shared
2727
2828import java.util.concurrent.LinkedBlockingQueue
2929import java.util.concurrent.TimeUnit
30+ import java.util.regex.Pattern
3031
3132@Flaky("https://github.com/DataDog/dd-trace-java/issues/3865")
3233class KafkaStreamsTest extends AgentTestRunner {
@@ -39,6 +40,11 @@ class KafkaStreamsTest extends AgentTestRunner {
3940 @Shared
4041 EmbeddedKafkaBroker embeddedKafka = kafkaRule. embeddedKafka
4142
43+ def setup() {
44+ // Filter out additional traces for kafka.poll operation, otherwise, there will be more traces than expected.
45+ TEST_WRITER.setFilter { trace -> trace[0].operationName.toString() != 'kafka.poll' }
46+ }
47+
4248 @Override
4349 protected boolean isDataStreamsEnabled () {
4450 return true
@@ -117,8 +123,7 @@ class KafkaStreamsTest extends AgentTestRunner {
117123 received. value() == greeting. toLowerCase()
118124 received. key() == null
119125
120- // We set ignoreAdditionalTraces to true, since there are additional traces for kafka.poll operation.
121- assertTraces(3, true) {
126+ assertTraces(3) {
122127 trace(1 ) {
123128 // PRODUCER span 0
124129 span {
@@ -136,6 +141,7 @@ class KafkaStreamsTest extends AgentTestRunner {
136141 if ({ isDataStreamsEnabled()}) {
137142 " $DDTags . PATHWAY_HASH " { String }
138143 }
144+ "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
139145 defaultTagsNoPeerService()
140146 }
141147 }
@@ -160,7 +166,6 @@ class KafkaStreamsTest extends AgentTestRunner {
160166 " $InstrumentationTags . PARTITION " { it >= 0 }
161167 " $InstrumentationTags . OFFSET " 0
162168 " $InstrumentationTags . PROCESSOR_NAME " " KSTREAM-SOURCE-0000000000"
163- " $InstrumentationTags . MESSAGING_DESTINATION_NAME " " $STREAM_PENDING "
164169 " asdf" " testing"
165170 if ({ isDataStreamsEnabled()}) {
166171 " $DDTags . PATHWAY_HASH " { String }
@@ -187,6 +192,7 @@ class KafkaStreamsTest extends AgentTestRunner {
187192 if ({ isDataStreamsEnabled()}) {
188193 " $DDTags . PATHWAY_HASH " { String }
189194 }
195+ "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
190196 defaultTagsNoPeerService()
191197 }
192198 }
@@ -213,6 +219,7 @@ class KafkaStreamsTest extends AgentTestRunner {
213219 if ({ isDataStreamsEnabled()}) {
214220 " $DDTags . PATHWAY_HASH " { String }
215221 }
222+ "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+")
216223 defaultTags(true )
217224 }
218225 }
@@ -227,8 +234,11 @@ class KafkaStreamsTest extends AgentTestRunner {
227234 if (isDataStreamsEnabled()) {
228235 StatsGroup originProducerPoint = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == 0 }
229236 verifyAll(originProducerPoint) {
230- edgeTags == [" direction:out" , " topic:$STREAM_PENDING " , " type:kafka" ]
231- edgeTags. size() == 3
237+ edgeTags.any { it.startsWith("kafka_cluster_id:") }
238+ for (String tag : ["direction:out", "topic:$STREAM_PENDING", "type:kafka"]) {
239+ assert edgeTags.contains(tag)
240+ }
241+ edgeTags.size() == 4
232242 }
233243
234244 StatsGroup kafkaStreamsConsumerPoint = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == originProducerPoint. hash }
@@ -244,14 +254,20 @@ class KafkaStreamsTest extends AgentTestRunner {
244254
245255 StatsGroup kafkaStreamsProducerPoint = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == kafkaStreamsConsumerPoint. hash }
246256 verifyAll(kafkaStreamsProducerPoint) {
247- edgeTags == [" direction:out" , " topic:$STREAM_PROCESSED " , " type:kafka" ]
248- edgeTags. size() == 3
257+ edgeTags.any { it.startsWith("kafka_cluster_id:") }
258+ for (String tag : ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"]) {
259+ assert edgeTags.contains(tag)
260+ }
261+ edgeTags.size() == 4
249262 }
250263
251264 StatsGroup finalConsumerPoint = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == kafkaStreamsProducerPoint. hash }
252265 verifyAll(finalConsumerPoint) {
253- edgeTags == [" direction:in" , " group:sender" , " topic:$STREAM_PROCESSED " . toString(), " type:kafka" ]
254- edgeTags. size() == 4
266+ edgeTags.any { it.startsWith("kafka_cluster_id:") }
267+ for (String tag : ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"]) {
268+ assert edgeTags.contains(tag)
269+ }
270+ edgeTags.size() == 5
255271 }
256272 }
257273
0 commit comments