Skip to content
This repository was archived by the owner on Jan 21, 2026. It is now read-only.

Commit d1de959

Browse files
authored
feat: support child spans with tail latencies (#913)
1 parent f88cdd4 commit d1de959

8 files changed

Lines changed: 261 additions & 95 deletions

File tree

src/config.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,6 @@ export interface Config {
115115
*/
116116
stackTraceLimit?: number;
117117

118-
/**
119-
* Buffer the captured traces for `flushDelaySeconds` seconds before
120-
* publishing to the trace API, unless the buffer fills up first.
121-
* Also see `bufferSize`.
122-
*/
123-
flushDelaySeconds?: number;
124-
125118
/**
126119
* URLs that partially match any regex in ignoreUrls will not be traced.
127120
* In addition, URLs that are _exact matches_ of strings in ignoreUrls will
@@ -171,8 +164,16 @@ export interface Config {
171164
contextHeaderBehavior?: ContextHeaderBehavior;
172165

173166
/**
174-
* The number of transactions we buffer before we publish to the trace
175-
* API, unless `flushDelaySeconds` seconds have elapsed first.
167+
* Buffer the captured traces for `flushDelaySeconds` seconds before
168+
* publishing to the Stackdriver Trace API, unless the buffer fills up first.
169+
* Also see `bufferSize`.
170+
*/
171+
flushDelaySeconds?: number;
172+
173+
/**
174+
* The number of spans in buffered traces needed to trigger a publish of all
175+
* traces to the Stackdriver Trace API, unless `flushDelaySeconds` seconds
176+
* has elapsed first.
176177
*/
177178
bufferSize?: number;
178179

src/span-data.ts

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ import * as crypto from 'crypto';
1818
import * as util from 'util';
1919

2020
import {Constants, SpanType} from './constants';
21-
import * as types from './plugin-types';
22-
import {Span, SpanOptions} from './plugin-types';
21+
import {RootSpan, Span, SpanOptions} from './plugin-types';
2322
import {SpanKind, Trace, TraceSpan} from './trace';
2423
import {TraceLabels} from './trace-labels';
2524
import {traceWriter} from './trace-writer';
@@ -104,6 +103,9 @@ export abstract class BaseSpanData implements Span {
104103
}
105104

106105
endSpan(timestamp?: Date) {
106+
if (!!this.span.endTime) {
107+
return;
108+
}
107109
timestamp = timestamp || new Date();
108110
this.span.endTime = timestamp.toISOString();
109111
}
@@ -112,8 +114,11 @@ export abstract class BaseSpanData implements Span {
112114
/**
113115
* Represents a real root span, which corresponds to an incoming request.
114116
*/
115-
export class RootSpanData extends BaseSpanData implements types.RootSpan {
117+
export class RootSpanData extends BaseSpanData implements RootSpan {
116118
readonly type = SpanType.ROOT;
119+
// Locally-tracked list of children. Used only to determine, once this span
120+
// ends, whether a child still needs to be published.
121+
private children: ChildSpanData[] = [];
117122

118123
constructor(
119124
trace: Trace, spanName: string, parentSpanId: string,
@@ -125,16 +130,30 @@ export class RootSpanData extends BaseSpanData implements types.RootSpan {
125130
createChildSpan(options?: SpanOptions): Span {
126131
options = options || {name: ''};
127132
const skipFrames = options.skipFrames ? options.skipFrames + 1 : 1;
128-
return new ChildSpanData(
133+
const child = new ChildSpanData(
129134
this.trace, /* Trace object */
130135
options.name, /* Span name */
131136
this.span.spanId, /* Parent's span ID */
132137
skipFrames); /* # of frames to skip in stack trace */
138+
this.children.push(child);
139+
return child;
133140
}
134141

135142
endSpan(timestamp?: Date) {
143+
if (!!this.span.endTime) {
144+
return;
145+
}
136146
super.endSpan(timestamp);
137147
traceWriter.get().writeTrace(this.trace);
148+
this.children.forEach(child => {
149+
if (!child.span.endTime) {
150+
// Child hasn't ended yet.
151+
// Inform the child that it needs to self-publish.
152+
child.shouldSelfPublish = true;
153+
}
154+
});
155+
// We no longer need to keep track of our children.
156+
this.children = [];
138157
}
139158
}
140159

@@ -143,13 +162,31 @@ export class RootSpanData extends BaseSpanData implements types.RootSpan {
143162
*/
144163
export class ChildSpanData extends BaseSpanData {
145164
readonly type = SpanType.CHILD;
165+
// Whether this span should publish itself. This is meant to be set to true
166+
// by the parent RootSpanData.
167+
shouldSelfPublish = false;
146168

147169
constructor(
148170
trace: Trace, spanName: string, parentSpanId: string,
149171
skipFrames: number) {
150172
super(trace, spanName, parentSpanId, skipFrames);
151173
this.span.kind = SpanKind.RPC_CLIENT;
152174
}
175+
176+
endSpan(timestamp?: Date) {
177+
if (!!this.span.endTime) {
178+
return;
179+
}
180+
super.endSpan(timestamp);
181+
if (this.shouldSelfPublish) {
182+
// Also, publish just this span.
183+
traceWriter.get().writeTrace({
184+
projectId: this.trace.projectId,
185+
traceId: this.trace.traceId,
186+
spans: [this.span]
187+
});
188+
}
189+
}
153190
}
154191

155192
// Helper function to generate static virtual trace spans.

src/trace-writer.ts

Lines changed: 81 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616

1717
import * as common from '@google-cloud/common';
18-
import {AxiosError} from 'axios';
1918
import * as gcpMetadata from 'gcp-metadata';
2019
import {OutgoingHttpHeaders} from 'http';
2120
import * as os from 'os';
@@ -55,12 +54,50 @@ export interface LabelObject {
5554
[key: string]: string;
5655
}
5756

57+
export class TraceBuffer {
58+
/**
59+
* Buffered traces.
60+
*/
61+
private traces: Trace[] = [];
62+
/**
63+
* Number of buffered spans; this number must be at least as large as
64+
* buffer.length.
65+
*/
66+
private numSpans = 0;
67+
68+
/**
69+
* Add a new trace to the buffer.
70+
* @param trace The trace to add.
71+
*/
72+
add(trace: Trace) {
73+
this.traces.push(trace);
74+
this.numSpans += trace.spans.length;
75+
}
76+
77+
/**
78+
* Gets the number of spans contained within buffered traces.
79+
*/
80+
getNumSpans() {
81+
return this.numSpans;
82+
}
83+
84+
/**
85+
* Clears the buffer, returning its original contents.
86+
*/
87+
drain(): Trace[] {
88+
const result = this.traces;
89+
this.traces = [];
90+
this.numSpans = 0;
91+
return result;
92+
}
93+
}
94+
5895
/**
5996
* A class representing a service that publishes traces in the background.
6097
*/
6198
export class TraceWriter extends common.Service {
62-
/** Stringified traces to be published */
63-
buffer: string[];
99+
/** Traces to be published */
100+
protected buffer: TraceBuffer;
64101
/** Default labels to be attached to written spans */
65102
defaultLabels: LabelObject;
66103
/** Reference to global unhandled exception handler */
@@ -89,7 +126,7 @@ export class TraceWriter extends common.Service {
89126
config);
90127

91128
this.logger = logger;
92-
this.buffer = [];
129+
this.buffer = new TraceBuffer();
93130
this.defaultLabels = {};
94131

95132
this.isActive = true;
@@ -216,54 +253,31 @@ export class TraceWriter extends common.Service {
216253
}
217254

218255
/**
219-
* Ensures that all sub spans of the provided Trace object are
220-
* closed and then queues the span data to be published.
256+
* Queues a trace to be published. Spans with no end time are excluded.
221257
*
222258
* @param trace The trace to be queued.
223259
*/
224260
writeTrace(trace: Trace) {
225-
for (const span of trace.spans) {
226-
if (span.endTime === '') {
227-
span.endTime = (new Date()).toISOString();
228-
}
229-
}
261+
const publishableSpans = trace.spans.filter(span => !!span.endTime);
230262

231-
trace.spans.forEach(spanData => {
263+
publishableSpans.forEach(spanData => {
232264
if (spanData.kind === SpanKind.RPC_SERVER) {
233265
// Copy properties from the default labels.
234266
Object.assign(spanData.labels, this.defaultLabels);
235267
}
236268
});
237269

238-
const afterProjectId = (projectId: string) => {
239-
trace.projectId = projectId;
240-
this.buffer.push(JSON.stringify(trace));
241-
this.logger.info(
242-
`TraceWriter#writeTrace: buffer.size = ${this.buffer.length}`);
243-
244-
// Publish soon if the buffer is getting big
245-
if (this.buffer.length >= this.config.bufferSize) {
246-
this.logger.info(
247-
'TraceWriter#writeTrace: Trace buffer full, flushing.');
248-
setImmediate(() => this.flushBuffer());
249-
}
250-
};
251-
252-
// TODO(kjin): We should always be following the 'else' path.
253-
// Any test that doesn't mock the Trace Writer will assume that traces get
254-
// buffered synchronously. We need to refactor those tests to remove that
255-
// assumption before we can make this fix.
256-
if (this.projectId !== NO_PROJECT_ID_TOKEN) {
257-
afterProjectId(this.projectId);
258-
} else {
259-
this.getProjectId().then(afterProjectId, (err: Error) => {
260-
// Because failing to get a project ID means that the trace agent will
261-
// get disabled, there is a very small window for this code path to be
262-
// taken. For this reason we don't do anything more complex than just
263-
// notifying that we are dropping the current trace.
264-
this.logger.info(
265-
'TraceWriter#queueTrace: No project ID, dropping trace.');
266-
});
270+
this.buffer.add({
271+
traceId: trace.traceId,
272+
projectId: trace.projectId,
273+
spans: publishableSpans
274+
});
275+
this.logger.info(`TraceWriter#writeTrace: number of buffered spans = ${
276+
this.buffer.getNumSpans()}`);
277+
// Publish soon if the buffer is getting big
278+
if (this.buffer.getNumSpans() >= this.config.bufferSize) {
279+
this.logger.info('TraceWriter#writeTrace: Trace buffer full, flushing.');
280+
setImmediate(() => this.flushBuffer());
267281
}
268282
}
269283

@@ -292,15 +306,35 @@ export class TraceWriter extends common.Service {
292306
* Serializes the buffered traces to be published asynchronously.
293307
*/
294308
private flushBuffer() {
295-
if (this.buffer.length === 0) {
309+
// Privatize and clear the buffer.
310+
const flushedTraces = this.buffer.drain();
311+
if (flushedTraces.length === 0) {
296312
return;
297313
}
298314

299-
// Privatize and clear the buffer.
300-
const buffer = this.buffer;
301-
this.buffer = [];
302-
this.logger.debug('TraceWriter#flushBuffer: Flushing traces', buffer);
303-
this.publish(`{"traces":[${buffer.join()}]}`);
315+
const afterProjectId = (projectId: string) => {
316+
flushedTraces.forEach(trace => trace.projectId = projectId);
317+
this.logger.debug(
318+
'TraceWriter#flushBuffer: Flushing traces', flushedTraces);
319+
this.publish(JSON.stringify({traces: flushedTraces}));
320+
};
321+
322+
// TODO(kjin): We should always be following the 'else' path.
323+
// Any test that doesn't mock the Trace Writer will assume that traces get
324+
// buffered synchronously. We need to refactor those tests to remove that
325+
// assumption before we can make this fix.
326+
if (this.projectId !== NO_PROJECT_ID_TOKEN) {
327+
afterProjectId(this.projectId);
328+
} else {
329+
this.getProjectId().then(afterProjectId, (err: Error) => {
330+
// Because failing to get a project ID means that the trace agent will
331+
// get disabled, there is a very small window for this code path to be
332+
// taken. For this reason we don't do anything more complex than just
333+
// notifying that we are dropping the current traces.
334+
this.logger.info(
335+
'TraceWriter#flushBuffer: No project ID, dropping traces.');
336+
});
337+
}
304338
}
305339

306340
/**

test/plugins/common.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ function replaceWarnLogger(fn) {
9494
* Cleans the tracer state between test runs.
9595
*/
9696
function cleanTraces() {
97-
traceWriter.get().buffer = [];
97+
traceWriter.get()['buffer'].drain();
9898
}
9999

100100
function getTraces() {
101-
return traceWriter.get().buffer.map(buffer => JSON.parse(buffer));
101+
return traceWriter.get()['buffer']['traces'];
102102
}
103103

104104
function getMatchingSpan(predicate) {
@@ -138,6 +138,7 @@ function createChildSpan(cb, duration) {
138138
assert.ok(span);
139139
var t = setTimeout(function() {
140140
assert.strictEqual(span.type, SpanType.CHILD);
141+
span.endSpan();
141142
if (cb) {
142143
cb();
143144
}

test/plugins/test-trace-http2.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ maybeSkipHttp2('Trace Agent integration with http2', () => {
206206
assert.ok(rootSpan.type === SpanType.ROOT);
207207
const session = http2.connect(`http://localhost:${serverPort}`);
208208
const s = session.request({':path': '/?foo=bar'});
209+
s.on('data', () => {}); // enter flowing mode
209210
s.end();
210211
setTimeout(() => {
211212
rootSpan.endSpan();
@@ -225,6 +226,7 @@ maybeSkipHttp2('Trace Agent integration with http2', () => {
225226
assert.ok(rootSpan.type === SpanType.ROOT);
226227
const session = http2.connect(`http://localhost:${serverPort}`);
227228
const s = session.request({':path': '/'});
229+
s.on('data', () => {}); // enter flowing mode
228230
s.end();
229231
setTimeout(() => {
230232
rootSpan.endSpan();

0 commit comments

Comments
 (0)