|
56 | 56 | import com.google.api.pathtemplate.PathTemplate; |
57 | 57 | import com.google.cloud.RetryHelper; |
58 | 58 | import com.google.cloud.RetryHelper.RetryHelperException; |
| 59 | +import com.google.cloud.grpc.GcpManagedChannel; |
59 | 60 | import com.google.cloud.grpc.GcpManagedChannelBuilder; |
60 | 61 | import com.google.cloud.grpc.GcpManagedChannelOptions; |
61 | 62 | import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; |
@@ -267,6 +268,8 @@ public class GapicSpannerRpc implements SpannerRpc { |
267 | 268 | private static final ConcurrentMap<String, RateLimiter> ADMINISTRATIVE_REQUESTS_RATE_LIMITERS = |
268 | 269 | new ConcurrentHashMap<>(); |
269 | 270 | private final boolean leaderAwareRoutingEnabled; |
| 271 | + private final int numChannels; |
| 272 | + private final boolean isGrpcGcpExtensionEnabled; |
270 | 273 |
|
271 | 274 | public static GapicSpannerRpc create(SpannerOptions options) { |
272 | 275 | return new GapicSpannerRpc(options); |
@@ -318,6 +321,8 @@ public GapicSpannerRpc(final SpannerOptions options) { |
318 | 321 | this.callCredentialsProvider = options.getCallCredentialsProvider(); |
319 | 322 | this.compressorName = options.getCompressorName(); |
320 | 323 | this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); |
| 324 | + this.numChannels = options.getNumChannels(); |
| 325 | + this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled(); |
321 | 326 |
|
322 | 327 | if (initializeStubs) { |
323 | 328 | // First check if SpannerOptions provides a TransportChannelProvider. Create one |
@@ -1960,7 +1965,20 @@ <ReqT, RespT> GrpcCallContext newCallContext( |
1960 | 1965 | boolean routeToLeader) { |
1961 | 1966 | GrpcCallContext context = GrpcCallContext.createDefault(); |
1962 | 1967 | if (options != null) { |
1963 | | - context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); |
| 1968 | + if (this.isGrpcGcpExtensionEnabled) { |
| 1969 | + // Set channel affinity in gRPC-GCP. |
| 1970 | + // Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded. |
| 1971 | + int boundedChannelHint = Option.CHANNEL_HINT.getLong(options).intValue() % this.numChannels; |
| 1972 | + context = |
| 1973 | + context.withCallOptions( |
| 1974 | + context |
| 1975 | + .getCallOptions() |
| 1976 | + .withOption( |
| 1977 | + GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); |
| 1978 | + } else { |
| 1979 | + // Set channel affinity in GAX. |
| 1980 | + context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); |
| 1981 | + } |
1964 | 1982 | } |
1965 | 1983 | if (compressorName != null) { |
1966 | 1984 | // This sets the compressor for Client -> Server. |
|
0 commit comments