1818
1919import static com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
2020
21+ import com .google .api .gax .core .FixedCredentialsProvider ;
2122import com .google .api .gax .longrunning .OperationFuture ;
2223import com .google .api .gax .paging .Page ;
2324import com .google .api .gax .retrying .RetrySettings ;
7071import com .google .cloud .spanner .TimestampBound ;
7172import com .google .cloud .spanner .TransactionContext ;
7273import com .google .cloud .spanner .TransactionRunner ;
74+ import com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
7375import com .google .cloud .spanner .Type ;
7476import com .google .cloud .spanner .Value ;
7577import com .google .cloud .spanner .encryption .CustomerManagedEncryption ;
7678import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
79+ import com .google .cloud .trace .v1 .TraceServiceClient ;
80+ import com .google .cloud .trace .v1 .TraceServiceSettings ;
7781import com .google .common .base .Function ;
7882import com .google .common .base .Joiner ;
7983import com .google .common .base .Preconditions ;
8084import com .google .common .collect .Lists ;
8185import com .google .common .util .concurrent .ThreadFactoryBuilder ;
86+ import com .google .devtools .cloudtrace .v1 .GetTraceRequest ;
87+ import com .google .devtools .cloudtrace .v1 .Trace ;
88+ import com .google .devtools .cloudtrace .v1 .TraceSpan ;
8289import com .google .longrunning .Operation ;
8390import com .google .protobuf .ByteString ;
8491import com .google .protobuf .util .Timestamps ;
152159import com .google .spanner .v1 .TypeCode ;
153160import io .grpc .Status ;
154161import io .grpc .stub .StreamObserver ;
162+ import io .opentelemetry .api .trace .Span ;
163+ import io .opentelemetry .api .trace .Tracer ;
164+ import io .opentelemetry .context .Scope ;
155165import java .io .ByteArrayInputStream ;
156166import java .io .File ;
157167import java .io .IOException ;
@@ -332,24 +342,28 @@ public void startRWTransaction() throws Exception {
332342 // Try to commit
333343 return null ;
334344 };
345+ io .opentelemetry .context .Context context = io .opentelemetry .context .Context .current ();
335346 Runnable runnable =
336- () -> {
337- try {
338- runner =
339- optimistic
340- ? dbClient .readWriteTransaction (Options .optimisticLock ())
341- : dbClient .readWriteTransaction ();
342- LOGGER .log (Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
343- runner .run (callable );
344- transactionSucceeded (runner .getCommitTimestamp ().toProto ());
345- } catch (SpannerException e ) {
346- LOGGER .log (
347- Level .WARNING ,
348- String .format ("Transaction runnable failed with exception %s\n " , e .getMessage ()),
349- e );
350- transactionFailed (e );
351- }
352- };
347+ context .wrap (
348+ () -> {
349+ try {
350+ runner =
351+ optimistic
352+ ? dbClient .readWriteTransaction (Options .optimisticLock ())
353+ : dbClient .readWriteTransaction ();
354+ LOGGER .log (
355+ Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
356+ runner .run (callable );
357+ transactionSucceeded (runner .getCommitTimestamp ().toProto ());
358+ } catch (SpannerException e ) {
359+ LOGGER .log (
360+ Level .WARNING ,
361+ String .format (
362+ "Transaction runnable failed with exception %s\n " , e .getMessage ()),
363+ e );
364+ transactionFailed (e );
365+ }
366+ });
353367 LOGGER .log (
354368 Level .INFO ,
355369 String .format ("Callable and Runnable created, ready to execute %s\n " , transactionSeed ));
@@ -815,6 +829,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
815829 .setHost (HOST_PREFIX + WorkerProxy .spannerPort )
816830 .setCredentials (credentials )
817831 .setChannelProvider (channelProvider )
832+ .setEnableServerSideTracing (true )
833+ .setOpenTelemetry (WorkerProxy .openTelemetrySdk )
818834 .setSessionPoolOption (sessionPoolOptions );
819835
820836 SpannerStubSettings .Builder stubSettingsBuilder =
@@ -838,6 +854,70 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
838854 return optionsBuilder .build ().getService ();
839855 }
840856
857+ private TraceServiceClient traceServiceClient ;
858+
859+ // Return the trace service client, create one if not exists.
860+ private synchronized TraceServiceClient getTraceServiceClient () throws IOException {
861+ if (traceServiceClient != null ) {
862+ return traceServiceClient ;
863+ }
864+ // Create a trace service client
865+ Credentials credentials ;
866+ if (WorkerProxy .serviceKeyFile .isEmpty ()) {
867+ credentials = NoCredentials .getInstance ();
868+ } else {
869+ credentials =
870+ GoogleCredentials .fromStream (
871+ new ByteArrayInputStream (
872+ FileUtils .readFileToByteArray (new File (WorkerProxy .serviceKeyFile ))),
873+ HTTP_TRANSPORT_FACTORY );
874+ }
875+
876+ TransportChannelProvider transportChannelProvider =
877+ CloudUtil .newCloudTraceChannelProviderHelper (WorkerProxy .CLOUD_TRACE_ENDPOINT , 443 );
878+
879+ TraceServiceSettings traceServiceSettings =
880+ TraceServiceSettings .newBuilder ()
881+ .setEndpoint (WorkerProxy .CLOUD_TRACE_ENDPOINT )
882+ .setCredentialsProvider (FixedCredentialsProvider .create (credentials ))
883+ .setTransportChannelProvider (transportChannelProvider )
884+ .build ();
885+
886+ traceServiceClient = TraceServiceClient .create (traceServiceSettings );
887+ return traceServiceClient ;
888+ }
889+
890+ private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction" ;
891+ private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction" ;
892+
893+ /* Handles verification of OpenTelemetry traces for server side tracing feature. */
894+ public boolean verifyExportedEndToEndTrace (String traceId ) {
895+ try {
896+ GetTraceRequest getTraceRequest =
897+ GetTraceRequest .newBuilder ()
898+ .setProjectId (WorkerProxy .PROJECT_ID )
899+ .setTraceId (traceId )
900+ .build ();
901+ Trace trace = getTraceServiceClient ().getTrace (getTraceRequest );
902+ boolean readWriteorReadOnlyTxnPresent = false , spannerServerSideSpanPresent = false ;
903+ for (TraceSpan span : trace .getSpansList ()) {
904+ if (span .getName () == READ_ONLY_TRANSACTION || span .getName () == READ_WRITE_TRANSACTION ) {
905+ readWriteorReadOnlyTxnPresent = true ;
906+ }
907+ if (span .getName ().startsWith ("Spanner." )) {
908+ spannerServerSideSpanPresent = true ;
909+ }
910+ }
911+ if (readWriteorReadOnlyTxnPresent && !spannerServerSideSpanPresent ) {
912+ return false ;
913+ }
914+ } catch (IOException e ) {
915+ LOGGER .log (Level .WARNING , "failed to verify end to end traces." , e );
916+ return false ;
917+ }
918+ return true ;
919+ }
920+
841921 /** Handle actions. */
842922 public Status startHandlingRequest (
843923 SpannerAsyncActionRequest req , ExecutionFlowContext executionContext ) {
@@ -862,17 +942,20 @@ public Status startHandlingRequest(
862942 useMultiplexedSession = false ;
863943 }
864944
945+ io .opentelemetry .context .Context context = io .opentelemetry .context .Context .current ();
865946 actionThreadPool .execute (
866- () -> {
867- Status status =
868- executeAction (outcomeSender , action , dbPath , useMultiplexedSession , executionContext );
869- if (!status .isOk ()) {
870- LOGGER .log (
871- Level .WARNING ,
872- String .format ("Failed to execute action with error: %s\n %s" , status , action ));
873- executionContext .onError (status .getCause ());
874- }
875- });
947+ context .wrap (
948+ () -> {
949+ Status status =
950+ executeAction (
951+ outcomeSender , action , dbPath , useMultiplexedSession , executionContext );
952+ if (!status .isOk ()) {
953+ LOGGER .log (
954+ Level .WARNING ,
955+ String .format ("Failed to execute action with error: %s\n %s" , status , action ));
956+ executionContext .onError (status .getCause ());
957+ }
958+ }));
876959 return Status .OK ;
877960 }
878961
@@ -883,7 +966,12 @@ private Status executeAction(
883966 String dbPath ,
884967 boolean useMultiplexedSession ,
885968 ExecutionFlowContext executionContext ) {
886-
969+ Tracer tracer =
970+ WorkerProxy .openTelemetrySdk .getTracer (CloudClientExecutor .class .getName (), "0.1.0" );
971+ String spanName = String .format ("performaction_%s" , action .getActionCase ().toString ());
972+ LOGGER .log (Level .INFO , String .format ("spanName: %s" , spanName ));
973+ Span span = tracer .spanBuilder (spanName ).startSpan ();
974+ Scope scope = span .makeCurrent ();
887975 try {
888976 if (action .hasAdmin ()) {
889977 return executeAdminAction (useMultiplexedSession , action .getAdmin (), outcomeSender );
@@ -956,11 +1044,15 @@ private Status executeAction(
9561044 ErrorCode .UNIMPLEMENTED , "Not implemented yet: \n " + action )));
9571045 }
9581046 } catch (Exception e ) {
1047+ span .recordException (e );
9591048 LOGGER .log (Level .WARNING , "Unexpected error: " + e .getMessage ());
9601049 return outcomeSender .finishWithError (
9611050 toStatus (
9621051 SpannerExceptionFactory .newSpannerException (
9631052 ErrorCode .INVALID_ARGUMENT , "Unexpected error: " + e .getMessage ())));
1053+ } finally {
1054+ scope .close ();
1055+ span .end ();
9641056 }
9651057 }
9661058
0 commit comments