Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit ca490e8

Browse files
feat: Initial timed stream implementation for application latencies (#1639)
* Add initial tests for the timed stream * Skip the failing test * Add headers * Improve test timeout for kokoro * Add handlers in the transform function * new timed stream code pushed * old timed stream code changes * Include the setTimeout test * skip the test * Add a test for backpressure + delay from server * Remove only * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add some tests for iterating through a stream * Test should not complete straight away * Put the failing iterative test in separate file Add a note for the setTimeout test * Add a test for when a sleep is done using promise * Correct the test - should not include promises * Timed stream separated passes tests * Remove the console logs * Get rid of process.out logs * remove console log * add test flexibility for should measure the total * Cleanup and add a couple more tests * Delete files that are no longer necessary * remove only * omit the console logs * Add clearIntervals * Add comments and move the transform function * Initialize startTimeTransform - not class member * Add header * Rename the timed stream file * Add timeouts for kokoro * Add a comment about iterate vs handler * Do not emit events. Call handle methods directly * Introduce the new stream timer class --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 03b758d commit ca490e8

2 files changed

Lines changed: 457 additions & 0 deletions

File tree

src/timed-stream.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {PassThrough, TransformCallback, TransformOptions} from 'stream';
16+
17+
/**
18+
* This interface is the usual options that can be passed into a Transform plus
19+
* a hook for injecting code into the stream's transform function. This hook is
20+
* useful for code running code that normally runs in the transform method if that
21+
* code is different for each method that makes use of a stream.
22+
*/
23+
type TimedStreamOptions = TransformOptions & {
24+
transformHook?: (
25+
event: any,
26+
_encoding: BufferEncoding,
27+
callback: TransformCallback,
28+
) => void;
29+
};
30+
31+
class StreamTimer {
32+
private startTime;
33+
private totalDuration;
34+
35+
constructor() {
36+
this.startTime = 0n;
37+
this.totalDuration = 0n;
38+
}
39+
40+
getTotalDurationMs() {
41+
return Number(this.totalDuration / 1_000_000n);
42+
}
43+
44+
start() {
45+
this.startTime = process.hrtime.bigint();
46+
}
47+
48+
stop() {
49+
const endTime = process.hrtime.bigint();
50+
const duration = endTime - this.startTime;
51+
this.totalDuration += duration;
52+
}
53+
}
54+
55+
/**
56+
* The TimedStream class is used for measuring the time the user spends
57+
* processing data from the stream. We need to measure this time for use cases
58+
* like measuring the application latencies for client side metrics.
59+
*/
60+
export class TimedStream extends PassThrough {
61+
private readTimer = new StreamTimer();
62+
private transformTimer = new StreamTimer();
63+
constructor(options?: TimedStreamOptions) {
64+
// highWaterMark of 1 is needed to respond to each row
65+
super({
66+
...options,
67+
objectMode: true,
68+
highWaterMark: 0,
69+
transform: (event, _encoding, callback) => {
70+
/* When we iterate through a stream, time spent waiting for the user's
71+
application is added to totalDurationTransform. When we use handlers,
72+
time spent waiting for the user's application is added to
73+
totalDurationTransform. We need two different timers to measure total
74+
application blocking latencies because the streams behave differently
75+
depending on whether the user is iterating through a stream or using
76+
timers.
77+
*/
78+
this.transformTimer.start();
79+
if (options?.transformHook) {
80+
options?.transformHook(event, _encoding, callback);
81+
}
82+
callback(null, event);
83+
this.transformTimer.stop();
84+
},
85+
});
86+
}
87+
88+
/**
89+
* read code is called when a row is consumed.
90+
*/
91+
read(size: number) {
92+
// calculate the time spent between iterations of read (i.e. processing the stream in a for loop)
93+
const chunk = super.read(size);
94+
if (chunk) {
95+
this.readTimer.start();
96+
// Defer the after call to the next tick of the event loop
97+
process.nextTick(() => {
98+
this.readTimer.stop();
99+
});
100+
}
101+
return chunk;
102+
}
103+
104+
/**
105+
* Returns the total amount of time the user code spends handling data.
106+
*/
107+
getTotalDurationMs() {
108+
return (
109+
this.readTimer.getTotalDurationMs() +
110+
this.transformTimer.getTotalDurationMs()
111+
);
112+
}
113+
}

0 commit comments

Comments
 (0)