Skip to content

Commit 0d197a5

Browse files
authored
feat: add batch throttled ms metric (#888)
* feat: add throttled time to ApiTracer * fix abstract class * update based on review * fix format * updates on comments * make tests more readable * update year * fix test * make the test more readable
1 parent bd873bc commit 0d197a5

11 files changed

Lines changed: 356 additions & 22 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
7878
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
7979
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
80+
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
8081
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
8182
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
8283
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
@@ -88,6 +89,7 @@
8889
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
8990
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
9091
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
92+
import com.google.common.base.MoreObjects;
9193
import com.google.common.base.Preconditions;
9294
import com.google.common.collect.ImmutableList;
9395
import com.google.common.collect.ImmutableMap;
@@ -132,6 +134,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
132134

133135
private final ServerStreamingCallable<Query, Row> readRowsCallable;
134136
private final UnaryCallable<Query, Row> readRowCallable;
137+
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
135138
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
136139
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
137140
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
@@ -267,6 +270,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
267270

268271
readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
269272
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
273+
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
270274
sampleRowKeysCallable = createSampleRowKeysCallable();
271275
mutateRowCallable = createMutateRowCallable();
272276
bulkMutateRowsCallable = createBulkMutateRowsCallable();
@@ -430,6 +434,46 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
430434
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
431435
}
432436

437+
/**
438+
* Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
439+
* batcher. The chain will:
440+
*
441+
* <ul>
442+
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
443+
* <li>Upon receiving the response stream, it will merge the {@link
444+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
445+
* implementation can be configured in by the {@code rowAdapter} parameter.
446+
* <li>Retry/resume on failure.
447+
* <li>Filter out marker rows.
448+
* <li>Construct a {@link UnaryCallable} that will buffer the entire stream into memory before
449+
* completing. If the stream is empty, then the list will be empty.
450+
* <li>Add tracing & metrics.
451+
* </ul>
452+
*/
453+
private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
454+
RowAdapter<RowT> rowAdapter) {
455+
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
456+
createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);
457+
458+
ServerStreamingCallable<Query, RowT> readRowsUserCallable =
459+
new ReadRowsUserCallable<>(readRowsCallable, requestContext);
460+
461+
SpanName span = getSpanName("ReadRows");
462+
463+
// The TracedBatcherUnaryCallable has to be wrapped by the TracedUnaryCallable, so that
464+
// TracedUnaryCallable can inject a tracer for the TracedBatcherUnaryCallable to interact with
465+
UnaryCallable<Query, List<RowT>> tracedBatcher =
466+
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());
467+
468+
UnaryCallable<Query, List<RowT>> withHeaderTracer =
469+
new HeaderTracerUnaryCallable(tracedBatcher);
470+
471+
UnaryCallable<Query, List<RowT>> traced =
472+
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
473+
474+
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
475+
}
476+
433477
/**
434478
* Creates a callable chain to handle SampleRowKeys RPCs. The chain will:
435479
*
@@ -549,8 +593,12 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
549593
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);
550594

551595
SpanName spanName = getSpanName("MutateRows");
596+
597+
UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);
598+
552599
UnaryCallable<BulkMutation, Void> withHeaderTracer =
553-
new HeaderTracerUnaryCallable<>(userFacing);
600+
new HeaderTracerUnaryCallable<>(tracedBatcher);
601+
554602
UnaryCallable<BulkMutation, Void> traced =
555603
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);
556604

@@ -578,17 +626,14 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
578626
*/
579627
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
580628
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
581-
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
582-
if (ctx != null) {
583-
callable = callable.withDefaultCallContext(ctx);
584-
}
585629
return new BatcherImpl<>(
586630
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
587-
callable,
631+
bulkMutateRowsCallable,
588632
BulkMutation.create(tableId),
589633
settings.bulkMutateRowsSettings().getBatchingSettings(),
590634
clientContext.getExecutor(),
591-
bulkMutationFlowController);
635+
bulkMutationFlowController,
636+
MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext()));
592637
}
593638

594639
/**
@@ -609,16 +654,14 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
609654
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
610655
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
611656
Preconditions.checkNotNull(query, "query cannot be null");
612-
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
613-
if (ctx != null) {
614-
callable = callable.withDefaultCallContext(ctx);
615-
}
616657
return new BatcherImpl<>(
617658
settings.bulkReadRowsSettings().getBatchingDescriptor(),
618-
callable,
659+
bulkReadRowsCallable,
619660
query,
620661
settings.bulkReadRowsSettings().getBatchingSettings(),
621-
clientContext.getExecutor());
662+
clientContext.getExecutor(),
663+
null,
664+
MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext()));
622665
}
623666

624667
/**

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,40 @@
2121
import com.google.api.gax.tracing.BaseApiTracer;
2222
import javax.annotation.Nullable;
2323

24-
/** A Bigtable specific {@link ApiTracer} that includes additional contexts. */
24+
/**
25+
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
26+
* implementation that does nothing.
27+
*/
2528
@BetaApi("This surface is stable yet it might be removed in the future.")
26-
public abstract class BigtableTracer extends BaseApiTracer {
29+
public class BigtableTracer extends BaseApiTracer {
30+
31+
private volatile int attempt = 0;
32+
33+
@Override
34+
public void attemptStarted(int attemptNumber) {
35+
this.attempt = attemptNumber;
36+
}
2737

2838
/**
2939
* Get the attempt number of the current call. Attempt number for the current call is passed in
3040
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
3141
* {@link ApiCallContext}. Attempt number starts from 0.
3242
*/
33-
public abstract int getAttempt();
43+
public int getAttempt() {
44+
return attempt;
45+
}
3446

3547
/**
3648
* Record the latency between Google's network receiving the RPC and reading back the first byte of
3749
* the response from server-timing header. If server-timing header is missing, increment the
3850
* missing header count.
3951
*/
40-
public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable);
52+
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
53+
// noop
54+
}
55+
56+
/** Adds an annotation of the total throttled time of a batch. */
57+
public void batchRequestThrottled(long throttledTimeMs) {
58+
// noop
59+
}
4160
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,11 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
178178
tracer.recordGfeMetadata(latency, throwable);
179179
}
180180
}
181+
182+
@Override
183+
public void batchRequestThrottled(long throttledTimeMs) {
184+
for (BigtableTracer tracer : bigtableTracers) {
185+
tracer.batchRequestThrottled(throttledTimeMs);
186+
}
187+
}
181188
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ public void connectionSelected(String s) {
129129
}
130130

131131
@Override
132-
public void attemptStarted(int i) {
133-
attempt = i;
132+
public void attemptStarted(int attemptNumber) {
133+
attempt = attemptNumber;
134134
attemptCount++;
135135
attemptTimer = Stopwatch.createStarted();
136136
attemptResponseCount = 0;
@@ -226,6 +226,15 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
226226
.build());
227227
}
228228

229+
@Override
230+
public void batchRequestThrottled(long totalThrottledMs) {
231+
MeasureMap measures =
232+
stats
233+
.newMeasureMap()
234+
.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs);
235+
measures.record(newTagCtxBuilder().build());
236+
}
237+
229238
private TagContextBuilder newTagCtxBuilder() {
230239
TagContextBuilder tagCtx =
231240
tagger

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,11 @@ public class RpcMeasureConstants {
8888
"cloud.google.com/java/bigtable/gfe_header_missing_count",
8989
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
9090
COUNT);
91+
92+
/** Total throttled time of a batch in msecs. */
93+
public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME =
94+
MeasureLong.create(
95+
"cloud.google.com/java/bigtable/batch_throttled_time",
96+
"Total throttled time of a batch in msecs",
97+
MILLISECOND);
9198
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
1919
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
20+
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME;
2021
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
2122
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
2223
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
@@ -154,4 +155,14 @@ class RpcViewConstants {
154155
BIGTABLE_APP_PROFILE_ID,
155156
BIGTABLE_OP,
156157
BIGTABLE_STATUS));
158+
159+
// use distribution so we can correlate batch throttled time with op_latency
160+
static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW =
161+
View.create(
162+
View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"),
163+
"Total throttled time of a batch in msecs",
164+
BIGTABLE_BATCH_THROTTLED_TIME,
165+
AGGREGATION_WITH_MILLIS_HISTOGRAM,
166+
ImmutableList.of(
167+
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
157168
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ public class RpcViews {
3131
RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
3232
RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
3333
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
34-
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW);
34+
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
35+
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW);
3536

3637
private static final ImmutableSet<View> GFE_VIEW_SET =
3738
ImmutableSet.of(
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.core.InternalApi;
20+
import com.google.api.gax.batching.Batcher;
21+
import com.google.api.gax.rpc.ApiCallContext;
22+
import com.google.api.gax.rpc.UnaryCallable;
23+
import com.google.api.gax.tracing.ApiTracer;
24+
25+
/**
26+
* This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link
27+
* ApiTracer}. This class needs to be wrapped by a callable that injects the {@link ApiTracer}.
28+
*/
29+
@InternalApi
30+
public final class TracedBatcherUnaryCallable<RequestT, ResponseT>
31+
extends UnaryCallable<RequestT, ResponseT> {
32+
private final UnaryCallable<RequestT, ResponseT> innerCallable;
33+
34+
public TracedBatcherUnaryCallable(UnaryCallable innerCallable) {
35+
this.innerCallable = innerCallable;
36+
}
37+
38+
@Override
39+
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
40+
if (context.getOption(Batcher.THROTTLED_TIME_KEY) != null) {
41+
ApiTracer tracer = context.getTracer();
42+
// this should always be true
43+
if (tracer instanceof BigtableTracer) {
44+
((BigtableTracer) tracer)
45+
.batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY));
46+
}
47+
}
48+
return innerCallable.futureCall(request, context);
49+
}
50+
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub.metrics;
1717

18+
import static com.google.common.truth.Truth.assertThat;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.times;
2021
import static org.mockito.Mockito.verify;
2122
import static org.mockito.Mockito.when;
2223

2324
import com.google.api.gax.tracing.ApiTracer;
2425
import com.google.api.gax.tracing.ApiTracer.Scope;
26+
import com.google.cloud.bigtable.misc_utilities.MethodComparator;
2527
import com.google.common.collect.ImmutableList;
2628
import io.grpc.Status;
2729
import io.grpc.StatusRuntimeException;
30+
import java.lang.reflect.Method;
31+
import java.util.Arrays;
2832
import org.junit.Assert;
2933
import org.junit.Before;
3034
import org.junit.Rule;
@@ -229,4 +233,20 @@ public void testRecordGfeLatency() {
229233
verify(child3, times(1)).recordGfeMetadata(20L, t);
230234
verify(child4, times(1)).recordGfeMetadata(20L, t);
231235
}
236+
237+
@Test
238+
public void testBatchRequestThrottled() {
239+
compositeTracer.batchRequestThrottled(5L);
240+
verify(child3, times(1)).batchRequestThrottled(5L);
241+
verify(child4, times(1)).batchRequestThrottled(5L);
242+
}
243+
244+
@Test
245+
public void testMethodsOverride() {
246+
Method[] baseMethods = BigtableTracer.class.getDeclaredMethods();
247+
Method[] compositeTracerMethods = CompositeTracer.class.getDeclaredMethods();
248+
assertThat(Arrays.asList(compositeTracerMethods))
249+
.comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE)
250+
.containsAtLeastElementsIn(baseMethods);
251+
}
232252
}

0 commit comments

Comments
 (0)