1818import com .google .api .core .InternalApi ;
1919import com .google .api .gax .retrying .ExponentialRetryAlgorithm ;
2020import com .google .api .gax .retrying .RetryAlgorithm ;
21- import com .google .api .gax .retrying .RetryingExecutor ;
21+ import com .google .api .gax .retrying .RetryingExecutorWithContext ;
2222import com .google .api .gax .retrying .ScheduledRetryingExecutor ;
2323import com .google .api .gax .rpc .BatchingCallSettings ;
2424import com .google .api .gax .rpc .Callables ;
2525import com .google .api .gax .rpc .ClientContext ;
2626import com .google .api .gax .rpc .ServerStreamingCallSettings ;
2727import com .google .api .gax .rpc .ServerStreamingCallable ;
2828import com .google .api .gax .rpc .UnaryCallable ;
29+ import com .google .api .gax .tracing .SpanName ;
30+ import com .google .api .gax .tracing .TracedBatchingCallable ;
31+ import com .google .api .gax .tracing .TracedServerStreamingCallable ;
32+ import com .google .api .gax .tracing .TracedUnaryCallable ;
2933import com .google .bigtable .v2 .MutateRowsRequest ;
3034import com .google .bigtable .v2 .ReadRowsRequest ;
3135import com .google .bigtable .v2 .SampleRowKeysRequest ;
5054import com .google .cloud .bigtable .data .v2 .stub .readrows .ReadRowsUserCallable ;
5155import com .google .cloud .bigtable .data .v2 .stub .readrows .RowMergingCallable ;
5256import com .google .cloud .bigtable .gaxx .retrying .ApiResultRetryAlgorithm ;
57+ import com .google .cloud .bigtable .gaxx .tracing .WrappedTracerFactory ;
5358import java .io .IOException ;
5459import java .util .List ;
5560import org .threeten .bp .Duration ;
6873 */
6974@ InternalApi
7075public class EnhancedBigtableStub implements AutoCloseable {
76+ private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable" ;
77+ private static final String TRACING_INNER_CLIENT_NAME = "BaseBigtable" ;
78+
7179 private final EnhancedBigtableStubSettings settings ;
7280 private final GrpcBigtableStub stub ;
7381 private final ClientContext clientContext ;
@@ -92,7 +100,10 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
92100 .setCredentialsProvider (settings .getCredentialsProvider ())
93101 .setHeaderProvider (settings .getHeaderProvider ())
94102 .setStreamWatchdogProvider (settings .getStreamWatchdogProvider ())
95- .setStreamWatchdogCheckInterval (settings .getStreamWatchdogCheckInterval ());
103+ .setStreamWatchdogCheckInterval (settings .getStreamWatchdogCheckInterval ())
104+ // Force the base stub to use a different TracerFactory
105+ .setTracerFactory (
106+ new WrappedTracerFactory (settings .getTracerFactory (), TRACING_INNER_CLIENT_NAME ));
96107
97108 // ReadRow retries are handled in the overlay: disable retries in the base layer (but make
98109 // sure to preserve the exception callable settings).
@@ -140,6 +151,9 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
140151 ClientContext clientContext = ClientContext .create (baseSettings );
141152 GrpcBigtableStub stub = new GrpcBigtableStub (baseSettings , clientContext );
142153
154+ // Make sure to keep the original tracer factory for the outer client.
155+ clientContext = clientContext .toBuilder ().setTracerFactory (settings .getTracerFactory ()).build ();
156+
143157 return new EnhancedBigtableStub (settings , clientContext , stub );
144158 }
145159
@@ -247,15 +261,8 @@ private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
247261 FilterMarkerRowsCallable <RowT > filtering =
248262 new FilterMarkerRowsCallable <>(retrying2 , rowAdapter );
249263
250- ServerStreamingCallable <ReadRowsRequest , RowT > withContext =
251- filtering .withDefaultCallContext (clientContext .getDefaultCallContext ());
252-
253- // NOTE: Ideally `withDefaultCallContext` should be the outer-most callable, however the
254- // ReadRowsUserCallable overrides the first() method. This override would be lost if
255- // ReadRowsUserCallable is wrapped by another callable. At some point in the future,
256- // gax-java should allow preserving these kind of overrides through callable chains, at which
257- // point this should be re-ordered.
258- return new ReadRowsUserCallable <>(withContext , requestContext );
264+ return createUserFacingServerStreamingCallable (
265+ "ReadRows" , new ReadRowsUserCallable <>(filtering , requestContext ));
259266 }
260267
261268 /**
@@ -276,10 +283,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
276283 UnaryCallable <SampleRowKeysRequest , List <SampleRowKeysResponse >> retryable =
277284 Callables .retrying (spoolable , settings .sampleRowKeysSettings (), clientContext );
278285
279- UnaryCallable <String , List <KeyOffset >> userFacing =
280- new SampleRowKeysCallable (retryable , requestContext );
281-
282- return userFacing .withDefaultCallContext (clientContext .getDefaultCallContext ());
286+ return createUserFacingUnaryCallable (
287+ "SampleRowKeys" , new SampleRowKeysCallable (retryable , requestContext ));
283288 }
284289
285290 /**
@@ -290,9 +295,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
290295 * </ul>
291296 */
292297 private UnaryCallable <RowMutation , Void > createMutateRowCallable () {
293- MutateRowCallable userFacing = new MutateRowCallable (stub .mutateRowCallable (), requestContext );
294-
295- return userFacing .withDefaultCallContext (clientContext .getDefaultCallContext ());
298+ return createUserFacingUnaryCallable (
299+ "MutateRow" , new MutateRowCallable (stub .mutateRowCallable (), requestContext ));
296300 }
297301
298302 /**
@@ -311,7 +315,9 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
311315 */
312316 private UnaryCallable <BulkMutation , Void > createBulkMutateRowsCallable () {
313317 UnaryCallable <MutateRowsRequest , Void > baseCallable = createMutateRowsBaseCallable ();
314- return new BulkMutateRowsUserFacingCallable (baseCallable , requestContext );
318+
319+ return createUserFacingUnaryCallable (
320+ "BulkMutateRows" , new BulkMutateRowsUserFacingCallable (baseCallable , requestContext ));
315321 }
316322
317323 /**
@@ -338,8 +344,17 @@ private UnaryCallable<RowMutation, Void> createBulkMutateRowsBatchingCallable()
338344 BatchingCallSettings .newBuilder (new MutateRowsBatchingDescriptor ())
339345 .setBatchingSettings (settings .bulkMutateRowsSettings ().getBatchingSettings ());
340346
347+ // This is a special case, the tracing starts after the batching, so we can't use
348+ // createUserFacingUnaryCallable
349+ TracedBatchingCallable <MutateRowsRequest , Void > traced =
350+ new TracedBatchingCallable <>(
351+ baseCallable ,
352+ clientContext .getTracerFactory (),
353+ SpanName .of (TRACING_OUTER_CLIENT_NAME , "BulkMutateRows" ),
354+ batchingCallSettings .getBatchingDescriptor ());
355+
341356 UnaryCallable <MutateRowsRequest , Void > batching =
342- Callables .batching (baseCallable , batchingCallSettings .build (), clientContext );
357+ Callables .batching (traced , batchingCallSettings .build (), clientContext );
343358
344359 MutateRowsUserFacingCallable userFacing =
345360 new MutateRowsUserFacingCallable (batching , requestContext );
@@ -359,7 +374,7 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
359374 new ApiResultRetryAlgorithm <Void >(),
360375 new ExponentialRetryAlgorithm (
361376 settings .bulkMutateRowsSettings ().getRetrySettings (), clientContext .getClock ()));
362- RetryingExecutor <Void > retryingExecutor =
377+ RetryingExecutorWithContext <Void > retryingExecutor =
363378 new ScheduledRetryingExecutor <>(retryAlgorithm , clientContext .getExecutor ());
364379
365380 return new MutateRowsRetryingCallable (
@@ -378,10 +393,9 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
378393 * </ul>
379394 */
380395 private UnaryCallable <ConditionalRowMutation , Boolean > createCheckAndMutateRowCallable () {
381- CheckAndMutateRowCallable userFacing =
382- new CheckAndMutateRowCallable (stub .checkAndMutateRowCallable (), requestContext );
383-
384- return userFacing .withDefaultCallContext (clientContext .getDefaultCallContext ());
396+ return createUserFacingUnaryCallable (
397+ "CheckAndMutateRow" ,
398+ new CheckAndMutateRowCallable (stub .checkAndMutateRowCallable (), requestContext ));
385399 }
386400
387401 /**
@@ -394,10 +408,42 @@ private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCa
394408 * </ul>
395409 */
396410 private UnaryCallable <ReadModifyWriteRow , Row > createReadModifyWriteRowCallable () {
397- ReadModifyWriteRowCallable userFacing =
398- new ReadModifyWriteRowCallable (stub .readModifyWriteRowCallable (), requestContext );
411+ return createUserFacingUnaryCallable (
412+ "ReadModifyWriteRow" ,
413+ new ReadModifyWriteRowCallable (stub .readModifyWriteRowCallable (), requestContext ));
414+ }
399415
400- return userFacing .withDefaultCallContext (clientContext .getDefaultCallContext ());
416+ /**
417+ * Wraps a callable chain in a user presentable callable that will inject the default call context
418+ * and trace the call.
419+ */
420+ private <RequestT , ResponseT > UnaryCallable <RequestT , ResponseT > createUserFacingUnaryCallable (
421+ String methodName , UnaryCallable <RequestT , ResponseT > inner ) {
422+
423+ UnaryCallable <RequestT , ResponseT > traced =
424+ new TracedUnaryCallable <>(
425+ inner ,
426+ clientContext .getTracerFactory (),
427+ SpanName .of (TRACING_OUTER_CLIENT_NAME , methodName ));
428+
429+ return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
430+ }
431+
432+ /**
433+ * Wraps a callable chain in a user presentable callable that will inject the default call context
434+ * and trace the call.
435+ */
436+ private <RequestT , ResponseT >
437+ ServerStreamingCallable <RequestT , ResponseT > createUserFacingServerStreamingCallable (
438+ String methodName , ServerStreamingCallable <RequestT , ResponseT > inner ) {
439+
440+ ServerStreamingCallable <RequestT , ResponseT > traced =
441+ new TracedServerStreamingCallable <>(
442+ inner ,
443+ clientContext .getTracerFactory (),
444+ SpanName .of (TRACING_OUTER_CLIENT_NAME , methodName ));
445+
446+ return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
401447 }
402448 // </editor-fold>
403449
0 commit comments