Skip to content

Commit 51bea54

Browse files
authored
[DSM] Set checkpoints for DSM even when there is no context if the service is instrumented and fix typo (#4851)
* [DSM] Set checkpoints for DSM with SQS & Kinesis for consumers even when the producer did not have DSM enabled * [DSM] Send checkpoints to DSM if its enabled even if there is no streamName
1 parent a8896ee commit 51bea54

5 files changed

Lines changed: 101 additions & 15 deletions

File tree

packages/datadog-plugin-aws-sdk/src/services/kinesis.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,15 @@ class Kinesis extends BaseAwsSdkPlugin {
113113
response.Records.forEach(record => {
114114
const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString())
115115

116-
if (
117-
parsedAttributes?._datadog && streamName
118-
) {
119-
const payloadSize = getSizeOrZero(record.Data)
116+
const payloadSize = getSizeOrZero(record.Data)
117+
if (parsedAttributes?._datadog) {
120118
this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
121-
this.tracer
122-
.setCheckpoint(['direction:in', `topic:${streamName}`, 'type:kinesis'], span, payloadSize)
123119
}
120+
const tags = streamName
121+
? ['direction:in', `topic:${streamName}`, 'type:kinesis']
122+
: ['direction:in', 'type:kinesis']
123+
this.tracer
124+
.setCheckpoint(tags, span, payloadSize)
124125
})
125126
}
126127

packages/datadog-plugin-aws-sdk/src/services/sqs.js

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class Sqs extends BaseAwsSdkPlugin {
4242
// extract DSM context after as we might not have a parent-child but may have a DSM context
4343

4444
this.responseExtractDSMContext(
45-
request.operation, request.params, response, span || null, { parsedMessageAttributes }
45+
request.operation, request.params, response, span || null, { parsedAttributes: parsedMessageAttributes }
4646
)
4747
})
4848

@@ -195,16 +195,16 @@ class Sqs extends BaseAwsSdkPlugin {
195195
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
196196
}
197197
}
198+
const payloadSize = getHeadersSize({
199+
Body: message.Body,
200+
MessageAttributes: message.MessageAttributes
201+
})
202+
const queue = params.QueueUrl.split('/').pop()
198203
if (parsedAttributes) {
199-
const payloadSize = getHeadersSize({
200-
Body: message.Body,
201-
MessageAttributes: message.MessageAttributes
202-
})
203-
const queue = params.QueueUrl.split('/').pop()
204204
this.tracer.decodeDataStreamsContext(parsedAttributes)
205-
this.tracer
206-
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:sqs'], span, payloadSize)
207205
}
206+
this.tracer
207+
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:sqs'], span, payloadSize)
208208
})
209209
}
210210

packages/datadog-plugin-aws-sdk/test/kinesis.spec.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,32 @@ describe('Kinesis', function () {
303303
})
304304
})
305305

306+
it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was done without DSM enabled', done => {
307+
agent.expectPipelineStats(dsmStats => {
308+
let statsPointsReceived = 0
309+
// we should have only have 1 stats point since we only had 1 put operation
310+
dsmStats.forEach((timeStatsBucket) => {
311+
if (timeStatsBucket && timeStatsBucket.Stats) {
312+
timeStatsBucket.Stats.forEach((statsBuckets) => {
313+
statsPointsReceived += statsBuckets.Stats.length
314+
})
315+
}
316+
}, { timeoutMs: 10000 })
317+
expect(statsPointsReceived).to.equal(1)
318+
expect(agent.dsmStatsExistWithParentHash(agent, '0')).to.equal(true)
319+
}, { timeoutMs: 10000 }).then(done, done)
320+
321+
agent.reload('aws-sdk', { kinesis: { dsmEnabled: false } }, { dsmEnabled: false })
322+
helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
323+
if (err) return done(err)
324+
325+
agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true })
326+
helpers.getTestData(kinesis, streamNameDSM, data, (err) => {
327+
if (err) return done(err)
328+
})
329+
})
330+
})
331+
306332
it('emits DSM stats to the agent during Kinesis putRecords', done => {
307333
// we need to stub Date.now() to ensure a new stats bucket is created for each call
308334
// otherwise, all stats checkpoints will be combined into a single stats points

packages/datadog-plugin-aws-sdk/test/sqs.spec.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const { rawExpectedSchema } = require('./sqs-naming')
88

99
const queueName = 'SQS_QUEUE_NAME'
1010
const queueNameDSM = 'SQS_QUEUE_NAME_DSM'
11+
const queueNameDSMConsumerOnly = 'SQS_QUEUE_NAME_DSM_CONSUMER_ONLY'
1112

1213
const getQueueParams = (queueName) => {
1314
return {
@@ -20,6 +21,7 @@ const getQueueParams = (queueName) => {
2021

2122
const queueOptions = getQueueParams(queueName)
2223
const queueOptionsDsm = getQueueParams(queueNameDSM)
24+
const queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly)
2325

2426
describe('Plugin', () => {
2527
describe('aws-sdk (sqs)', function () {
@@ -30,6 +32,7 @@ describe('Plugin', () => {
3032
let sqs
3133
const QueueUrl = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME'
3234
const QueueUrlDsm = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM'
35+
const QueueUrlDsmConsumerOnly = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM_CONSUMER_ONLY'
3336
let tracer
3437

3538
const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-sqs' : 'aws-sdk'
@@ -412,10 +415,25 @@ describe('Plugin', () => {
412415
})
413416
})
414417

418+
before(done => {
419+
AWS = require(`../../../versions/${sqsClientName}@${version}`).get()
420+
421+
sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' })
422+
sqs.createQueue(queueOptionsDsmConsumerOnly, (err, res) => {
423+
if (err) return done(err)
424+
425+
done()
426+
})
427+
})
428+
415429
after(done => {
416430
sqs.deleteQueue({ QueueUrl: QueueUrlDsm }, done)
417431
})
418432

433+
after(done => {
434+
sqs.deleteQueue({ QueueUrl: QueueUrlDsmConsumerOnly }, done)
435+
})
436+
419437
after(() => {
420438
return agent.close({ ritmReset: false })
421439
})
@@ -546,6 +564,28 @@ describe('Plugin', () => {
546564
})
547565
})
548566

567+
it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => {
568+
agent.expectPipelineStats(dsmStats => {
569+
let statsPointsReceived = 0
570+
// we should have 2 dsm stats points
571+
dsmStats.forEach((timeStatsBucket) => {
572+
if (timeStatsBucket && timeStatsBucket.Stats) {
573+
timeStatsBucket.Stats.forEach((statsBuckets) => {
574+
statsPointsReceived += statsBuckets.Stats.length
575+
})
576+
}
577+
})
578+
expect(statsPointsReceived).to.equal(1)
579+
expect(agent.dsmStatsExistWithParentHash(agent, '0')).to.equal(true)
580+
}).then(done, done)
581+
582+
agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false })
583+
sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsmConsumerOnly }, () => {
584+
agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true })
585+
sqs.receiveMessage({ QueueUrl: QueueUrlDsmConsumerOnly, MessageAttributeNames: ['.*'] }, () => {})
586+
})
587+
})
588+
549589
it('Should emit DSM stats to the agent when sending batch messages', done => {
550590
// we need to stub Date.now() to ensure a new stats bucket is created for each call
551591
// otherwise, all stats checkpoints will be combined into a single stats points

packages/dd-trace/test/plugins/agent.js

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,24 @@ function dsmStatsExist (agent, expectedHash, expectedEdgeTags) {
6969
return hashFound
7070
}
7171

72+
function dsmStatsExistWithParentHash (agent, expectedParentHash) {
73+
const dsmStats = agent.getDsmStats()
74+
let hashFound = false
75+
if (dsmStats.length !== 0) {
76+
for (const statsTimeBucket of dsmStats) {
77+
for (const statsBucket of statsTimeBucket.Stats) {
78+
for (const stats of statsBucket.Stats) {
79+
if (stats.ParentHash.toString() === expectedParentHash) {
80+
hashFound = true
81+
return hashFound
82+
}
83+
}
84+
}
85+
}
86+
}
87+
return hashFound
88+
}
89+
7290
function addEnvironmentVariablesToHeaders (headers) {
7391
// get all environment variables that start with "DD_"
7492
const ddEnvVars = new Map(
@@ -424,5 +442,6 @@ module.exports = {
424442
tracer,
425443
testedPlugins,
426444
getDsmStats,
427-
dsmStatsExist
445+
dsmStatsExist,
446+
dsmStatsExistWithParentHash
428447
}

0 commit comments

Comments
 (0)