|
15 | 15 | */ |
16 | 16 |
|
17 | 17 | import * as common from '@google-cloud/common'; |
18 | | -import {AxiosError} from 'axios'; |
19 | 18 | import * as gcpMetadata from 'gcp-metadata'; |
20 | 19 | import {OutgoingHttpHeaders} from 'http'; |
21 | 20 | import * as os from 'os'; |
@@ -55,12 +54,50 @@ export interface LabelObject { |
55 | 54 | [key: string]: string; |
56 | 55 | } |
57 | 56 |
|
| 57 | +export class TraceBuffer { |
| 58 | + /** |
| 59 | + * Buffered traces. |
| 60 | + */ |
| 61 | + private traces: Trace[] = []; |
| 62 | + /** |
| 63 | + * Number of buffered spans; this number must be at least as large as |
| 64 | + * buffer.length. |
| 65 | + */ |
| 66 | + private numSpans = 0; |
| 67 | + |
| 68 | + /** |
| 69 | + * Add a new trace to the buffer. |
| 70 | + * @param trace The trace to add. |
| 71 | + */ |
| 72 | + add(trace: Trace) { |
| 73 | + this.traces.push(trace); |
| 74 | + this.numSpans += trace.spans.length; |
| 75 | + } |
| 76 | + |
| 77 | + /** |
| 78 | + * Gets the number of spans contained within buffered traces. |
| 79 | + */ |
| 80 | + getNumSpans() { |
| 81 | + return this.numSpans; |
| 82 | + } |
| 83 | + |
| 84 | + /** |
| 85 | + * Clears the buffer, returning its original contents. |
| 86 | + */ |
| 87 | + drain(): Trace[] { |
| 88 | + const result = this.traces; |
| 89 | + this.traces = []; |
| 90 | + this.numSpans = 0; |
| 91 | + return result; |
| 92 | + } |
| 93 | +} |
| 94 | + |
58 | 95 | /** |
59 | 96 | * A class representing a service that publishes traces in the background. |
60 | 97 | */ |
61 | 98 | export class TraceWriter extends common.Service { |
62 | | - /** Stringified traces to be published */ |
63 | | - buffer: string[]; |
| 99 | + /** Traces to be published */ |
| 100 | + protected buffer: TraceBuffer; |
64 | 101 | /** Default labels to be attached to written spans */ |
65 | 102 | defaultLabels: LabelObject; |
66 | 103 | /** Reference to global unhandled exception handler */ |
@@ -89,7 +126,7 @@ export class TraceWriter extends common.Service { |
89 | 126 | config); |
90 | 127 |
|
91 | 128 | this.logger = logger; |
92 | | - this.buffer = []; |
| 129 | + this.buffer = new TraceBuffer(); |
93 | 130 | this.defaultLabels = {}; |
94 | 131 |
|
95 | 132 | this.isActive = true; |
@@ -216,54 +253,31 @@ export class TraceWriter extends common.Service { |
216 | 253 | } |
217 | 254 |
|
218 | 255 | /** |
219 | | - * Ensures that all sub spans of the provided Trace object are |
220 | | - * closed and then queues the span data to be published. |
| 256 | + * Queues a trace to be published. Spans with no end time are excluded. |
221 | 257 | * |
222 | 258 | * @param trace The trace to be queued. |
223 | 259 | */ |
224 | 260 | writeTrace(trace: Trace) { |
225 | | - for (const span of trace.spans) { |
226 | | - if (span.endTime === '') { |
227 | | - span.endTime = (new Date()).toISOString(); |
228 | | - } |
229 | | - } |
| 261 | + const publishableSpans = trace.spans.filter(span => !!span.endTime); |
230 | 262 |
|
231 | | - trace.spans.forEach(spanData => { |
| 263 | + publishableSpans.forEach(spanData => { |
232 | 264 | if (spanData.kind === SpanKind.RPC_SERVER) { |
233 | 265 | // Copy properties from the default labels. |
234 | 266 | Object.assign(spanData.labels, this.defaultLabels); |
235 | 267 | } |
236 | 268 | }); |
237 | 269 |
|
238 | | - const afterProjectId = (projectId: string) => { |
239 | | - trace.projectId = projectId; |
240 | | - this.buffer.push(JSON.stringify(trace)); |
241 | | - this.logger.info( |
242 | | - `TraceWriter#writeTrace: buffer.size = ${this.buffer.length}`); |
243 | | - |
244 | | - // Publish soon if the buffer is getting big |
245 | | - if (this.buffer.length >= this.config.bufferSize) { |
246 | | - this.logger.info( |
247 | | - 'TraceWriter#writeTrace: Trace buffer full, flushing.'); |
248 | | - setImmediate(() => this.flushBuffer()); |
249 | | - } |
250 | | - }; |
251 | | - |
252 | | - // TODO(kjin): We should always be following the 'else' path. |
253 | | - // Any test that doesn't mock the Trace Writer will assume that traces get |
254 | | - // buffered synchronously. We need to refactor those tests to remove that |
255 | | - // assumption before we can make this fix. |
256 | | - if (this.projectId !== NO_PROJECT_ID_TOKEN) { |
257 | | - afterProjectId(this.projectId); |
258 | | - } else { |
259 | | - this.getProjectId().then(afterProjectId, (err: Error) => { |
260 | | - // Because failing to get a project ID means that the trace agent will |
261 | | - // get disabled, there is a very small window for this code path to be |
262 | | - // taken. For this reason we don't do anything more complex than just |
263 | | - // notifying that we are dropping the current trace. |
264 | | - this.logger.info( |
265 | | - 'TraceWriter#queueTrace: No project ID, dropping trace.'); |
266 | | - }); |
| 270 | + this.buffer.add({ |
| 271 | + traceId: trace.traceId, |
| 272 | + projectId: trace.projectId, |
| 273 | + spans: publishableSpans |
| 274 | + }); |
| 275 | + this.logger.info(`TraceWriter#writeTrace: number of buffered spans = ${ |
| 276 | + this.buffer.getNumSpans()}`); |
| 277 | + // Publish soon if the buffer is getting big |
| 278 | + if (this.buffer.getNumSpans() >= this.config.bufferSize) { |
| 279 | + this.logger.info('TraceWriter#writeTrace: Trace buffer full, flushing.'); |
| 280 | + setImmediate(() => this.flushBuffer()); |
267 | 281 | } |
268 | 282 | } |
269 | 283 |
|
@@ -292,15 +306,35 @@ export class TraceWriter extends common.Service { |
292 | 306 | * Serializes the buffered traces to be published asynchronously. |
293 | 307 | */ |
294 | 308 | private flushBuffer() { |
295 | | - if (this.buffer.length === 0) { |
| 309 | + // Privatize and clear the buffer. |
| 310 | + const flushedTraces = this.buffer.drain(); |
| 311 | + if (flushedTraces.length === 0) { |
296 | 312 | return; |
297 | 313 | } |
298 | 314 |
|
299 | | - // Privatize and clear the buffer. |
300 | | - const buffer = this.buffer; |
301 | | - this.buffer = []; |
302 | | - this.logger.debug('TraceWriter#flushBuffer: Flushing traces', buffer); |
303 | | - this.publish(`{"traces":[${buffer.join()}]}`); |
| 315 | + const afterProjectId = (projectId: string) => { |
| 316 | + flushedTraces.forEach(trace => trace.projectId = projectId); |
| 317 | + this.logger.debug( |
| 318 | + 'TraceWriter#flushBuffer: Flushing traces', flushedTraces); |
| 319 | + this.publish(JSON.stringify({traces: flushedTraces})); |
| 320 | + }; |
| 321 | + |
| 322 | + // TODO(kjin): We should always be following the 'else' path. |
| 323 | + // Any test that doesn't mock the Trace Writer will assume that traces get |
| 324 | + // buffered synchronously. We need to refactor those tests to remove that |
| 325 | + // assumption before we can make this fix. |
| 326 | + if (this.projectId !== NO_PROJECT_ID_TOKEN) { |
| 327 | + afterProjectId(this.projectId); |
| 328 | + } else { |
| 329 | + this.getProjectId().then(afterProjectId, (err: Error) => { |
| 330 | + // Because failing to get a project ID means that the trace agent will |
| 331 | + // get disabled, there is a very small window for this code path to be |
| 332 | + // taken. For this reason we don't do anything more complex than just |
| 333 | + // notifying that we are dropping the current traces. |
| 334 | + this.logger.info( |
| 335 | + 'TraceWriter#flushBuffer: No project ID, dropping traces.'); |
| 336 | + }); |
| 337 | + } |
304 | 338 | } |
305 | 339 |
|
306 | 340 | /** |
|
0 commit comments