This repository was archived by the owner on Sep 26, 2023. It is now read-only.
File tree Expand file tree Collapse file tree
main/java/com/google/api/gax/rpc
test/java/com/google/api/gax/rpc Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -183,7 +183,7 @@ class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT>
183183 private boolean autoAutoFlowControl = true ;
184184
185185 private final ResponseObserver <ResponseT > outerResponseObserver ;
186- private StreamController innerController ;
186+ private volatile StreamController innerController ;
187187
188188 @ GuardedBy ("lock" )
189189 private State state = State .IDLE ;
@@ -296,6 +296,12 @@ public void onCompleteImpl() {
296296 * @return True if the stream was canceled.
297297 */
298298 boolean cancelIfStale () {
299+ // If the stream hasn't started yet, innerController will be null. Skip the check this time
300+ // and return false so the stream is still watched.
301+ if (innerController == null ) {
302+ return false ;
303+ }
304+
299305 Throwable myError = null ;
300306
301307 synchronized (lock ) {
Original file line number Diff line number Diff line change @@ -128,6 +128,31 @@ public void testIdleTimeout() throws InterruptedException {
128128 assertThat (actualError ).isInstanceOf (WatchdogTimeoutException .class );
129129 }
130130
131+ @ Test
132+ public void testTimedOutBeforeStart () throws InterruptedException {
133+ MockServerStreamingCallable <String , String > callable1 = new MockServerStreamingCallable <>();
134+ AccumulatingObserver <String > downstreamObserver1 = new AccumulatingObserver <>();
135+ ResponseObserver observer = watchdog .watch (downstreamObserver1 , waitTime , idleTime );
136+ clock .incrementNanoTime (idleTime .toNanos () + 1 );
137+ // This should not remove callable1 from watched list
138+ watchdog .run ();
139+ assertThat (downstreamObserver1 .done .isDone ()).isFalse ();
140+
141+ callable1 .call ("request" , observer );
142+ // This should cancel callable1
143+ watchdog .run ();
144+ MockServerStreamingCall <String , String > call1 = callable1 .popLastCall ();
145+ assertThat (call1 .getController ().isCancelled ()).isTrue ();
146+ call1 .getController ().getObserver ().onError (new CancellationException ("User cancelled" ));
147+ Throwable error = null ;
148+ try {
149+ downstreamObserver1 .done .get ();
150+ } catch (ExecutionException t ) {
151+ error = t .getCause ();
152+ }
153+ assertThat (error ).isInstanceOf (WatchdogTimeoutException .class );
154+ }
155+
131156 @ Test
132157 public void testMultiple () throws Exception {
133158 // Start stream1
You can’t perform that action at this time.
0 commit comments