@@ -35,10 +35,15 @@ export const CLIENT_TERMINATED_ERROR_MSG =
3535 * @internal
3636 */
3737export class ClientPool < T > {
38+ private grpcEnabled = false ;
39+
3840 /**
3941 * Stores each active clients and how many operations it has outstanding.
4042 */
41- private activeClients = new Map < T , number > ( ) ;
43+ private activeClients = new Map <
44+ T ,
45+ { activeRequestCount : number ; grpcEnabled : boolean }
46+ > ( ) ;
4247
4348 /**
4449 * A set of clients that have seen RST_STREAM errors (see
@@ -72,7 +77,7 @@ export class ClientPool<T> {
7277 constructor (
7378 private readonly concurrentOperationLimit : number ,
7479 private readonly maxIdleClients : number ,
75- private readonly clientFactory : ( ) => T ,
80+ private readonly clientFactory : ( requiresGrpc : boolean ) => T ,
7681 private readonly clientDestructor : ( client : T ) => Promise < void > = ( ) =>
7782 Promise . resolve ( )
7883 ) { }
@@ -84,21 +89,22 @@ export class ClientPool<T> {
8489 * @private
8590 * @internal
8691 */
87- private acquire ( requestTag : string ) : T {
92+ private acquire ( requestTag : string , requiresGrpc : boolean ) : T {
8893 let selectedClient : T | null = null ;
8994 let selectedClientRequestCount = - 1 ;
9095
91- for ( const [ client , requestCount ] of this . activeClients ) {
96+ for ( const [ client , metadata ] of this . activeClients ) {
9297 // Use the "most-full" client that can still accommodate the request
9398 // in order to maximize the number of idle clients as operations start to
9499 // complete.
95100 if (
96101 ! this . failedClients . has ( client ) &&
97- requestCount > selectedClientRequestCount &&
98- requestCount < this . concurrentOperationLimit
102+ metadata . activeRequestCount > selectedClientRequestCount &&
103+ metadata . activeRequestCount < this . concurrentOperationLimit &&
104+ ( ! requiresGrpc || metadata . grpcEnabled )
99105 ) {
100106 selectedClient = client ;
101- selectedClientRequestCount = requestCount ;
107+ selectedClientRequestCount = metadata . activeRequestCount ;
102108 }
103109 }
104110
@@ -111,15 +117,18 @@ export class ClientPool<T> {
111117 ) ;
112118 } else {
113119 logger ( 'ClientPool.acquire' , requestTag , 'Creating a new client' ) ;
114- selectedClient = this . clientFactory ( ) ;
120+ selectedClient = this . clientFactory ( requiresGrpc ) ;
115121 selectedClientRequestCount = 0 ;
116122 assert (
117123 ! this . activeClients . has ( selectedClient ) ,
118124 'The provided client factory returned an existing instance'
119125 ) ;
120126 }
121127
122- this . activeClients . set ( selectedClient , selectedClientRequestCount + 1 ) ;
128+ this . activeClients . set ( selectedClient , {
129+ grpcEnabled : requiresGrpc ,
130+ activeRequestCount : selectedClientRequestCount + 1 ,
131+ } ) ;
123132
124133 return selectedClient ! ;
125134 }
@@ -131,9 +140,12 @@ export class ClientPool<T> {
131140 * @internal
132141 */
133142 private async release ( requestTag : string , client : T ) : Promise < void > {
134- const requestCount = this . activeClients . get ( client ) || 0 ;
135- assert ( requestCount > 0 , 'No active requests' ) ;
136- this . activeClients . set ( client , requestCount - 1 ) ;
143+ const metadata = this . activeClients . get ( client ) ;
144+ assert ( metadata && metadata . activeRequestCount > 0 , 'No active requests' ) ;
145+ this . activeClients . set ( client , {
146+ grpcEnabled : metadata . grpcEnabled ,
147+ activeRequestCount : metadata . activeRequestCount - 1 ,
148+ } ) ;
137149 if ( this . terminated && this . opCount === 0 ) {
138150 this . terminateDeferred . resolve ( ) ;
139151 }
@@ -153,22 +165,30 @@ export class ClientPool<T> {
153165 * @internal
154166 */
155167 private shouldGarbageCollectClient ( client : T ) : boolean {
156- // Don't garbage collect clients that have active requests.
157- if ( this . activeClients . get ( client ) !== 0 ) {
168+ const clientMetadata = this . activeClients . get ( client ) ! ;
169+
170+ if ( clientMetadata . activeRequestCount !== 0 ) {
171+ // Don't garbage collect clients that have active requests.
158172 return false ;
159173 }
160174
175+ if ( this . grpcEnabled !== clientMetadata . grpcEnabled ) {
176+ // We are transitioning to GRPC. Garbage collect REST clients.
177+ return true ;
178+ }
179+
161180 // Idle clients that have received RST_STREAM errors are always garbage
162181 // collected.
163182 if ( this . failedClients . has ( client ) ) {
164183 return true ;
165184 }
166185
167186 // Otherwise, only garbage collect if we have too much idle capacity (e.g.
168- // more than 100 idle capacity with default settings) .
187+ // more than 100 idle capacity with default settings).
169188 let idleCapacityCount = 0 ;
170- for ( const [ , count ] of this . activeClients ) {
171- idleCapacityCount += this . concurrentOperationLimit - count ;
189+ for ( const [ , metadata ] of this . activeClients ) {
190+ idleCapacityCount +=
191+ this . concurrentOperationLimit - metadata . activeRequestCount ;
172192 }
173193 return (
174194 idleCapacityCount > this . maxIdleClients * this . concurrentOperationLimit
@@ -197,7 +217,9 @@ export class ClientPool<T> {
197217 // Visible for testing.
198218 get opCount ( ) : number {
199219 let activeOperationCount = 0 ;
200- this . activeClients . forEach ( count => ( activeOperationCount += count ) ) ;
220+ this . activeClients . forEach (
221+ metadata => ( activeOperationCount += metadata . activeRequestCount )
222+ ) ;
201223 return activeOperationCount ;
202224 }
203225
@@ -213,11 +235,15 @@ export class ClientPool<T> {
213235 * @private
214236 * @internal
215237 */
216- run < V > ( requestTag : string , op : ( client : T ) => Promise < V > ) : Promise < V > {
238+ run < V > (
239+ requestTag : string ,
240+ requiresGrpc : boolean ,
241+ op : ( client : T ) => Promise < V >
242+ ) : Promise < V > {
217243 if ( this . terminated ) {
218244 return Promise . reject ( new Error ( CLIENT_TERMINATED_ERROR_MSG ) ) ;
219245 }
220- const client = this . acquire ( requestTag ) ;
246+ const client = this . acquire ( requestTag , requiresGrpc ) ;
221247
222248 return op ( client )
223249 . catch ( async ( err : GoogleError ) => {
0 commit comments