|
36 | 36 | import com.google.api.gax.rpc.StreamController; |
37 | 37 | import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor; |
38 | 38 | import com.google.cloud.pubsub.v1.stub.SubscriberStub; |
| 39 | +import com.google.common.collect.ImmutableList; |
| 40 | +import com.google.common.collect.ImmutableMap; |
39 | 41 | import com.google.common.collect.Lists; |
40 | 42 | import com.google.common.util.concurrent.MoreExecutors; |
41 | 43 | import com.google.protobuf.Any; |
@@ -86,6 +88,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements |
86 | 88 |
|
87 | 89 | private Duration inititalStreamAckDeadline; |
88 | 90 |
|
| 91 | + private final Map<String, List<String>> streamMetadata; |
| 92 | + |
89 | 93 | private final SubscriberStub subscriberStub; |
90 | 94 | private final int channelAffinity; |
91 | 95 | private final String subscription; |
@@ -134,6 +138,9 @@ private StreamingSubscriberConnection(Builder builder) { |
134 | 138 | inititalStreamAckDeadline = builder.maxDurationPerAckExtension; |
135 | 139 | } |
136 | 140 |
|
| 141 | + streamMetadata = |
| 142 | + ImmutableMap.of("x-goog-request-params", ImmutableList.of("subscription=" + subscription)); |
| 143 | + |
137 | 144 | subscriberStub = builder.subscriberStub; |
138 | 145 | channelAffinity = builder.channelAffinity; |
139 | 146 |
|
@@ -273,7 +280,9 @@ private void initialize() { |
273 | 280 | .streamingPullCallable() |
274 | 281 | .splitCall( |
275 | 282 | responseObserver, |
276 | | - GrpcCallContext.createDefault().withChannelAffinity(channelAffinity)); |
| 283 | + GrpcCallContext.createDefault() |
| 284 | + .withChannelAffinity(channelAffinity) |
| 285 | + .withExtraHeaders(streamMetadata)); |
277 | 286 |
|
278 | 287 | logger.log(Level.FINER, "Initializing stream to subscription {0}", subscription); |
279 | 288 | // We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt |
|
0 commit comments