Skip to content

Commit 3e7dbb4

Browse files
authored
[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder (#22541)
1 parent 3560ddb commit 3e7dbb4

File tree

22 files changed

+675
-220
lines changed

22 files changed

+675
-220
lines changed

distribution/server/src/assemble/LICENSE.bin.txt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,8 @@ The Apache Software License, Version 2.0
536536
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java17-1.33.3-alpha.jar
537537
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java8-1.33.3-alpha.jar
538538
- io.opentelemetry.semconv-opentelemetry-semconv-1.25.0-alpha.jar
539+
* Spotify completable-futures
540+
- com.spotify-completable-futures-0.3.6.jar
539541

540542
BSD 3-clause "New" or "Revised" License
541543
* Google auth library
@@ -580,15 +582,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
580582
- org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
581583
- org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
582584
* Jersey
583-
- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
584-
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
585-
- org.glassfish.jersey.core-jersey-client-2.41.jar
586-
- org.glassfish.jersey.core-jersey-common-2.41.jar
587-
- org.glassfish.jersey.core-jersey-server-2.41.jar
588-
- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
589-
- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
590-
- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
591-
- org.glassfish.jersey.inject-jersey-hk2-2.41.jar
585+
- org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
586+
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
587+
- org.glassfish.jersey.core-jersey-client-2.42.jar
588+
- org.glassfish.jersey.core-jersey-common-2.42.jar
589+
- org.glassfish.jersey.core-jersey-server-2.42.jar
590+
- org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
591+
- org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
592+
- org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
593+
- org.glassfish.jersey.inject-jersey-hk2-2.42.jar
592594
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar
593595

594596
Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt

distribution/shell/src/assemble/LICENSE.bin.txt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ The Apache Software License, Version 2.0
417417
- avro-1.11.3.jar
418418
- avro-protobuf-1.11.3.jar
419419
* RE2j -- re2j-1.7.jar
420+
* Spotify completable-futures -- completable-futures-0.3.6.jar
420421

421422
BSD 3-clause "New" or "Revised" License
422423
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
@@ -446,12 +447,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
446447
- aopalliance-repackaged-2.6.1.jar
447448
- osgi-resource-locator-1.0.3.jar
448449
* Jersey
449-
- jersey-client-2.41.jar
450-
- jersey-common-2.41.jar
451-
- jersey-entity-filtering-2.41.jar
452-
- jersey-media-json-jackson-2.41.jar
453-
- jersey-media-multipart-2.41.jar
454-
- jersey-hk2-2.41.jar
450+
- jersey-client-2.42.jar
451+
- jersey-common-2.42.jar
452+
- jersey-entity-filtering-2.42.jar
453+
- jersey-media-json-jackson-2.42.jar
454+
- jersey-media-multipart-2.42.jar
455+
- jersey-hk2-2.42.jar
455456
* Mimepull -- mimepull-1.9.15.jar
456457

457458
Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
152152
<netty-iouring.version>0.0.24.Final</netty-iouring.version>
153153
<jetty.version>9.4.54.v20240208</jetty.version>
154154
<conscrypt.version>2.5.2</conscrypt.version>
155-
<jersey.version>2.41</jersey.version>
155+
<jersey.version>2.42</jersey.version>
156156
<athenz.version>1.10.50</athenz.version>
157157
<prometheus.version>0.16.0</prometheus.version>
158158
<vertx.version>4.5.8</vertx.version>
@@ -266,6 +266,7 @@ flexible messaging model and an intuitive client API.</description>
266266
<opentelemetry.semconv.version>1.25.0-alpha</opentelemetry.semconv.version>
267267
<picocli.version>4.7.5</picocli.version>
268268
<re2j.version>1.7</re2j.version>
269+
<completable-futures.version>0.3.6</completable-futures.version>
269270
<failsafe.version>3.3.2</failsafe.version>
270271

271272
<!-- test dependencies -->
@@ -665,6 +666,12 @@ flexible messaging model and an intuitive client API.</description>
665666
<version>${re2j.version}</version>
666667
</dependency>
667668

669+
<dependency>
670+
<groupId>com.spotify</groupId>
671+
<artifactId>completable-futures</artifactId>
672+
<version>${completable-futures.version}</version>
673+
</dependency>
674+
668675
<dependency>
669676
<groupId>org.rocksdb</groupId>
670677
<artifactId>rocksdbjni</artifactId>

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,30 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
336336
* requests
337337
*/
338338
PulsarAdminBuilder acceptGzipCompression(boolean acceptGzipCompression);
339+
340+
/**
341+
* Configures the maximum number of connections that the client library will establish with a single host.
342+
* <p>
343+
* By default, the connection pool maintains up to 16 connections to a single host. This method allows you to
344+
* modify this default behavior and limit the number of connections.
345+
* <p>
346+
* This setting can be useful in scenarios where you want to limit the resources used by the client library,
347+
* or control the level of parallelism for operations so that a single client does not overwhelm
348+
* the Pulsar cluster with too many concurrent connections.
349+
*
350+
* @param maxConnectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable
351+
* the limit.
352+
* @return the PulsarAdminBuilder instance, allowing for method chaining
353+
*/
354+
PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost);
355+
356+
/**
357+
* Sets the maximum idle time for a pooled connection. If a connection is idle for more than the specified
358+
* amount of seconds, it will be released back to the connection pool.
359+
* Defaults to 25 seconds.
360+
*
361+
* @param connectionMaxIdleSeconds the maximum idle time, in seconds, for a pooled connection
362+
* @return the PulsarAdminBuilder instance
363+
*/
364+
PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
339365
}

pulsar-client-admin-shaded/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
<include>com.google.guava:guava</include>
124124
<include>com.google.code.gson:gson</include>
125125
<include>com.google.re2j:re2j</include>
126+
<include>com.spotify:completable-futures</include>
126127
<include>com.fasterxml.jackson.*:*</include>
127128
<include>io.netty:*</include>
128129
<include>io.netty.incubator:*</include>
@@ -192,6 +193,10 @@
192193
<exclude>com.google.protobuf.*</exclude>
193194
</excludes>
194195
</relocation>
196+
<relocation>
197+
<pattern>com.spotify.futures</pattern>
198+
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
199+
</relocation>
195200
<relocation>
196201
<pattern>com.fasterxml.jackson</pattern>
197202
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java

Lines changed: 22 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.asynchttpclient.Dsl.post;
2323
import static org.asynchttpclient.Dsl.put;
2424
import com.google.gson.Gson;
25-
import io.netty.handler.codec.http.HttpHeaders;
2625
import java.io.File;
2726
import java.io.FileOutputStream;
2827
import java.io.IOException;
@@ -41,6 +40,7 @@
4140
import org.apache.commons.lang3.StringUtils;
4241
import org.apache.pulsar.client.admin.Functions;
4342
import org.apache.pulsar.client.admin.PulsarAdminException;
43+
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
4444
import org.apache.pulsar.client.api.Authentication;
4545
import org.apache.pulsar.common.functions.FunctionConfig;
4646
import org.apache.pulsar.common.functions.FunctionDefinition;
@@ -54,10 +54,8 @@
5454
import org.apache.pulsar.common.policies.data.FunctionStats;
5555
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
5656
import org.apache.pulsar.common.policies.data.FunctionStatus;
57-
import org.asynchttpclient.AsyncHandler;
58-
import org.asynchttpclient.AsyncHttpClient;
57+
import org.asynchttpclient.AsyncCompletionHandlerBase;
5958
import org.asynchttpclient.HttpResponseBodyPart;
60-
import org.asynchttpclient.HttpResponseStatus;
6159
import org.asynchttpclient.RequestBuilder;
6260
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
6361
import org.asynchttpclient.request.body.multipart.FilePart;
@@ -70,12 +68,14 @@
7068
public class FunctionsImpl extends ComponentResource implements Functions {
7169

7270
private final WebTarget functions;
73-
private final AsyncHttpClient asyncHttpClient;
71+
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;
7472

75-
public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
73+
public FunctionsImpl(WebTarget web, Authentication auth,
74+
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
75+
long requestTimeoutMs) {
7676
super(auth, requestTimeoutMs);
7777
this.functions = web.path("/admin/v3/functions");
78-
this.asyncHttpClient = asyncHttpClient;
78+
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
7979
}
8080

8181
@Override
@@ -171,8 +171,7 @@ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig
171171
// If the function code is built in, we don't need to submit here
172172
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
173173
}
174-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
175-
.toCompletableFuture()
174+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
176175
.thenAccept(response -> {
177176
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
178177
future.completeExceptionally(
@@ -263,8 +262,7 @@ public CompletableFuture<Void> updateFunctionAsync(
263262
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
264263
}
265264

266-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
267-
.toCompletableFuture()
265+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
268266
.thenAccept(response -> {
269267
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
270268
future.completeExceptionally(
@@ -464,7 +462,7 @@ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String pat
464462
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
465463
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
466464

467-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
465+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
468466
.thenAccept(response -> {
469467
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
470468
future.completeExceptionally(
@@ -543,55 +541,31 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar
543541

544542
RequestBuilder builder = get(target.getUri().toASCIIString());
545543

546-
CompletableFuture<HttpResponseStatus> statusFuture =
547-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(),
548-
new AsyncHandler<HttpResponseStatus>() {
549-
private HttpResponseStatus status;
550-
551-
@Override
552-
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
553-
status = responseStatus;
554-
if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
555-
return State.ABORT;
556-
}
557-
return State.CONTINUE;
558-
}
559-
560-
@Override
561-
public State onHeadersReceived(HttpHeaders headers) throws Exception {
562-
return State.CONTINUE;
563-
}
544+
CompletableFuture<org.asynchttpclient.Response> responseFuture =
545+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(),
546+
() -> new AsyncCompletionHandlerBase() {
564547

565548
@Override
566549
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
567550
os.write(bodyPart.getBodyByteBuffer());
568551
return State.CONTINUE;
569552
}
553+
});
570554

571-
@Override
572-
public HttpResponseStatus onCompleted() throws Exception {
573-
return status;
574-
}
575-
576-
@Override
577-
public void onThrowable(Throwable t) {
578-
}
579-
}).toCompletableFuture();
580-
581-
statusFuture
582-
.whenComplete((status, throwable) -> {
555+
responseFuture
556+
.whenComplete((response, throwable) -> {
583557
try {
584558
os.close();
585559
} catch (IOException e) {
586560
future.completeExceptionally(getApiException(e));
587561
}
588562
})
589-
.thenAccept(status -> {
590-
if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
563+
.thenAccept(response -> {
564+
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
591565
future.completeExceptionally(
592566
getApiException(Response
593-
.status(status.getStatusCode())
594-
.entity(status.getStatusText())
567+
.status(response.getStatusCode())
568+
.entity(response.getStatusText())
595569
.build()));
596570
} else {
597571
future.complete(null);
@@ -700,7 +674,7 @@ public CompletableFuture<Void> putFunctionStateAsync(
700674
.path("state").path(state.getKey()).getUri().toASCIIString());
701675
builder.addBodyPart(new StringPart("state", objectWriter()
702676
.writeValueAsString(state), MediaType.APPLICATION_JSON));
703-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
677+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
704678
.toCompletableFuture()
705679
.thenAccept(response -> {
706680
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
@@ -740,7 +714,7 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
740714
.addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
741715
.addBodyPart(new StringPart("delete", Boolean.toString(delete)));
742716

743-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
717+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
744718
.toCompletableFuture()
745719
.thenAccept(response -> {
746720
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {

0 commit comments

Comments
 (0)