|
22 | 22 | import static org.asynchttpclient.Dsl.post; |
23 | 23 | import static org.asynchttpclient.Dsl.put; |
24 | 24 | import com.google.gson.Gson; |
25 | | -import io.netty.handler.codec.http.HttpHeaders; |
26 | 25 | import java.io.File; |
27 | 26 | import java.io.FileOutputStream; |
28 | 27 | import java.io.IOException; |
|
41 | 40 | import org.apache.commons.lang3.StringUtils; |
42 | 41 | import org.apache.pulsar.client.admin.Functions; |
43 | 42 | import org.apache.pulsar.client.admin.PulsarAdminException; |
| 43 | +import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; |
44 | 44 | import org.apache.pulsar.client.api.Authentication; |
45 | 45 | import org.apache.pulsar.common.functions.FunctionConfig; |
46 | 46 | import org.apache.pulsar.common.functions.FunctionDefinition; |
|
54 | 54 | import org.apache.pulsar.common.policies.data.FunctionStats; |
55 | 55 | import org.apache.pulsar.common.policies.data.FunctionStatsImpl; |
56 | 56 | import org.apache.pulsar.common.policies.data.FunctionStatus; |
57 | | -import org.asynchttpclient.AsyncHandler; |
58 | | -import org.asynchttpclient.AsyncHttpClient; |
| 57 | +import org.asynchttpclient.AsyncCompletionHandlerBase; |
59 | 58 | import org.asynchttpclient.HttpResponseBodyPart; |
60 | | -import org.asynchttpclient.HttpResponseStatus; |
61 | 59 | import org.asynchttpclient.RequestBuilder; |
62 | 60 | import org.asynchttpclient.request.body.multipart.ByteArrayPart; |
63 | 61 | import org.asynchttpclient.request.body.multipart.FilePart; |
|
70 | 68 | public class FunctionsImpl extends ComponentResource implements Functions { |
71 | 69 |
|
72 | 70 | private final WebTarget functions; |
73 | | - private final AsyncHttpClient asyncHttpClient; |
| 71 | + private final AsyncHttpRequestExecutor asyncHttpRequestExecutor; |
74 | 72 |
|
75 | | - public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { |
| 73 | + public FunctionsImpl(WebTarget web, Authentication auth, |
| 74 | + AsyncHttpRequestExecutor asyncHttpRequestExecutor, |
| 75 | + long requestTimeoutMs) { |
76 | 76 | super(auth, requestTimeoutMs); |
77 | 77 | this.functions = web.path("/admin/v3/functions"); |
78 | | - this.asyncHttpClient = asyncHttpClient; |
| 78 | + this.asyncHttpRequestExecutor = asyncHttpRequestExecutor; |
79 | 79 | } |
80 | 80 |
|
81 | 81 | @Override |
@@ -171,8 +171,7 @@ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig |
171 | 171 | // If the function code is built in, we don't need to submit here |
172 | 172 | builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); |
173 | 173 | } |
174 | | - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) |
175 | | - .toCompletableFuture() |
| 174 | + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) |
176 | 175 | .thenAccept(response -> { |
177 | 176 | if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { |
178 | 177 | future.completeExceptionally( |
@@ -263,8 +262,7 @@ public CompletableFuture<Void> updateFunctionAsync( |
263 | 262 | builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); |
264 | 263 | } |
265 | 264 |
|
266 | | - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) |
267 | | - .toCompletableFuture() |
| 265 | + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) |
268 | 266 | .thenAccept(response -> { |
269 | 267 | if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { |
270 | 268 | future.completeExceptionally( |
@@ -464,7 +462,7 @@ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String pat |
464 | 462 | .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM)) |
465 | 463 | .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN)); |
466 | 464 |
|
467 | | - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture() |
| 465 | + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) |
468 | 466 | .thenAccept(response -> { |
469 | 467 | if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { |
470 | 468 | future.completeExceptionally( |
@@ -543,55 +541,31 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar |
543 | 541 |
|
544 | 542 | RequestBuilder builder = get(target.getUri().toASCIIString()); |
545 | 543 |
|
546 | | - CompletableFuture<HttpResponseStatus> statusFuture = |
547 | | - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), |
548 | | - new AsyncHandler<HttpResponseStatus>() { |
549 | | - private HttpResponseStatus status; |
550 | | - |
551 | | - @Override |
552 | | - public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception { |
553 | | - status = responseStatus; |
554 | | - if (status.getStatusCode() != Response.Status.OK.getStatusCode()) { |
555 | | - return State.ABORT; |
556 | | - } |
557 | | - return State.CONTINUE; |
558 | | - } |
559 | | - |
560 | | - @Override |
561 | | - public State onHeadersReceived(HttpHeaders headers) throws Exception { |
562 | | - return State.CONTINUE; |
563 | | - } |
| 544 | + CompletableFuture<org.asynchttpclient.Response> responseFuture = |
| 545 | + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(), |
| 546 | + () -> new AsyncCompletionHandlerBase() { |
564 | 547 |
|
565 | 548 | @Override |
566 | 549 | public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { |
567 | 550 | os.write(bodyPart.getBodyByteBuffer()); |
568 | 551 | return State.CONTINUE; |
569 | 552 | } |
| 553 | + }); |
570 | 554 |
|
571 | | - @Override |
572 | | - public HttpResponseStatus onCompleted() throws Exception { |
573 | | - return status; |
574 | | - } |
575 | | - |
576 | | - @Override |
577 | | - public void onThrowable(Throwable t) { |
578 | | - } |
579 | | - }).toCompletableFuture(); |
580 | | - |
581 | | - statusFuture |
582 | | - .whenComplete((status, throwable) -> { |
| 555 | + responseFuture |
| 556 | + .whenComplete((response, throwable) -> { |
583 | 557 | try { |
584 | 558 | os.close(); |
585 | 559 | } catch (IOException e) { |
586 | 560 | future.completeExceptionally(getApiException(e)); |
587 | 561 | } |
588 | 562 | }) |
589 | | - .thenAccept(status -> { |
590 | | - if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { |
| 563 | + .thenAccept(response -> { |
| 564 | + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { |
591 | 565 | future.completeExceptionally( |
592 | 566 | getApiException(Response |
593 | | - .status(status.getStatusCode()) |
594 | | - .entity(status.getStatusText()) |
| 567 | + .status(response.getStatusCode()) |
| 568 | + .entity(response.getStatusText()) |
595 | 569 | .build())); |
596 | 570 | } else { |
597 | 571 | future.complete(null); |
@@ -700,7 +674,7 @@ public CompletableFuture<Void> putFunctionStateAsync( |
700 | 674 | .path("state").path(state.getKey()).getUri().toASCIIString()); |
701 | 675 | builder.addBodyPart(new StringPart("state", objectWriter() |
702 | 676 | .writeValueAsString(state), MediaType.APPLICATION_JSON)); |
703 | | - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) |
| 677 | + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) |
704 | 678 | .toCompletableFuture() |
705 | 679 | .thenAccept(response -> { |
706 | 680 | if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { |
@@ -740,7 +714,7 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n |
740 | 714 | .addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData)) |
741 | 715 | .addBodyPart(new StringPart("delete", Boolean.toString(delete))); |
742 | 716 |
|
743 | | - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) |
| 717 | + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) |
744 | 718 | .toCompletableFuture() |
745 | 719 | .thenAccept(response -> { |
746 | 720 | if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { |
|
0 commit comments