Skip to content

Commit c195ee8

Browse files
authored
fix: reduce memory allocation and use transparent instrumentation (#7422)
The instrumentation was noticable by a user so far and hooked into the public event listener addition / removal. The event handlers being added could not be detected as not being the ones by the user. That is improved here by using an internal only instrumentation that should also reduce overhead and needs less coordination between the additions and removals. The plugin is refactored to have less memory retantion. So far, the whole span was attached and in case long living ws connections existed with a high load, the spans could add up without ever being freed. Now that tracking overhead is much lower and the span is created when needed.
1 parent d5e899b commit c195ee8

File tree

7 files changed

+407
-110
lines changed

7 files changed

+407
-110
lines changed

packages/datadog-instrumentations/src/ws.js

Lines changed: 125 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
const tracingChannel = require('dc-polyfill').tracingChannel
3+
const { tracingChannel } = /** @type {import('node:diagnostics_channel')} */ (require('dc-polyfill'))
44

55
const shimmer = require('../../datadog-shimmer')
66
const {
@@ -15,7 +15,50 @@ const closeCh = tracingChannel('ws:close')
1515
const emitCh = channel('tracing:ws:server:connect:emit')
1616
// TODO: Add a error channel / handle error events properly.
1717

18-
const eventHandlerMap = new WeakMap()
18+
/**
19+
* @typedef {object} WebSocketServerPrototype
20+
* @property {(...args: unknown[]) => unknown} handleUpgrade
21+
* @property {(...args: unknown[]) => unknown} emit
22+
*/
23+
24+
/**
25+
* @typedef {{ prototype: WebSocketServerPrototype }} WebSocketServerClass
26+
*/
27+
28+
/**
29+
* @typedef {object} WebSocketPrototype
30+
* @property {(...args: unknown[]) => unknown} send
31+
* @property {(...args: unknown[]) => unknown} close
32+
* @property {(...args: unknown[]) => unknown} setSocket
33+
*/
34+
35+
/**
36+
* @typedef {{ prototype: WebSocketPrototype }} WebSocketClass
37+
*/
38+
39+
/**
40+
* @typedef {object} ReceiverPrototype
41+
* @property {(eventName: string, listener: (...args: unknown[]) => unknown) => unknown} on
42+
* @property {(eventName: string, listener: (...args: unknown[]) => unknown) => unknown} addListener
43+
*/
44+
45+
/**
46+
* @typedef {{ prototype: ReceiverPrototype }} ReceiverClass
47+
*/
48+
49+
/**
50+
* @typedef {string | Buffer | ArrayBuffer | ArrayBufferView | Blob | Buffer[]} WebSocketMessageData
51+
*/
52+
53+
/**
54+
* @typedef {object} WebSocketInstance
55+
* @property {(...args: unknown[]) => unknown} emit
56+
* @property {(eventName: string) => number} [listenerCount]
57+
* @property {{ _socket?: unknown } | undefined} [_sender]
58+
* @property {unknown} [_receiver]
59+
*/
60+
61+
let kWebSocketSymbol
1962

2063
function wrapHandleUpgrade (handleUpgrade) {
2164
return function () {
@@ -41,7 +84,9 @@ function wrapHandleUpgrade (handleUpgrade) {
4184

4285
function wrapSend (send) {
4386
return function wrappedSend (...args) {
44-
if (!producerCh.start.hasSubscribers) return send.apply(this, arguments)
87+
if (!producerCh.start.hasSubscribers) {
88+
return send.apply(this, arguments)
89+
}
4590

4691
const [data, options, cb] = arguments
4792

@@ -55,7 +100,9 @@ function wrapSend (send) {
55100

56101
function createWrapEmit (emit) {
57102
return function (title, headers, req) {
58-
if (!serverCh.start.hasSubscribers || title !== 'headers') return emit.apply(this, arguments)
103+
if (!serverCh.start.hasSubscribers || title !== 'headers') {
104+
return emit.apply(this, arguments)
105+
}
59106

60107
const ctx = { req }
61108
ctx.req.resStatus = headers[0].split(' ')[1]
@@ -70,35 +117,42 @@ function createWrapEmit (emit) {
70117
}
71118
}
72119

73-
function createWrappedHandler (handler) {
74-
return shimmer.wrapFunction(handler, originalHandler => function (data, binary) {
75-
const byteLength = dataLength(data)
120+
/**
121+
* @param {Function} setSocket
122+
* @returns {(...args: unknown[]) => unknown}
123+
*/
124+
/**
125+
* @param {Function} on
126+
* @returns {(...args: unknown[]) => unknown}
127+
*/
128+
function wrapReceiverOn (on) {
129+
return function wrappedOn (eventName, handler) {
130+
if (eventName !== 'message' || typeof handler !== 'function') {
131+
return on.apply(this, arguments)
132+
}
76133

77-
const ctx = { data, binary, socket: this._sender?._socket, byteLength }
134+
const wrappedHandler = function (data, isBinary) {
135+
if (!receiverCh.start.hasSubscribers || !kWebSocketSymbol) {
136+
return handler.call(this, data, isBinary)
137+
}
78138

79-
return receiverCh.traceSync(originalHandler, ctx, this, data, binary)
80-
})
81-
}
139+
const websocket = /** @type {WebSocketInstance | undefined} */ (this[kWebSocketSymbol])
140+
// Avoid receive spans when no one listens to messages.
141+
if (websocket && typeof websocket.listenerCount === 'function' && websocket.listenerCount('message') === 0) {
142+
return handler.call(this, data, isBinary)
143+
}
144+
const socket = websocket?._sender?._socket
145+
if (!socket) {
146+
return handler.call(this, data, isBinary)
147+
}
82148

83-
function wrapListener (originalOn) {
84-
return function (eventName, handler) {
85-
if (eventName === 'message') {
86-
// Prevent multiple wrapping of the same handler in case the user adds the listener multiple times
87-
const wrappedHandler = eventHandlerMap.get(handler) ?? createWrappedHandler(handler)
88-
eventHandlerMap.set(handler, wrappedHandler)
89-
return originalOn.call(this, eventName, wrappedHandler)
90-
}
91-
return originalOn.apply(this, arguments)
92-
}
93-
}
149+
const byteLength = dataLength(/** @type {WebSocketMessageData} */ (data))
150+
const ctx = { data, binary: isBinary, socket, byteLength }
94151

95-
function removeListener (originalOff) {
96-
return function (eventName, handler) {
97-
if (eventName === 'message') {
98-
const wrappedHandler = eventHandlerMap.get(handler) || handler
99-
return originalOff.call(this, eventName, wrappedHandler)
152+
return receiverCh.traceSync(handler, ctx, this, data, isBinary)
100153
}
101-
return originalOff.apply(this, arguments)
154+
155+
return on.call(this, eventName, wrappedHandler)
102156
}
103157
}
104158

@@ -120,7 +174,8 @@ addHook({
120174
name: 'ws',
121175
file: 'lib/websocket-server.js',
122176
versions: ['>=8.0.0'],
123-
}, ws => {
177+
}, moduleExports => {
178+
const ws = /** @type {WebSocketServerClass} */ (moduleExports)
124179
shimmer.wrap(ws.prototype, 'handleUpgrade', wrapHandleUpgrade)
125180
shimmer.wrap(ws.prototype, 'emit', createWrapEmit)
126181
return ws
@@ -130,26 +185,58 @@ addHook({
130185
name: 'ws',
131186
file: 'lib/websocket.js',
132187
versions: ['>=8.0.0'],
133-
}, ws => {
188+
}, moduleExports => {
189+
const ws = /** @type {WebSocketClass} */ (moduleExports)
134190
shimmer.wrap(ws.prototype, 'send', wrapSend)
135191
shimmer.wrap(ws.prototype, 'close', wrapClose)
136192

137-
// TODO: Do not wrap these methods. Instead, add a listener to the websocket instance when one is created.
138-
// That way it avoids producing too many spans for the same websocket instance and less user code is impacted.
139-
shimmer.wrap(ws.prototype, 'on', wrapListener)
140-
shimmer.wrap(ws.prototype, 'addListener', wrapListener)
141-
shimmer.wrap(ws.prototype, 'off', removeListener)
142-
shimmer.wrap(ws.prototype, 'removeListener', removeListener)
143-
144193
return ws
145194
})
146195

196+
addHook({
197+
name: 'ws',
198+
file: 'lib/constants.js',
199+
versions: ['>=8.0.0'],
200+
}, moduleExports => {
201+
const constants = /** @type {{ kWebSocket?: symbol }} */ (moduleExports)
202+
kWebSocketSymbol = constants.kWebSocket
203+
return constants
204+
})
205+
206+
addHook({
207+
name: 'ws',
208+
file: 'lib/receiver.js',
209+
versions: ['>=8.0.0'],
210+
}, moduleExports => {
211+
const Receiver = /** @type {ReceiverClass} */ (moduleExports)
212+
shimmer.wrap(Receiver.prototype, 'on', wrapReceiverOn)
213+
shimmer.wrap(Receiver.prototype, 'addListener', wrapReceiverOn)
214+
return Receiver
215+
})
216+
217+
/**
218+
* @param {WebSocketMessageData} data
219+
* @returns {number}
220+
*/
147221
function dataLength (data) {
148222
if (typeof data === 'string') {
149223
return Buffer.byteLength(data)
150224
}
151225
if (data instanceof Blob) {
152226
return data.size
153227
}
154-
return data?.length ?? 0
228+
if (ArrayBuffer.isView(data)) {
229+
return data.byteLength
230+
}
231+
if (data instanceof ArrayBuffer) {
232+
return data.byteLength
233+
}
234+
let total = 0
235+
if (Array.isArray(data)) {
236+
const chunks = /** @type {Buffer[]} */ (data)
237+
for (const chunk of chunks) {
238+
total += chunk.length
239+
}
240+
}
241+
return total
155242
}

packages/datadog-plugin-ws/src/close.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class WSClosePlugin extends TracingPlugin {
2828
if (!socket?.spanContext) return
2929

3030
const spanKind = isPeerClose ? 'consumer' : 'producer'
31-
const spanTags = socket.spanContext.spanTags
31+
const spanTags = socket.spanTags
3232
const path = spanTags['resource.name'].split(' ')[1]
3333
const service = this.serviceName({ pluginConfig: this.config })
3434
const span = this.startSpan(this.operationName(), {
@@ -76,14 +76,13 @@ class WSClosePlugin extends TracingPlugin {
7676
linkAttributes['dd.kind'] = isIncoming ? 'executed_by' : 'resuming'
7777

7878
// Add span pointer for context propagation
79-
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
80-
const handshakeSpan = ctx.socket.handshakeSpan
79+
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.spanContext) {
80+
const handshakeContext = ctx.socket.spanContext
8181

8282
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
83-
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
83+
if (hasDistributedTracingContext(handshakeContext, ctx.socket)) {
8484
const counterType = isIncoming ? 'receiveCounter' : 'sendCounter'
8585
const counter = incrementWebSocketCounter(ctx.socket, counterType)
86-
const handshakeContext = handshakeSpan.context()
8786

8887
const ptrHash = buildWebSocketSpanPointerHash(
8988
handshakeContext._traceId,

packages/datadog-plugin-ws/src/producer.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class WSProducerPlugin extends TracingPlugin {
2222
const { byteLength, socket, binary } = ctx
2323
if (!socket.spanContext) return
2424

25-
const spanTags = socket.spanContext.spanTags
25+
const spanTags = socket.spanTags
2626
const path = spanTags['resource.name'].split(' ')[1]
2727
const opCode = binary ? 'binary' : 'text'
2828
const service = this.serviceName({ pluginConfig: this.config })
@@ -61,13 +61,12 @@ class WSProducerPlugin extends TracingPlugin {
6161
const linkAttributes = { 'dd.kind': 'resuming' }
6262

6363
// Add span pointer for context propagation
64-
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
65-
const handshakeSpan = ctx.socket.handshakeSpan
64+
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.spanContext) {
65+
const handshakeContext = ctx.socket.spanContext
6666

6767
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
68-
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
68+
if (hasDistributedTracingContext(handshakeContext, ctx.socket)) {
6969
const counter = incrementWebSocketCounter(ctx.socket, 'sendCounter')
70-
const handshakeContext = handshakeSpan.context()
7170

7271
const ptrHash = buildWebSocketSpanPointerHash(
7372
handshakeContext._traceId,

packages/datadog-plugin-ws/src/receiver.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class WSReceiverPlugin extends TracingPlugin {
2727
const { byteLength, socket, binary } = ctx
2828
if (!socket.spanContext) return
2929

30-
const spanTags = socket.spanContext.spanTags
30+
const spanTags = socket.spanTags
3131
const path = spanTags['resource.name'].split(' ')[1]
3232
const opCode = binary ? 'binary' : 'text'
3333

@@ -72,13 +72,12 @@ class WSReceiverPlugin extends TracingPlugin {
7272
const linkAttributes = { 'dd.kind': 'executed_by' }
7373

7474
// Add span pointer for context propagation
75-
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
76-
const handshakeSpan = ctx.socket.handshakeSpan
75+
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.spanContext) {
76+
const handshakeContext = ctx.socket.spanContext
7777

7878
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
79-
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
79+
if (hasDistributedTracingContext(handshakeContext, ctx.socket)) {
8080
const counter = incrementWebSocketCounter(ctx.socket, 'receiveCounter')
81-
const handshakeContext = handshakeSpan.context()
8281

8382
const ptrHash = buildWebSocketSpanPointerHash(
8483
handshakeContext._traceId,

packages/datadog-plugin-ws/src/server.js

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22

33
const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
44
const tags = require('../../../ext/tags.js')
5-
const { FORMAT_HTTP_HEADERS } = require('../../../ext/formats')
6-
const { initWebSocketMessageCounters } = require('./util')
5+
const { HTTP_HEADERS } = require('../../../ext/formats')
6+
const {
7+
createWebSocketSpanContext,
8+
hasTraceHeaders,
9+
initWebSocketMessageCounters,
10+
} = require('./util')
711

812
const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE
913

@@ -27,11 +31,12 @@ class WSServerPlugin extends TracingPlugin {
2731
const indexOfParam = url.indexOf('?')
2832
const route = indexOfParam === -1 ? url : url.slice(0, indexOfParam)
2933
const uri = `${protocol}//${host}${route}`
34+
const resourceName = `${options.method} ${route}`
3035

3136
ctx.args = { options }
3237

3338
// Extract distributed tracing context from request headers
34-
const childOf = this.tracer.extract(FORMAT_HTTP_HEADERS, req.headers)
39+
const childOf = this.tracer.extract(HTTP_HEADERS, req.headers)
3540

3641
const service = this.serviceName({ pluginConfig: this.config })
3742
const span = this.startSpan(this.operationName(), {
@@ -42,19 +47,19 @@ class WSServerPlugin extends TracingPlugin {
4247
'http.upgraded': 'websocket',
4348
'http.method': options.method,
4449
'http.url': uri,
45-
'resource.name': `${options.method} ${route}`,
50+
'resource.name': resourceName,
4651
'span.kind': 'server',
4752
},
4853

4954
}, ctx)
5055
ctx.span = span
5156

52-
ctx.socket.spanContext = ctx.span._spanContext
53-
ctx.socket.spanContext.spanTags = ctx.span._spanContext._tags
54-
// Store the handshake span for use in message span pointers
55-
ctx.socket.handshakeSpan = ctx.span
56-
// Store the request headers for distributed tracing check
57-
ctx.socket.requestHeaders = req.headers
57+
ctx.socket.spanTags = {
58+
'resource.name': resourceName,
59+
'service.name': service,
60+
}
61+
ctx.socket.spanContext = createWebSocketSpanContext(ctx.span._spanContext)
62+
ctx.socket.hasTraceHeaders = hasTraceHeaders(req.headers)
5863

5964
// Initialize message counters for span pointers
6065
initWebSocketMessageCounters(ctx.socket)

0 commit comments

Comments
 (0)