You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
{{ message }}
This repository was archived by the owner on Mar 26, 2026. It is now read-only.
feat: Record ReadRows application latencies for client side metrics (#1647)
## Description
This PR makes it so that all readRows calls will start recording application latencies. With this change, data for application latencies will now be available on the Google Cloud dashboard.
## Impact
Data should now be available for application latencies on ReadRows calls in the client side metrics dashboard.
## Testing
Tests are added that mock out the hrtime module and make sure the right application latencies values are recorded.
* This interface is the usual options that can be passed into a Transform plus
@@ -42,11 +43,11 @@ class StreamTimer {
42
43
}
43
44
44
45
start(){
45
-
this.startTime=process.hrtime.bigint();
46
+
this.startTime=hrtime.bigint();
46
47
}
47
48
48
49
stop(){
49
-
constendTime=process.hrtime.bigint();
50
+
constendTime=hrtime.bigint();
50
51
constduration=endTime-this.startTime;
51
52
this.totalDuration+=duration;
52
53
}
@@ -65,7 +66,8 @@ export class TimedStream extends PassThrough {
65
66
super({
66
67
...options,
67
68
objectMode: true,
68
-
highWaterMark: 0,
69
+
readableHighWaterMark: 0,// We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early.
70
+
writableHighWaterMark: 0,// We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key.
69
71
transform: (event,_encoding,callback)=>{
70
72
/* When we iterate through a stream, time spent waiting for the user's
71
73
application is added to totalDurationTransform. When we use handlers,
@@ -79,7 +81,6 @@ export class TimedStream extends PassThrough {
* Creates a readable stream of rows from a Bigtable table or authorized view.
@@ -128,11 +129,8 @@ export function createReadStreamInternal(
128
129
// discarded in the per attempt subpipeline (rowStream)
129
130
letlastRowKey='';
130
131
letrowsRead=0;
131
-
constuserStream=newPassThrough({
132
-
objectMode: true,
133
-
readableHighWaterMark: 0,// We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early.
134
-
writableHighWaterMark: 0,// We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key.
135
-
transform(event,_encoding,callback){
132
+
constuserStream=newTimedStream({
133
+
transformHook(event,_encoding,callback){
136
134
if(userCanceled){
137
135
callback();
138
136
return;
@@ -373,7 +371,10 @@ export function createReadStreamInternal(
373
371
// We ignore the `cancelled` "error", since we are the ones who cause
374
372
// it when the user calls `.abort()`.
375
373
userStream.end();
376
-
metricsCollector.onOperationComplete(error.code);
374
+
metricsCollector.onOperationComplete(
375
+
error.code,
376
+
userStream.getTotalDurationMs(),
377
+
);
377
378
return;
378
379
}
379
380
numConsecutiveErrors++;
@@ -405,7 +406,10 @@ export function createReadStreamInternal(
405
406
//
406
407
error.code=grpc.status.CANCELLED;
407
408
}
408
-
metricsCollector.onOperationComplete(error.code);
409
+
metricsCollector.onOperationComplete(
410
+
error.code,
411
+
userStream.getTotalDurationMs(),
412
+
);
409
413
userStream.emit('error',error);
410
414
}
411
415
})
@@ -416,7 +420,11 @@ export function createReadStreamInternal(
0 commit comments