Skip to content

Commit 7003903

Browse files
authored
feat: introduce HttpJsonClientCall, Listeners infrastructure and ServerStreaming support in REST transport (#1599)
This includes the following changes for `HTTP1.1/REST` transport: 1) `HttpJsonClientCall` class (with `HttpJsonClientCall.Listener`) mimicking [io.grpc.ClientCall](https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ClientCall.java#L102) functionality. Most of the complexity of this PR is concentrated in `HttpJsonClientCallImpl` class. 2) The unary callables are rewritten to be based on `HttpJsonClientCall` flow (similarly to how it is already done in gRPC unary calls). 3) Server streaming support for REST transport. The implementation is based on `HttpJsonClientCall` and `HttpJsonClientCall.Listener` (introduced in this PR), similarly to how gRPC streaming is based on `io.grpc.ClientCall` and `io.grpc.ClientCall.Listener` (implemented in [grpc-java](https://github.com/grpc/grpc-java/) library) respectively. The extreme similarity between `HttpJsonClientCall` call and `io.grpc.ClientCall` is intentional and crucial for consistency of the two transports and also intends simplifying creation and maintenance of multi-transport manual wrappers (like [google-ads-java](https://github.com/googleads/google-ads-java)). The server streaming abstractions in gax java are all based on the flow control managed by a ClientCall, so having similar set of abstractions in REST transport is necessary to reuse transport-independent portions of streaming logic in gax and maintain identical user-facing streaming surface. This PR also builds a foundation for the soon-coming [ClientInterceptor](https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ClientInterceptor.java#L42)-like infrastructure in REST transport. This is specifically required to support REST transport in [google-ads-java](https://github.com/googleads/google-ads-java/blob/main/google-ads/src/main/java/com/google/ads/googleads/lib/logging/LoggingInterceptor.java#L42). REST-based client-side streaming and bidirectional streaming is not implemented by this PR and most likely will never be due to limitations of the `HTTP1.1/REST` protocol compared to `HTTP2/gRPC`. Most of the java docs in `HttpJsonClientCall` class is a modified version of the java docs from `io.grpc.ClientCall`, which is intentional, because `HttpJsonClientCall` is designed to be as similar to `io.grpc.ClientCall` in both surface and behavior as possible (while the two classes cannot be a part of the same class hierarchy, because they belong to two independent transport layers). **What server-streaming means in case of REST transport** In REST transport server-streaming methods return a JSON array of response messages (i.e. the array element type is the same one used as a returned type in the corresponding method definition in protobuf). The response is provided as as [Chunck-encoded](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) input stream, containing one big JSON array. To parse the json array we rely on [JsonReader](https://github.com/google/gson/blob/master/gson/src/main/java/com/google/gson/stream/JsonReader.java#L191) from gson library, which gax-httpjson already depended on even prior this PR (check `ProtoMessageJsonStreamIterator` class implementation in this PR for details). Note, we must process elements of the array one-by-one because the size of the full array may be in realm of gigabytes. _**Note**, ideally I need to split this PR at least in two separate ones: 1) HttpJsonClientCall stuff and unary calls based on it in one PR and then 2) server streaming feature in a second PR. Unfortunately the most reasonable way to test `HttpJsonClientCall` infrastructure is by doing it from server streaming logic beause most of the complexity introduced in HttpJsonClient call is induced by necessity to support streaming workflow in the first place (and to support call interceptors (not part of this PR) as a secondary goal)._ _**Note**, there are a few minor breaking changes in gax-httpjson module (and only there) inroduced in this PR. This should be ok, because unlike gax and gax-grpc, gax-httpjson is not GA yet. The breaking changes are very minor (in the space of `HttpJsonCallContext` and `ManagedHttpJsonChannel`) and are backward-compatible with `java-compute` (the main and only officially supported user of gax-httpjson as of now)._
1 parent e85699a commit 7003903

36 files changed

Lines changed: 2889 additions & 622 deletions

gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void testServerStreaming() throws Exception {
145145

146146
streamingCallable.call(DEFAULT_REQUEST, moneyObserver);
147147

148-
latch.await(20, TimeUnit.SECONDS);
148+
Truth.assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
149149
Truth.assertThat(moneyObserver.error).isNull();
150150
Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
151151
}
@@ -157,13 +157,13 @@ public void testManualFlowControl() throws Exception {
157157

158158
streamingCallable.call(DEFAULT_REQUEST, moneyObserver);
159159

160-
latch.await(500, TimeUnit.MILLISECONDS);
160+
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isFalse();
161161
Truth.assertWithMessage("Received response before requesting it")
162162
.that(moneyObserver.response)
163163
.isNull();
164164

165165
moneyObserver.controller.request(1);
166-
latch.await(500, TimeUnit.MILLISECONDS);
166+
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();
167167

168168
Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
169169
Truth.assertThat(moneyObserver.completed).isTrue();
@@ -178,7 +178,7 @@ public void testCancelClientCall() throws Exception {
178178

179179
moneyObserver.controller.cancel();
180180
moneyObserver.controller.request(1);
181-
latch.await(500, TimeUnit.MILLISECONDS);
181+
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();
182182

183183
Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class);
184184
Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream");
@@ -190,7 +190,7 @@ public void testOnResponseError() throws Throwable {
190190
MoneyObserver moneyObserver = new MoneyObserver(true, latch);
191191

192192
streamingCallable.call(ERROR_REQUEST, moneyObserver);
193-
latch.await(500, TimeUnit.MILLISECONDS);
193+
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();
194194

195195
Truth.assertThat(moneyObserver.error).isInstanceOf(ApiException.class);
196196
Truth.assertThat(((ApiException) moneyObserver.error).getStatusCode().getCode())

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMessageHttpResponseParser.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.google.protobuf.TypeRegistry;
4141
import java.io.InputStream;
4242
import java.io.InputStreamReader;
43+
import java.io.Reader;
4344
import java.lang.reflect.Type;
4445
import java.nio.charset.StandardCharsets;
4546

@@ -91,25 +92,28 @@ ApiMessageHttpResponseParser.Builder<ResponseT> newBuilder() {
9192

9293
@Override
9394
public ResponseT parse(InputStream httpResponseBody) {
95+
return parse(httpResponseBody, null);
96+
}
97+
98+
@Override
99+
public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) {
100+
return parse(new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), registry);
101+
}
102+
103+
@Override
104+
public ResponseT parse(Reader httpResponseBody, TypeRegistry registry) {
94105
if (getResponseInstance() == null) {
95106
return null;
96107
} else {
97108
Type responseType = getResponseInstance().getClass();
98109
try {
99-
return getResponseMarshaller()
100-
.fromJson(
101-
new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), responseType);
110+
return getResponseMarshaller().fromJson(httpResponseBody, responseType);
102111
} catch (JsonIOException | JsonSyntaxException e) {
103112
throw new RestSerializationException(e);
104113
}
105114
}
106115
}
107116

108-
@Override
109-
public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) {
110-
return parse(httpResponseBody);
111-
}
112-
113117
@Override
114118
public String serialize(ResponseT response) {
115119
return getResponseMarshaller().toJson(response);

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMethodDescriptor.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,18 @@
3737
@AutoValue
3838
/* Method descriptor for messages to be transmitted over HTTP. */
3939
public abstract class ApiMethodDescriptor<RequestT, ResponseT> {
40+
public enum MethodType {
41+
UNARY,
42+
CLIENT_STREAMING,
43+
SERVER_STREAMING,
44+
BIDI_STREAMING,
45+
UNKNOWN;
46+
}
4047

4148
public abstract String getFullMethodName();
4249

4350
public abstract HttpRequestFormatter<RequestT> getRequestFormatter();
4451

45-
@Nullable
4652
public abstract HttpResponseParser<ResponseT> getResponseParser();
4753

4854
/** Return the HTTP method for this request message type. */
@@ -55,8 +61,11 @@ public abstract class ApiMethodDescriptor<RequestT, ResponseT> {
5561
@Nullable
5662
public abstract PollingRequestFactory<RequestT> getPollingRequestFactory();
5763

64+
public abstract MethodType getType();
65+
5866
public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
59-
return new AutoValue_ApiMethodDescriptor.Builder<RequestT, ResponseT>();
67+
return new AutoValue_ApiMethodDescriptor.Builder<RequestT, ResponseT>()
68+
.setType(MethodType.UNARY);
6069
}
6170

6271
@AutoValue.Builder
@@ -78,6 +87,8 @@ public abstract Builder<RequestT, ResponseT> setOperationSnapshotFactory(
7887
public abstract Builder<RequestT, ResponseT> setPollingRequestFactory(
7988
PollingRequestFactory<RequestT> pollingRequestFactory);
8089

90+
public abstract Builder<RequestT, ResponseT> setType(MethodType type);
91+
8192
public abstract ApiMethodDescriptor<RequestT, ResponseT> build();
8293
}
8394
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.httpjson;
31+
32+
import com.google.api.client.http.HttpResponseException;
33+
import com.google.api.gax.rpc.ApiException;
34+
import com.google.api.gax.rpc.ApiExceptionFactory;
35+
import com.google.api.gax.rpc.StatusCode;
36+
import com.google.api.gax.rpc.StatusCode.Code;
37+
import com.google.common.collect.ImmutableSet;
38+
import java.util.Set;
39+
import java.util.concurrent.CancellationException;
40+
41+
class HttpJsonApiExceptionFactory {
42+
private final Set<Code> retryableCodes;
43+
44+
HttpJsonApiExceptionFactory(Set<Code> retryableCodes) {
45+
this.retryableCodes = ImmutableSet.copyOf(retryableCodes);
46+
}
47+
48+
ApiException create(Throwable throwable) {
49+
if (throwable instanceof HttpResponseException) {
50+
HttpResponseException e = (HttpResponseException) throwable;
51+
StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode());
52+
boolean canRetry = retryableCodes.contains(statusCode.getCode());
53+
String message = e.getStatusMessage();
54+
return createApiException(throwable, statusCode, message, canRetry);
55+
} else if (throwable instanceof HttpJsonStatusRuntimeException) {
56+
HttpJsonStatusRuntimeException e = (HttpJsonStatusRuntimeException) throwable;
57+
StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode());
58+
return createApiException(
59+
throwable, statusCode, e.getMessage(), retryableCodes.contains(statusCode.getCode()));
60+
} else if (throwable instanceof CancellationException) {
61+
return ApiExceptionFactory.createException(
62+
throwable, HttpJsonStatusCode.of(Code.CANCELLED), false);
63+
} else if (throwable instanceof ApiException) {
64+
return (ApiException) throwable;
65+
} else {
66+
// Do not retry on unknown throwable, even when UNKNOWN is in retryableCodes
67+
return ApiExceptionFactory.createException(
68+
throwable, HttpJsonStatusCode.of(StatusCode.Code.UNKNOWN), false);
69+
}
70+
}
71+
72+
private ApiException createApiException(
73+
Throwable throwable, StatusCode statusCode, String message, boolean canRetry) {
74+
return message == null
75+
? ApiExceptionFactory.createException(throwable, statusCode, canRetry)
76+
: ApiExceptionFactory.createException(message, throwable, statusCode, canRetry);
77+
}
78+
}

0 commit comments

Comments
 (0)