Skip to content

Commit 34bf8e5

Browse files
authored
fix(bullmq): trace context injection and orchestrion file extensions (#7669)
move bullmq trace context into opts.telemetry.metadata json string. delete propagated job context after consumption. fix bullmq orchestrion file extension
1 parent ed37311 commit 34bf8e5

File tree

4 files changed

+188
-139
lines changed

4 files changed

+188
-139
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
'use strict'
2+
3+
module.exports = [
4+
{
5+
module: {
6+
name: 'bullmq',
7+
versionRange: '>=5.66.0',
8+
filePath: 'dist/cjs/classes/queue.js',
9+
},
10+
functionQuery: {
11+
methodName: 'add',
12+
className: 'Queue',
13+
kind: 'Async',
14+
},
15+
channelName: 'Queue_add',
16+
},
17+
{
18+
module: {
19+
name: 'bullmq',
20+
versionRange: '>=5.66.0',
21+
filePath: 'dist/cjs/classes/queue.js',
22+
},
23+
functionQuery: {
24+
methodName: 'addBulk',
25+
className: 'Queue',
26+
kind: 'Async',
27+
},
28+
channelName: 'Queue_addBulk',
29+
},
30+
{
31+
module: {
32+
name: 'bullmq',
33+
versionRange: '>=5.66.0',
34+
filePath: 'dist/cjs/classes/worker.js',
35+
},
36+
functionQuery: {
37+
methodName: 'callProcessJob',
38+
className: 'Worker',
39+
kind: 'Async',
40+
},
41+
channelName: 'Worker_callProcessJob',
42+
},
43+
{
44+
module: {
45+
name: 'bullmq',
46+
versionRange: '>=5.66.0',
47+
filePath: 'dist/cjs/classes/flow-producer.js',
48+
},
49+
functionQuery: {
50+
methodName: 'add',
51+
className: 'FlowProducer',
52+
kind: 'Async',
53+
},
54+
channelName: 'FlowProducer_add',
55+
},
56+
{
57+
module: {
58+
name: 'bullmq',
59+
versionRange: '>=5.66.0',
60+
filePath: 'dist/esm/classes/queue.js',
61+
},
62+
functionQuery: {
63+
methodName: 'add',
64+
className: 'Queue',
65+
kind: 'Async',
66+
},
67+
channelName: 'Queue_add',
68+
},
69+
{
70+
module: {
71+
name: 'bullmq',
72+
versionRange: '>=5.66.0',
73+
filePath: 'dist/esm/classes/queue.js',
74+
},
75+
functionQuery: {
76+
methodName: 'addBulk',
77+
className: 'Queue',
78+
kind: 'Async',
79+
},
80+
channelName: 'Queue_addBulk',
81+
},
82+
{
83+
module: {
84+
name: 'bullmq',
85+
versionRange: '>=5.66.0',
86+
filePath: 'dist/esm/classes/worker.js',
87+
},
88+
functionQuery: {
89+
methodName: 'callProcessJob',
90+
className: 'Worker',
91+
kind: 'Async',
92+
},
93+
channelName: 'Worker_callProcessJob',
94+
},
95+
{
96+
module: {
97+
name: 'bullmq',
98+
versionRange: '>=5.66.0',
99+
filePath: 'dist/esm/classes/flow-producer.js',
100+
},
101+
functionQuery: {
102+
methodName: 'add',
103+
className: 'FlowProducer',
104+
kind: 'Async',
105+
},
106+
channelName: 'FlowProducer_add',
107+
},
108+
]

packages/datadog-instrumentations/src/helpers/rewriter/instrumentations/bullmq.json

Lines changed: 0 additions & 106 deletions
This file was deleted.

packages/datadog-plugin-bullmq/src/consumer.js

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ class BullmqConsumerPlugin extends ConsumerPlugin {
2222
const queueName = job?.queueName || job?.queue?.name || 'bullmq'
2323

2424
let childOf
25-
const datadogContext = job?.data?._datadog
26-
if (datadogContext) {
27-
childOf = this.tracer.extract('text_map', datadogContext)
25+
const ddCarrier = this._extractDatadog(job)
26+
if (ddCarrier) {
27+
ctx._ddCarrier = ddCarrier
28+
childOf = this.tracer.extract('text_map', ddCarrier)
2829
}
2930

3031
this.startSpan({
@@ -49,14 +50,33 @@ class BullmqConsumerPlugin extends ConsumerPlugin {
4950
const queueName = job.queueName || job.queue?.name || 'bullmq'
5051
const payloadSize = job.data ? getMessageSize(job.data) : 0
5152

52-
const datadogContext = job.data?._datadog
53-
if (datadogContext) {
54-
this.tracer.decodeDataStreamsContext(datadogContext)
53+
const ddCarrier = ctx._ddCarrier
54+
if (ddCarrier) {
55+
this.tracer.decodeDataStreamsContext(ddCarrier)
5556
}
5657

5758
const edgeTags = ['direction:in', `topic:${queueName}`, 'type:bullmq']
5859
this.tracer.setCheckpoint(edgeTags, span, payloadSize)
5960
}
61+
62+
_extractDatadog (job) {
63+
const metadataStr = job?.opts?.telemetry?.metadata
64+
if (!metadataStr) return
65+
66+
try {
67+
const metadata = JSON.parse(metadataStr)
68+
const ddCarrier = metadata._datadog
69+
if (!ddCarrier) return
70+
71+
// Clean up only our _datadog key, preserve other metadata
72+
delete metadata._datadog
73+
job.opts.telemetry.metadata = JSON.stringify(metadata)
74+
75+
return ddCarrier
76+
} catch {
77+
// Ignore malformed metadata
78+
}
79+
}
6080
}
6181

6282
module.exports = BullmqConsumerPlugin

0 commit comments

Comments
 (0)