@@ -69,23 +69,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
6969 @ GuardedBy ("lock" )
7070 private Collection <PendingStream > pendingStreams = new LinkedHashSet <>();
7171
72- /**
73- * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
74- * terminated.
75- */
76- @ GuardedBy ("lock" )
77- private Status shutdownStatus ;
78-
79- /**
80- * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
81- * to idle.
82- */
83- @ GuardedBy ("lock" )
84- @ Nullable
85- private SubchannelPicker lastPicker ;
86-
87- @ GuardedBy ("lock" )
88- private long lastPickerVersion ;
72+ /** Immutable state needed for picking. 'lock' must be held for writing. */
73+ private volatile PickerState pickerState = new PickerState (null , null );
8974
9075 /**
9176 * Creates a new delayed transport.
@@ -139,33 +124,30 @@ public final ClientStream newStream(
139124 try {
140125 PickSubchannelArgs args = new PickSubchannelArgsImpl (
141126 method , headers , callOptions , new PickDetailsConsumerImpl (tracers ));
142- SubchannelPicker picker = null ;
143- long pickerVersion = -1 ;
127+ PickerState state = pickerState ;
144128 while (true ) {
145- synchronized (lock ) {
146- if (shutdownStatus != null ) {
147- return new FailingClientStream (shutdownStatus , tracers );
148- }
149- if (lastPicker == null ) {
150- return createPendingStream (args , tracers );
151- }
152- // Check for second time through the loop, and whether anything changed
153- if (picker != null && pickerVersion == lastPickerVersion ) {
154- return createPendingStream (args , tracers );
155- }
156- picker = lastPicker ;
157- pickerVersion = lastPickerVersion ;
129+ if (state .shutdownStatus != null ) {
130+ return new FailingClientStream (state .shutdownStatus , tracers );
158131 }
159- PickResult pickResult = picker .pickSubchannel (args );
160- ClientTransport transport = GrpcUtil .getTransportFromPickResult (pickResult ,
161- callOptions .isWaitForReady ());
162- if (transport != null ) {
163- return transport .newStream (
164- args .getMethodDescriptor (), args .getHeaders (), args .getCallOptions (),
165- tracers );
132+ if (state .lastPicker != null ) {
133+ PickResult pickResult = state .lastPicker .pickSubchannel (args );
134+ ClientTransport transport = GrpcUtil .getTransportFromPickResult (pickResult ,
135+ callOptions .isWaitForReady ());
136+ if (transport != null ) {
137+ return transport .newStream (
138+ args .getMethodDescriptor (), args .getHeaders (), args .getCallOptions (),
139+ tracers );
140+ }
166141 }
167142 // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
168143 // race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
144+ synchronized (lock ) {
145+ PickerState newerState = pickerState ;
146+ if (state == newerState ) {
147+ return createPendingStream (args , tracers );
148+ }
149+ state = newerState ;
150+ }
169151 }
170152 } finally {
171153 syncContext .drain ();
@@ -210,10 +192,10 @@ public ListenableFuture<SocketStats> getStats() {
210192 @ Override
211193 public final void shutdown (final Status status ) {
212194 synchronized (lock ) {
213- if (shutdownStatus != null ) {
195+ if (pickerState . shutdownStatus != null ) {
214196 return ;
215197 }
216- shutdownStatus = status ;
198+ pickerState = pickerState . withShutdownStatus ( status ) ;
217199 syncContext .executeLater (new Runnable () {
218200 @ Override
219201 public void run () {
@@ -288,8 +270,7 @@ final int getPendingStreamsCount() {
288270 final void reprocess (@ Nullable SubchannelPicker picker ) {
289271 ArrayList <PendingStream > toProcess ;
290272 synchronized (lock ) {
291- lastPicker = picker ;
292- lastPickerVersion ++;
273+ pickerState = pickerState .withPicker (picker );
293274 if (picker == null || !hasPendingStreams ()) {
294275 return ;
295276 }
@@ -338,7 +319,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
338319 // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
339320 // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
340321 syncContext .executeLater (reportTransportNotInUse );
341- if (shutdownStatus != null && reportTransportTerminated != null ) {
322+ if (pickerState . shutdownStatus != null && reportTransportTerminated != null ) {
342323 syncContext .executeLater (reportTransportTerminated );
343324 reportTransportTerminated = null ;
344325 }
@@ -384,7 +365,7 @@ public void cancel(Status reason) {
384365 boolean justRemovedAnElement = pendingStreams .remove (this );
385366 if (!hasPendingStreams () && justRemovedAnElement ) {
386367 syncContext .executeLater (reportTransportNotInUse );
387- if (shutdownStatus != null ) {
368+ if (pickerState . shutdownStatus != null ) {
388369 syncContext .executeLater (reportTransportTerminated );
389370 reportTransportTerminated = null ;
390371 }
@@ -409,4 +390,32 @@ public void appendTimeoutInsight(InsightBuilder insight) {
409390 super .appendTimeoutInsight (insight );
410391 }
411392 }
393+
394+ static final class PickerState {
395+ /**
396+ * The last picker that {@link #reprocess} has used. May be set to null when the channel has
397+ * moved to idle.
398+ */
399+ @ Nullable
400+ final SubchannelPicker lastPicker ;
401+ /**
402+ * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
403+ * terminated.
404+ */
405+ @ Nullable
406+ final Status shutdownStatus ;
407+
408+ private PickerState (SubchannelPicker lastPicker , Status shutdownStatus ) {
409+ this .lastPicker = lastPicker ;
410+ this .shutdownStatus = shutdownStatus ;
411+ }
412+
413+ public PickerState withPicker (SubchannelPicker newPicker ) {
414+ return new PickerState (newPicker , this .shutdownStatus );
415+ }
416+
417+ public PickerState withShutdownStatus (Status newShutdownStatus ) {
418+ return new PickerState (this .lastPicker , newShutdownStatus );
419+ }
420+ }
412421}
0 commit comments