1313// limitations under the License.
1414package com .google .devtools .build .lib .remote ;
1515
16+ import com .google .devtools .build .lib .remote .grpc .ChannelConnectionFactory ;
17+ import com .google .devtools .build .lib .remote .grpc .ChannelConnectionFactory .ChannelConnection ;
18+ import com .google .devtools .build .lib .remote .grpc .DynamicConnectionPool ;
19+ import com .google .devtools .build .lib .remote .grpc .SharedConnectionFactory .SharedConnection ;
1620import io .grpc .CallOptions ;
21+ import io .grpc .Channel ;
1722import io .grpc .ClientCall ;
18- import io .grpc .ManagedChannel ;
23+ import io .grpc .ForwardingClientCall ;
24+ import io .grpc .ForwardingClientCallListener ;
25+ import io .grpc .Metadata ;
1926import io .grpc .MethodDescriptor ;
27+ import io .grpc .Status ;
2028import io .netty .util .AbstractReferenceCounted ;
2129import io .netty .util .ReferenceCounted ;
22- import java .util . concurrent . TimeUnit ;
30+ import java .io . IOException ;
2331
2432/**
25- * A wrapper around a {@link io.grpc.ManagedChannel } exposing a reference count. When instantiated
26- * the reference count is 1. {@link ManagedChannel#shutdown ()} will be called on the wrapped channel
27- * when the reference count reaches 0.
33+ * A wrapper around a {@link DynamicConnectionPool } exposing {@link Channel} and a reference count.
34+ * When instantiated the reference count is 1. {@link DynamicConnectionPool#close ()} will be called
35+ * on the wrapped channel when the reference count reaches 0.
2836 *
2937 * <p>See {@link ReferenceCounted} for more information about reference counting.
3038 */
31- public class ReferenceCountedChannel extends ManagedChannel implements ReferenceCounted {
32-
33- private final ManagedChannel channel ;
34- private final AbstractReferenceCounted referenceCounted ;
35-
36- public ReferenceCountedChannel (ManagedChannel channel ) {
37- this (
38- channel ,
39- new AbstractReferenceCounted () {
40- @ Override
41- protected void deallocate () {
42- channel .shutdown ();
39+ public class ReferenceCountedChannel extends Channel implements ReferenceCounted {
40+ private final DynamicConnectionPool dynamicConnectionPool ;
41+ private final AbstractReferenceCounted referenceCounted =
42+ new AbstractReferenceCounted () {
43+ @ Override
44+ protected void deallocate () {
45+ try {
46+ dynamicConnectionPool .close ();
47+ } catch (IOException e ) {
48+ throw new AssertionError (e .getMessage (), e );
4349 }
50+ }
4451
45- @ Override
46- public ReferenceCounted touch (Object o ) {
47- return this ;
48- }
49- });
50- }
51-
52- protected ReferenceCountedChannel (
53- ManagedChannel channel , AbstractReferenceCounted referenceCounted ) {
54- this .channel = channel ;
55- this .referenceCounted = referenceCounted ;
56- }
52+ @ Override
53+ public ReferenceCounted touch (Object o ) {
54+ return this ;
55+ }
56+ };
5757
58- @ Override
59- public ManagedChannel shutdown () {
60- throw new UnsupportedOperationException ("Don't call shutdown() directly, but use release() "
61- + "instead." );
58+ public ReferenceCountedChannel (ChannelConnectionFactory connectionFactory ) {
59+ this .dynamicConnectionPool =
60+ new DynamicConnectionPool (connectionFactory , connectionFactory .maxConcurrency ());
6261 }
6362
64- @ Override
6563 public boolean isShutdown () {
66- return channel .isShutdown ();
67- }
68-
69- @ Override
70- public boolean isTerminated () {
71- return channel .isTerminated ();
72- }
73-
74- @ Override
75- public ManagedChannel shutdownNow () {
76- throw new UnsupportedOperationException ("Don't call shutdownNow() directly, but use release() "
77- + "instead." );
78- }
79-
80- @ Override
81- public boolean awaitTermination (long timeout , TimeUnit timeUnit ) throws InterruptedException {
82- return channel .awaitTermination (timeout , timeUnit );
64+ return dynamicConnectionPool .isClosed ();
65+ }
66+
67+ /** A {@link ClientCall} which call {@link SharedConnection#close()} after the RPC is closed. */
68+ static class ConnectionCleanupCall <ReqT , RespT >
69+ extends ForwardingClientCall .SimpleForwardingClientCall <ReqT , RespT > {
70+ private final SharedConnection connection ;
71+
72+ protected ConnectionCleanupCall (ClientCall <ReqT , RespT > delegate , SharedConnection connection ) {
73+ super (delegate );
74+ this .connection = connection ;
75+ }
76+
77+ @ Override
78+ public void start (Listener <RespT > responseListener , Metadata headers ) {
79+ super .start (
80+ new ForwardingClientCallListener .SimpleForwardingClientCallListener <RespT >(
81+ responseListener ) {
82+ @ Override
83+ public void onClose (Status status , Metadata trailers ) {
84+ super .onClose (status , trailers );
85+
86+ try {
87+ connection .close ();
88+ } catch (IOException e ) {
89+ throw new AssertionError (e .getMessage (), e );
90+ }
91+ }
92+ },
93+ headers );
94+ }
8395 }
8496
8597 @ Override
8698 public <RequestT , ResponseT > ClientCall <RequestT , ResponseT > newCall (
8799 MethodDescriptor <RequestT , ResponseT > methodDescriptor , CallOptions callOptions ) {
88- return channel .<RequestT , ResponseT >newCall (methodDescriptor , callOptions );
100+ SharedConnection sharedConnection = dynamicConnectionPool .create ().blockingGet ();
101+ ChannelConnection connection = (ChannelConnection ) sharedConnection .getUnderlyingConnection ();
102+ return new ConnectionCleanupCall <>(
103+ connection .getChannel ().newCall (methodDescriptor , callOptions ), sharedConnection );
89104 }
90105
91106 @ Override
92107 public String authority () {
93- return channel .authority ();
108+ SharedConnection sharedConnection = dynamicConnectionPool .create ().blockingGet ();
109+ ChannelConnection connection = (ChannelConnection ) sharedConnection .getUnderlyingConnection ();
110+ return connection .getChannel ().authority ();
94111 }
95112
96113 @ Override
@@ -131,4 +148,4 @@ public boolean release() {
131148 public boolean release (int decrement ) {
132149 return referenceCounted .release (decrement );
133150 }
134- }
151+ }
0 commit comments