Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 44a018b

Browse files
authored
Merge branch 'master' into release-please/branches/master
2 parents 1c28f1b + 69458b4 commit 44a018b

5 files changed

Lines changed: 65 additions & 12 deletions

File tree

gax/src/main/java/com/google/api/gax/batching/DynamicFlowControlSettings.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,14 @@ public DynamicFlowControlSettings build() {
115115
}
116116

117117
private void verifyElementCountSettings(DynamicFlowControlSettings settings) {
118-
boolean isEnabled =
119-
settings.getInitialOutstandingElementCount() != null
120-
|| settings.getMinOutstandingElementCount() != null
121-
|| settings.getMaxOutstandingElementCount() != null;
122-
if (!isEnabled) {
118+
// If LimitExceededBehavior is Ignore, dynamic flow control is disabled, there's no need to
119+
// check element count limit settings
120+
if (settings.getLimitExceededBehavior() == LimitExceededBehavior.Ignore) {
121+
return;
122+
}
123+
if (settings.getInitialOutstandingElementCount() == null
124+
&& settings.getMinOutstandingElementCount() == null
125+
&& settings.getMaxOutstandingElementCount() == null) {
123126
return;
124127
}
125128
Preconditions.checkState(
@@ -141,11 +144,14 @@ private void verifyElementCountSettings(DynamicFlowControlSettings settings) {
141144
}
142145

143146
private void verifyRequestBytesSettings(DynamicFlowControlSettings settings) {
144-
boolean isEnabled =
145-
settings.getInitialOutstandingRequestBytes() != null
146-
|| settings.getMinOutstandingRequestBytes() != null
147-
|| settings.getMaxOutstandingRequestBytes() != null;
148-
if (!isEnabled) {
147+
// If LimitExceededBehavior is Ignore, dynamic flow control is disabled, there's no need to
148+
// check request bytes limit settings
149+
if (settings.getLimitExceededBehavior() == LimitExceededBehavior.Ignore) {
150+
return;
151+
}
152+
if (settings.getInitialOutstandingRequestBytes() == null
153+
&& settings.getMinOutstandingRequestBytes() == null
154+
&& settings.getMaxOutstandingRequestBytes() == null) {
149155
return;
150156
}
151157
Preconditions.checkState(

gax/src/main/java/com/google/api/gax/rpc/Watchdog.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT>
183183
private boolean autoAutoFlowControl = true;
184184

185185
private final ResponseObserver<ResponseT> outerResponseObserver;
186-
private StreamController innerController;
186+
private volatile StreamController innerController;
187187

188188
@GuardedBy("lock")
189189
private State state = State.IDLE;
@@ -296,6 +296,12 @@ public void onCompleteImpl() {
296296
* @return True if the stream was canceled.
297297
*/
298298
boolean cancelIfStale() {
299+
// If the stream hasn't started yet, innerController will be null. Skip the check this time
300+
// and return false so the stream is still watched.
301+
if (innerController == null) {
302+
return false;
303+
}
304+
299305
Throwable myError = null;
300306

301307
synchronized (lock) {

gax/src/test/java/com/google/api/gax/batching/DynamicFlowControlSettingsTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,22 @@ public void testEmptyBuilder() {
5555
assertEquals(LimitExceededBehavior.Block, settings.getLimitExceededBehavior());
5656
}
5757

58+
@Test
59+
public void testPartialSettingsIgnored() {
60+
// If behavior is ignore, build shouldn't throw exceptions even when only one of the bytes or
61+
// element limits is set
62+
DynamicFlowControlSettings.Builder builder =
63+
DynamicFlowControlSettings.newBuilder()
64+
.setLimitExceededBehavior(LimitExceededBehavior.Ignore)
65+
.setMaxOutstandingElementCount(1L);
66+
builder.build();
67+
builder =
68+
DynamicFlowControlSettings.newBuilder()
69+
.setLimitExceededBehavior(LimitExceededBehavior.Ignore)
70+
.setMinOutstandingRequestBytes(1L);
71+
builder.build();
72+
}
73+
5874
@Test
5975
public void testBuilder() {
6076
DynamicFlowControlSettings.Builder builder =

gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,31 @@ public void testIdleTimeout() throws InterruptedException {
128128
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
129129
}
130130

131+
@Test
132+
public void testTimedOutBeforeStart() throws InterruptedException {
133+
MockServerStreamingCallable<String, String> callable1 = new MockServerStreamingCallable<>();
134+
AccumulatingObserver<String> downstreamObserver1 = new AccumulatingObserver<>();
135+
ResponseObserver observer = watchdog.watch(downstreamObserver1, waitTime, idleTime);
136+
clock.incrementNanoTime(idleTime.toNanos() + 1);
137+
// This should not remove callable1 from watched list
138+
watchdog.run();
139+
assertThat(downstreamObserver1.done.isDone()).isFalse();
140+
141+
callable1.call("request", observer);
142+
// This should cancel callable1
143+
watchdog.run();
144+
MockServerStreamingCall<String, String> call1 = callable1.popLastCall();
145+
assertThat(call1.getController().isCancelled()).isTrue();
146+
call1.getController().getObserver().onError(new CancellationException("User cancelled"));
147+
Throwable error = null;
148+
try {
149+
downstreamObserver1.done.get();
150+
} catch (ExecutionException t) {
151+
error = t.getCause();
152+
}
153+
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
154+
}
155+
131156
@Test
132157
public void testMultiple() throws Exception {
133158
// Start stream1

samples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<dependency>
2626
<groupId>com.google.auto.value</groupId>
2727
<artifactId>auto-value</artifactId>
28-
<version>1.7.5</version>
28+
<version>1.8</version>
2929
</dependency>
3030
<dependency>
3131
<groupId>junit</groupId>

0 commit comments

Comments
 (0)