Skip to content

Commit cd537e9

Browse files
dinoolivasduskis
authored andcommitted
---
yaml --- r: 26365 b: refs/heads/autosynth-dataproc c: e3e812b h: refs/heads/master i: 26363: fad4f64
1 parent 19e7ef0 commit cd537e9

5 files changed

Lines changed: 344 additions & 1 deletion

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ refs/tags/v0.78.0: 62d4bd30605ab3578f9a08d84487fb0b33ac2ff5
168168
refs/tags/v0.79.0: 82287b570708748c411d05c40f3932cff9606feb
169169
refs/tags/v0.80.0: f745e744d38e4fe636f34d0e04795ba3d014287d
170170
refs/tags/v0.81.0: ed3a0c85339ea6b73560b9a570abfbb76b93a263
171-
refs/heads/autosynth-dataproc: f1d2976e9fb26c440b94b9dfa012e52493132f53
171+
refs/heads/autosynth-dataproc: e3e812b42322a1587cc5a648035dd5dbd39743b9
172172
refs/heads/autosynth-securitycenter: 80d8b3f724d20028fde7e06d1c16d6bfd36837f9
173173
refs/heads/autosynth-talent: 383a363aeb0af16c9997201d1f963360641f68a8
174174
refs/heads/cscc-samples: 620d105e6b574cfeeee04e413a157b7bd34ebc8b

branches/autosynth-dataproc/google-cloud-clients/google-cloud-pubsub/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@
7878
<artifactId>grpc-google-iam-v1</artifactId>
7979
<scope>test</scope>
8080
</dependency>
81+
<dependency>
82+
<groupId>io.opencensus</groupId>
83+
<artifactId>opencensus-impl</artifactId>
84+
<version>${opencensus.version}</version>
85+
<scope>test</scope>
86+
</dependency>
8187
<!-- Need testing utility classes for generated gRPC clients tests -->
8288
<dependency>
8389
<groupId>com.google.api</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/* Copyright 2019 Google Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package com.google.cloud.pubsub.v1;
17+
18+
import com.google.api.core.ApiFunction;
19+
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.errorprone.annotations.MustBeClosed;
21+
import com.google.pubsub.v1.PubsubMessage;
22+
import io.opencensus.common.Scope;
23+
import io.opencensus.tags.TagContext;
24+
import io.opencensus.tags.Tagger;
25+
import io.opencensus.tags.Tags;
26+
import io.opencensus.tags.propagation.TagContextBinarySerializer;
27+
import io.opencensus.trace.Link;
28+
import io.opencensus.trace.SpanContext;
29+
import io.opencensus.trace.Tracer;
30+
import io.opencensus.trace.Tracing;
31+
import io.opencensus.trace.propagation.SpanContextParseException;
32+
import io.opencensus.trace.propagation.TextFormat;
33+
import io.opencensus.trace.propagation.TextFormat.Getter;
34+
import io.opencensus.trace.propagation.TextFormat.Setter;
35+
import io.opencensus.trace.samplers.Samplers;
36+
import java.util.logging.Level;
37+
import java.util.logging.Logger;
38+
39+
/**
40+
* Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers
41+
* to subscribers.
42+
*/
43+
public class OpenCensusUtil {
44+
private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName());
45+
46+
public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey";
47+
public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey";
48+
@VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver";
49+
private static final String TRACEPARENT_KEY = "traceparent";
50+
51+
private static final Tagger tagger = Tags.getTagger();
52+
private static final TagContextBinarySerializer serializer =
53+
Tags.getTagPropagationComponent().getBinarySerializer();
54+
55+
private static final Tracer tracer = Tracing.getTracer();
56+
private static final TextFormat traceContextTextFormat =
57+
Tracing.getPropagationComponent().getTraceContextFormat();
58+
59+
/**
60+
* Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as
61+
* attributes to the {@link PubsubMessage}.
62+
*/
63+
public static final ApiFunction<PubsubMessage, PubsubMessage> OPEN_CENSUS_MESSAGE_TRANSFORM =
64+
new ApiFunction<PubsubMessage, PubsubMessage>() {
65+
@Override
66+
public PubsubMessage apply(PubsubMessage message) {
67+
PubsubMessage.Builder builder = PubsubMessage.newBuilder(message);
68+
String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext());
69+
String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext());
70+
if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) {
71+
return message;
72+
}
73+
if (!encodedSpanContext.isEmpty()) {
74+
builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext);
75+
}
76+
if (!encodedTagContext.isEmpty()) {
77+
builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext);
78+
}
79+
return builder.build();
80+
}
81+
};
82+
83+
private static final Setter<StringBuilder> setter =
84+
new Setter<StringBuilder>() {
85+
@Override
86+
public void put(StringBuilder carrier, String key, String value) {
87+
if (key.equals(TRACEPARENT_KEY)) {
88+
carrier.append(value);
89+
}
90+
}
91+
};
92+
93+
private static final Getter<String> getter =
94+
new Getter<String>() {
95+
@Override
96+
public String get(String carrier, String key) {
97+
return key.equals(TRACEPARENT_KEY) ? carrier : null;
98+
}
99+
};
100+
101+
@VisibleForTesting
102+
static String encodeSpanContext(SpanContext ctxt) {
103+
StringBuilder builder = new StringBuilder();
104+
traceContextTextFormat.inject(ctxt, builder, setter);
105+
return builder.toString();
106+
}
107+
108+
// TODO: update this code once the text encoding of tags has been resolved
109+
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
110+
private static String encodeTagContext(TagContext tags) {
111+
return "";
112+
}
113+
114+
// TODO: update this code once the text encoding of tags has been resolved
115+
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
116+
private static Scope createScopedTagContext(String encodedTags) {
117+
return tagger.withTagContext(tagger.getCurrentTagContext());
118+
}
119+
120+
@VisibleForTesting
121+
@MustBeClosed
122+
static Scope createScopedSpan(String name) {
123+
return tracer
124+
.spanBuilderWithExplicitParent(name, tracer.getCurrentSpan())
125+
.setRecordEvents(true)
126+
// Note: we preserve the sampling decision from the publisher.
127+
.setSampler(Samplers.alwaysSample())
128+
.startScopedSpan();
129+
}
130+
131+
private static void addParentLink(String encodedParentSpanContext) {
132+
try {
133+
SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter);
134+
tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN));
135+
} catch (SpanContextParseException exn) {
136+
logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn);
137+
}
138+
}
139+
140+
/**
141+
* Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts and
142+
* puts them in scope.
143+
*/
144+
public static class OpenCensusMessageReceiver implements MessageReceiver {
145+
private final MessageReceiver receiver;
146+
147+
public OpenCensusMessageReceiver(MessageReceiver receiver) {
148+
this.receiver = receiver;
149+
}
150+
151+
@Override
152+
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
153+
String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, "");
154+
if (encodedTagContext.isEmpty()) {
155+
addTraceScope(message, consumer);
156+
return;
157+
}
158+
try (Scope statsScope = createScopedTagContext(encodedTagContext)) {
159+
addTraceScope(message, consumer);
160+
}
161+
}
162+
163+
private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) {
164+
String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, "");
165+
if (encodedSpanContext.isEmpty()) {
166+
receiver.receiveMessage(message, consumer);
167+
return;
168+
}
169+
try (Scope spanScope = createScopedSpan(MESSAGE_RECEIVER_SPAN_NAME)) {
170+
addParentLink(encodedSpanContext);
171+
receiver.receiveMessage(message, consumer);
172+
}
173+
}
174+
}
175+
}

branches/autosynth-dataproc/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsub.v1;
1818

19+
import com.google.api.core.ApiFunction;
1920
import com.google.api.core.ApiFuture;
2021
import com.google.api.core.ApiFutureCallback;
2122
import com.google.api.core.ApiFutures;
@@ -95,6 +96,7 @@ public class Publisher {
9596
private final List<AutoCloseable> closeables;
9697
private final MessageWaiter messagesWaiter;
9798
private ScheduledFuture<?> currentAlarmFuture;
99+
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;
98100

99101
/** The maximum number of messages in one request. Defined by the API. */
100102
public static long getApiMaxRequestElementCount() {
@@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException {
110112
topicName = builder.topicName;
111113

112114
this.batchingSettings = builder.batchingSettings;
115+
this.messageTransform = builder.messageTransform;
113116

114117
messagesBatch = new LinkedList<>();
115118
messagesBatchLock = new ReentrantLock();
@@ -192,6 +195,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
192195
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
193196
}
194197

198+
message = messageTransform.apply(message);
195199
final int messageSize = message.getSerializedSize();
196200
OutstandingBatch batchToSend = null;
197201
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
@@ -528,6 +532,14 @@ public static final class Builder {
528532
CredentialsProvider credentialsProvider =
529533
TopicAdminSettings.defaultCredentialsProviderBuilder().build();
530534

535+
ApiFunction<PubsubMessage, PubsubMessage> messageTransform =
536+
new ApiFunction<PubsubMessage, PubsubMessage>() {
537+
@Override
538+
public PubsubMessage apply(PubsubMessage input) {
539+
return input;
540+
}
541+
};
542+
531543
private Builder(String topic) {
532544
this.topicName = Preconditions.checkNotNull(topic);
533545
}
@@ -610,6 +622,17 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
610622
return this;
611623
}
612624

625+
/**
626+
* Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage}
627+
* before it is sent
628+
*/
629+
@BetaApi
630+
public Builder setTransform(ApiFunction<PubsubMessage, PubsubMessage> messageTransform) {
631+
this.messageTransform =
632+
Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null.");
633+
return this;
634+
}
635+
613636
public Publisher build() throws IOException {
614637
return new Publisher(this);
615638
}

0 commit comments

Comments
 (0)