Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit fd42e1d

Browse files
committed
Add changes in Spanner executor for testing end to end tracing
1 parent 0004919 commit fd42e1d

5 files changed

Lines changed: 298 additions & 30 deletions

File tree

google-cloud-spanner-executor/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,41 @@
2121
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2222
</properties>
2323

24+
<dependencyManagement>
25+
<dependencies>
26+
<dependency>
27+
<groupId>io.opentelemetry</groupId>
28+
<artifactId>opentelemetry-bom</artifactId>
29+
<version>1.41.0</version>
30+
<type>pom</type>
31+
<scope>import</scope>
32+
</dependency>
33+
</dependencies>
34+
</dependencyManagement>
35+
2436
<dependencies>
37+
<dependency>
38+
<groupId>io.opentelemetry</groupId>
39+
<artifactId>opentelemetry-api</artifactId>
40+
</dependency>
41+
<dependency>
42+
<groupId>io.opentelemetry</groupId>
43+
<artifactId>opentelemetry-sdk</artifactId>
44+
</dependency>
45+
<dependency>
46+
<groupId>com.google.cloud.opentelemetry</groupId>
47+
<artifactId>exporter-trace</artifactId>
48+
<version>0.29.0</version>
49+
</dependency>
2550
<dependency>
2651
<groupId>com.google.cloud</groupId>
2752
<artifactId>google-cloud-spanner</artifactId>
2853
</dependency>
54+
<dependency>
55+
<groupId>com.google.cloud</groupId>
56+
<artifactId>google-cloud-trace</artifactId>
57+
<version>2.47.0</version>
58+
</dependency>
2959
<dependency>
3060
<groupId>io.grpc</groupId>
3161
<artifactId>grpc-api</artifactId>

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java

Lines changed: 120 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;
2020

21+
import com.google.api.gax.core.FixedCredentialsProvider;
2122
import com.google.api.gax.longrunning.OperationFuture;
2223
import com.google.api.gax.paging.Page;
2324
import com.google.api.gax.retrying.RetrySettings;
@@ -70,15 +71,21 @@
7071
import com.google.cloud.spanner.TimestampBound;
7172
import com.google.cloud.spanner.TransactionContext;
7273
import com.google.cloud.spanner.TransactionRunner;
74+
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
7375
import com.google.cloud.spanner.Type;
7476
import com.google.cloud.spanner.Value;
7577
import com.google.cloud.spanner.encryption.CustomerManagedEncryption;
7678
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
79+
import com.google.cloud.trace.v1.TraceServiceClient;
80+
import com.google.cloud.trace.v1.TraceServiceSettings;
7781
import com.google.common.base.Function;
7882
import com.google.common.base.Joiner;
7983
import com.google.common.base.Preconditions;
8084
import com.google.common.collect.Lists;
8185
import com.google.common.util.concurrent.ThreadFactoryBuilder;
86+
import com.google.devtools.cloudtrace.v1.GetTraceRequest;
87+
import com.google.devtools.cloudtrace.v1.Trace;
88+
import com.google.devtools.cloudtrace.v1.TraceSpan;
8289
import com.google.longrunning.Operation;
8390
import com.google.protobuf.ByteString;
8491
import com.google.protobuf.util.Timestamps;
@@ -152,6 +159,9 @@
152159
import com.google.spanner.v1.TypeCode;
153160
import io.grpc.Status;
154161
import io.grpc.stub.StreamObserver;
162+
import io.opentelemetry.api.trace.Span;
163+
import io.opentelemetry.api.trace.Tracer;
164+
import io.opentelemetry.context.Scope;
155165
import java.io.ByteArrayInputStream;
156166
import java.io.File;
157167
import java.io.IOException;
@@ -332,24 +342,28 @@ public void startRWTransaction() throws Exception {
332342
// Try to commit
333343
return null;
334344
};
345+
io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
335346
Runnable runnable =
336-
() -> {
337-
try {
338-
runner =
339-
optimistic
340-
? dbClient.readWriteTransaction(Options.optimisticLock())
341-
: dbClient.readWriteTransaction();
342-
LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
343-
runner.run(callable);
344-
transactionSucceeded(runner.getCommitTimestamp().toProto());
345-
} catch (SpannerException e) {
346-
LOGGER.log(
347-
Level.WARNING,
348-
String.format("Transaction runnable failed with exception %s\n", e.getMessage()),
349-
e);
350-
transactionFailed(e);
351-
}
352-
};
347+
context.wrap(
348+
() -> {
349+
try {
350+
runner =
351+
optimistic
352+
? dbClient.readWriteTransaction(Options.optimisticLock())
353+
: dbClient.readWriteTransaction();
354+
LOGGER.log(
355+
Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
356+
runner.run(callable);
357+
transactionSucceeded(runner.getCommitTimestamp().toProto());
358+
} catch (SpannerException e) {
359+
LOGGER.log(
360+
Level.WARNING,
361+
String.format(
362+
"Transaction runnable failed with exception %s\n", e.getMessage()),
363+
e);
364+
transactionFailed(e);
365+
}
366+
});
353367
LOGGER.log(
354368
Level.INFO,
355369
String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed));
@@ -815,6 +829,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
815829
.setHost(HOST_PREFIX + WorkerProxy.spannerPort)
816830
.setCredentials(credentials)
817831
.setChannelProvider(channelProvider)
832+
.setEnableServerSideTracing(true)
833+
.setOpenTelemetry(WorkerProxy.openTelemetrySdk)
818834
.setSessionPoolOption(sessionPoolOptions);
819835

820836
SpannerStubSettings.Builder stubSettingsBuilder =
@@ -838,6 +854,70 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
838854
return optionsBuilder.build().getService();
839855
}
840856

857+
private TraceServiceClient traceServiceClient;
858+
859+
// Return the trace service client, create one if not exists.
860+
private synchronized TraceServiceClient getTraceServiceClient() throws IOException {
861+
if (traceServiceClient != null) {
862+
return traceServiceClient;
863+
}
864+
// Create a trace service client
865+
Credentials credentials;
866+
if (WorkerProxy.serviceKeyFile.isEmpty()) {
867+
credentials = NoCredentials.getInstance();
868+
} else {
869+
credentials =
870+
GoogleCredentials.fromStream(
871+
new ByteArrayInputStream(
872+
FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))),
873+
HTTP_TRANSPORT_FACTORY);
874+
}
875+
876+
TransportChannelProvider transportChannelProvider =
877+
CloudUtil.newCloudTraceChannelProviderHelper(WorkerProxy.CLOUD_TRACE_ENDPOINT, 443);
878+
879+
TraceServiceSettings traceServiceSettings =
880+
TraceServiceSettings.newBuilder()
881+
.setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT)
882+
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
883+
.setTransportChannelProvider(transportChannelProvider)
884+
.build();
885+
886+
traceServiceClient = TraceServiceClient.create(traceServiceSettings);
887+
return traceServiceClient;
888+
}
889+
890+
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
891+
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
892+
893+
/* Handles verification of OpenTelemetry traces for server side tracing feature. */
894+
public boolean verifyExportedEndToEndTrace(String traceId) {
895+
try {
896+
GetTraceRequest getTraceRequest =
897+
GetTraceRequest.newBuilder()
898+
.setProjectId(WorkerProxy.PROJECT_ID)
899+
.setTraceId(traceId)
900+
.build();
901+
Trace trace = getTraceServiceClient().getTrace(getTraceRequest);
902+
boolean readWriteorReadOnlyTxnPresent = false, spannerServerSideSpanPresent = false;
903+
for (TraceSpan span : trace.getSpansList()) {
904+
if (span.getName() == READ_ONLY_TRANSACTION || span.getName() == READ_WRITE_TRANSACTION) {
905+
readWriteorReadOnlyTxnPresent = true;
906+
}
907+
if (span.getName().startsWith("Spanner.")) {
908+
spannerServerSideSpanPresent = true;
909+
}
910+
}
911+
if (readWriteorReadOnlyTxnPresent && !spannerServerSideSpanPresent) {
912+
return false;
913+
}
914+
} catch (IOException e) {
915+
LOGGER.log(Level.WARNING, "failed to verify end to end traces.", e);
916+
return false;
917+
}
918+
return true;
919+
}
920+
841921
/** Handle actions. */
842922
public Status startHandlingRequest(
843923
SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
@@ -862,17 +942,20 @@ public Status startHandlingRequest(
862942
useMultiplexedSession = false;
863943
}
864944

945+
io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
865946
actionThreadPool.execute(
866-
() -> {
867-
Status status =
868-
executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
869-
if (!status.isOk()) {
870-
LOGGER.log(
871-
Level.WARNING,
872-
String.format("Failed to execute action with error: %s\n%s", status, action));
873-
executionContext.onError(status.getCause());
874-
}
875-
});
947+
context.wrap(
948+
() -> {
949+
Status status =
950+
executeAction(
951+
outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
952+
if (!status.isOk()) {
953+
LOGGER.log(
954+
Level.WARNING,
955+
String.format("Failed to execute action with error: %s\n%s", status, action));
956+
executionContext.onError(status.getCause());
957+
}
958+
}));
876959
return Status.OK;
877960
}
878961

@@ -883,7 +966,12 @@ private Status executeAction(
883966
String dbPath,
884967
boolean useMultiplexedSession,
885968
ExecutionFlowContext executionContext) {
886-
969+
Tracer tracer =
970+
WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
971+
String spanName = String.format("performaction_%s", action.getActionCase().toString());
972+
LOGGER.log(Level.INFO, String.format("spanName: %s", spanName));
973+
Span span = tracer.spanBuilder(spanName).startSpan();
974+
Scope scope = span.makeCurrent();
887975
try {
888976
if (action.hasAdmin()) {
889977
return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender);
@@ -956,11 +1044,15 @@ private Status executeAction(
9561044
ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action)));
9571045
}
9581046
} catch (Exception e) {
1047+
span.recordException(e);
9591048
LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage());
9601049
return outcomeSender.finishWithError(
9611050
toStatus(
9621051
SpannerExceptionFactory.newSpannerException(
9631052
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
1053+
} finally {
1054+
scope.close();
1055+
span.end();
9641056
}
9651057
}
9661058

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
import com.google.spanner.executor.v1.SpannerOptions;
2727
import io.grpc.Status;
2828
import io.grpc.stub.StreamObserver;
29+
import io.opentelemetry.api.trace.Span;
30+
import io.opentelemetry.api.trace.Tracer;
31+
import io.opentelemetry.context.Scope;
32+
import java.util.concurrent.atomic.AtomicBoolean;
2933
import java.util.logging.Level;
3034
import java.util.logging.Logger;
3135

@@ -40,19 +44,37 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP
4044
// Ratio of operations to use multiplexed sessions.
4145
private final double multiplexedSessionOperationsRatio;
4246

47+
// Count of checks performed to verify end to end traces using Cloud Trace APIs.
48+
private int cloudTraceCheckCount;
49+
50+
// Maximum checks allowed to verify end to end traces using Cloud Trace APIs.
51+
private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20;
52+
4353
public CloudExecutorImpl(
4454
boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
4555
clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
56+
this.cloudTraceCheckCount = 0;
4657
this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
4758
}
4859

4960
/** Execute SpannerAsync action requests. */
5061
@Override
5162
public StreamObserver<SpannerAsyncActionRequest> executeActionAsync(
5263
StreamObserver<SpannerAsyncActionResponse> responseObserver) {
64+
// Create a top-level OpenTelemetry span for streaming request.
65+
Tracer tracer =
66+
WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
67+
Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan();
68+
Scope scope = span.makeCurrent();
69+
70+
final String traceId = span.getSpanContext().getTraceId();
71+
final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled();
72+
AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false);
73+
5374
CloudClientExecutor.ExecutionFlowContext executionContext =
5475
clientExecutor.new ExecutionFlowContext(responseObserver);
5576
return new StreamObserver<SpannerAsyncActionRequest>() {
77+
5678
@Override
5779
public void onNext(SpannerAsyncActionRequest request) {
5880
LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request));
@@ -86,6 +108,11 @@ public void onNext(SpannerAsyncActionRequest request) {
86108
Level.INFO,
87109
String.format("Updated request to set multiplexed session flag: \n%s", request));
88110
}
111+
String actionName = request.getAction().getActionCase().toString();
112+
if (actionName == "READ" || actionName == "QUERY") {
113+
requestHasReadOrQueryAction.set(true);
114+
}
115+
89116
Status status = clientExecutor.startHandlingRequest(request, executionContext);
90117
if (!status.isOk()) {
91118
LOGGER.log(
@@ -104,9 +131,26 @@ public void onError(Throwable t) {
104131

105132
@Override
106133
public void onCompleted() {
134+
if (isSampled
135+
&& cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT
136+
&& requestHasReadOrQueryAction.get()) {
137+
cloudTraceCheckCount++;
138+
if (!clientExecutor.verifyExportedEndToEndTrace(traceId)) {
139+
executionContext.onError(
140+
Status.INTERNAL
141+
.withDescription(
142+
String.format(
143+
"failed to verify end to end trace for trace_id: %s", traceId))
144+
.getCause());
145+
executionContext.cleanup();
146+
return;
147+
}
148+
}
107149
LOGGER.log(Level.INFO, "Client called Done, half closed");
108150
executionContext.cleanup();
109151
responseObserver.onCompleted();
152+
scope.close();
153+
span.end();
110154
}
111155
};
112156
}

0 commit comments

Comments
 (0)