Skip to content

Commit 162c4c8

Browse files
Ramesh Mallarameshmalla
authored andcommitted
Failsafe executors refactoring
1 parent de42fac commit 162c4c8

File tree

8 files changed

+204
-186
lines changed

8 files changed

+204
-186
lines changed

riptide-micrometer/src/main/java/org/zalando/riptide/micrometer/ThreadPoolMetrics.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import io.micrometer.core.instrument.MeterRegistry;
66
import io.micrometer.core.instrument.Tag;
77
import io.micrometer.core.instrument.binder.MeterBinder;
8-
import lombok.AllArgsConstructor;
98
import org.apiguardian.api.API;
109

1110
import java.util.concurrent.ThreadPoolExecutor;
@@ -14,11 +13,9 @@
1413
import static com.google.common.collect.ImmutableList.copyOf;
1514
import static io.micrometer.core.instrument.binder.BaseUnits.TASKS;
1615
import static io.micrometer.core.instrument.binder.BaseUnits.THREADS;
17-
import static lombok.AccessLevel.PRIVATE;
1816
import static org.apiguardian.api.API.Status.EXPERIMENTAL;
1917

2018
@API(status = EXPERIMENTAL)
21-
@AllArgsConstructor(access = PRIVATE)
2219
public final class ThreadPoolMetrics implements MeterBinder {
2320

2421
private final ThreadPoolExecutor executor;
@@ -29,6 +26,12 @@ public ThreadPoolMetrics(final ThreadPoolExecutor executor) {
2926
this(executor, "http.client.threads", ImmutableList.of());
3027
}
3128

29+
private ThreadPoolMetrics(ThreadPoolExecutor executor, String metricName, ImmutableList<Tag> defaultTags) {
30+
this.executor = executor;
31+
this.metricName = metricName;
32+
this.defaultTags = defaultTags;
33+
}
34+
3235
public ThreadPoolMetrics withMetricName(final String metricName) {
3336
return new ThreadPoolMetrics(executor, metricName, defaultTags);
3437
}

riptide-spring-boot-autoconfigure/README.md

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,23 @@ riptide.clients:
2727
enabled: true
2828
fixed-delay: 50 milliseconds
2929
max-retries: 5
30+
threads:
31+
max-size: 10
32+
min-size: 2
33+
enabled: true
34+
keep-alive: 5 minutes
35+
queue-size: 10
3036
circuit-breaker:
3137
enabled: true
3238
failure-threshold: 3 out of 5
3339
delay: 30 seconds
3440
success-threshold: 5 out of 5
41+
threads:
42+
max-size: 10
43+
min-size: 2
44+
enabled: true
45+
keep-alive: 5 minutes
46+
queue-size: 10
3547
caching:
3648
enabled: true
3749
shared: false
@@ -274,18 +286,42 @@ riptide:
274286
max-retries: 5
275287
max-duration: 2 seconds
276288
jitter: 25 milliseconds
289+
threads:
290+
max-size: 10
291+
min-size: 2
292+
enabled: true
293+
keep-alive: 5 minutes
294+
queue-size: 10
277295
circuit-breaker:
278296
enabled: true
279297
failure-threshold: 3 out of 5
280298
failure-rate-threshold: 3 out of 5 in 5 seconds
281299
delay: 30 seconds
282300
success-threshold: 5 out of 5
301+
threads:
302+
max-size: 10
303+
min-size: 2
304+
enabled: true
305+
keep-alive: 5 minutes
306+
queue-size: 10
283307
backup-request:
284308
enabled: true
285309
delay: 75 milliseconds
310+
threads:
311+
max-size: 10
312+
min-size: 2
313+
enabled: true
314+
keep-alive: 5 minutes
315+
queue-size: 10
286316
timeouts:
287317
enabled: true
288318
global: 500 milliseconds
319+
threads:
320+
max-size: 10
321+
min-size: 2
322+
enabled: true
323+
keep-alive: 5 minutes
324+
queue-size: 10
289325
caching:
290326
enabled: true
291327
shared: true
@@ -624,8 +660,51 @@ The following table shows all beans with their respective name (for the `example
624660
If you override a bean then all of its dependencies (see the [graph](#customization)), will **not** be registered,
625661
unless required by some other bean.
626662

627-
You can specify `ExecutorService` for each `FailsafePlugin` by providing beans with the following naming convention:
628-
`exampleRetryPolicyExecutorService`, `exampleCircuitBreakerExecutorService`, `exampleBackupRequestExecutorService`, `exampleTimeoutExecutorService`.
663+
Riptide uses Failsafe underneath to manage resiliency flows, and Failsafe supports custom thread pool executors. For more details, refer to the [riptide-failsafe](https://github.com/zalando/riptide/tree/main/riptide-failsafe#custom-executor) documentation. To configure a custom thread pool executor for retry, circuit breaker, backup requests, and timeout features, follow the configuration steps below.
664+
```yaml
665+
retry:
666+
enabled: true
667+
fixed-delay: 50 milliseconds
668+
max-retries: 5
669+
max-duration: 2 seconds
670+
jitter: 25 milliseconds
671+
threads:
672+
max-size: 10
673+
min-size: 2
674+
enabled: true
675+
keep-alive: 5 minutes
676+
queue-size: 10
677+
circuit-breaker:
678+
enabled: true
679+
failure-threshold: 3 out of 5
680+
failure-rate-threshold: 3 out of 5 in 5 seconds
681+
delay: 30 seconds
682+
success-threshold: 5 out of 5
683+
threads:
684+
max-size: 10
685+
min-size: 2
686+
enabled: true
687+
keep-alive: 5 minutes
688+
queue-size: 10
689+
backup-request:
690+
enabled: true
691+
delay: 75 milliseconds
692+
threads:
693+
max-size: 10
694+
min-size: 2
695+
enabled: true
696+
keep-alive: 5 minutes
697+
queue-size: 10
698+
timeouts:
699+
enabled: true
700+
global: 500 milliseconds
701+
threads:
702+
max-size: 10
703+
min-size: 2
704+
enabled: true
705+
keep-alive: 5 minutes
706+
queue-size: 10
707+
```
629708

630709
In case you need more than one custom plugin, please use `Plugin.composite(Plugin...)`.
631710

riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
import org.zalando.riptide.soap.SOAPHttpMessageConverter;
5656
import org.zalando.riptide.stream.Streams;
5757

58+
import javax.annotation.Nonnull;
59+
import javax.annotation.Nullable;
5860
import java.net.SocketTimeoutException;
5961
import java.net.URI;
6062
import java.time.Clock;
@@ -105,7 +107,7 @@ private String registerHttp(final String id, final Client client) {
105107

106108
return genericBeanDefinition(HttpFactory.class)
107109
.setFactoryMethod("create")
108-
.addConstructorArgValue(createExecutor(id, client))
110+
.addConstructorArgValue(createExecutor(id, "http.client.threads", client, client.getThreads()))
109111
.addConstructorArgReference(registerClientHttpRequestFactory(id, client))
110112
.addConstructorArgReference(registerBaseURL(id, client))
111113
.addConstructorArgValue(client.getUrlResolution())
@@ -138,27 +140,29 @@ private String registerBaseURL(final String id, final Client client) {
138140
});
139141
}
140142

141-
private Object createExecutor(final String id, final Client client) {
142-
if (client.getThreads().getEnabled()) {
143-
return ref(registerExecutor(id, client));
143+
private Object createExecutor(final String id, final String metricName, final Client client, @Nullable final RiptideProperties.Threads threads) {
144+
if (threads != null && threads.getEnabled()) {
145+
return ref(registerExecutor(id, metricName, threads, client));
144146
}
145-
146147
return null;
147148
}
148149

149-
private String registerExecutor(final String id, final Client client) {
150+
private String registerExecutor(final String id,
151+
final String metricName,
152+
@Nonnull final RiptideProperties.Threads threads,
153+
final Client client) {
150154
final String executorId = registry.registerIfAbsent(id, ExecutorService.class, () ->
151155
genericBeanDefinition(ThreadPoolFactory.class)
152156
.addConstructorArgValue(id)
153-
.addConstructorArgValue(client.getThreads())
157+
.addConstructorArgValue(threads)
154158
.setFactoryMethod("create")
155159
.setDestroyMethodName("shutdown"));
156160

157161
if (client.getMetrics().getEnabled()) {
158162
registry.registerIfAbsent(id, ThreadPoolMetrics.class, () ->
159163
genericBeanDefinition(ThreadPoolMetrics.class)
160164
.addConstructorArgReference(executorId)
161-
.addConstructorArgValue("http.client.threads")
165+
.addConstructorArgValue(metricName)
162166
.addConstructorArgValue(ImmutableList.of(clientId(id))));
163167
}
164168

@@ -172,6 +176,7 @@ private String registerExecutor(final String id, final Client client) {
172176
return executorId;
173177
}
174178

179+
175180
private static final class HttpMessageConverters {
176181

177182

@@ -408,15 +413,12 @@ private Optional<String> registerCircuitBreakerFailsafePlugin(final String id, f
408413
if (client.getCircuitBreaker().getEnabled()) {
409414
final String pluginId = registry.registerIfAbsent(name(id, CircuitBreaker.class, FailsafePlugin.class),
410415
() -> {
411-
var executorService = registry.find(name(id, "CircuitBreaker", ExecutorService.class));
412-
var executorServiceRef = executorService.map(Registry::ref).orElse(null);
413-
414416
log.debug("Client [{}]: Registering [CircuitBreakerFailsafePlugin]", id);
415417
return genericBeanDefinition(FailsafePluginFactory.class)
416418
.setFactoryMethod("createCircuitBreakerPlugin")
417419
.addConstructorArgValue(registerCircuitBreaker(id, client))
418420
.addConstructorArgValue(createTaskDecorators(id, client))
419-
.addConstructorArgValue(executorServiceRef);
421+
.addConstructorArgValue(createExecutor(id + "-circuit-breaker", "failsafe.circuitbreaker.executor", client, client.getCircuitBreaker().getThreads()));
420422
});
421423
return Optional.of(pluginId);
422424
}
@@ -425,16 +427,14 @@ private Optional<String> registerCircuitBreakerFailsafePlugin(final String id, f
425427

426428
private Optional<String> registerRetryPolicyFailsafePlugin(final String id, final Client client) {
427429
if (client.getRetry().getEnabled()) {
428-
final String pluginId = registry.registerIfAbsent(name(id, "RetryPolicy", FailsafePlugin.class), () -> {
429-
var executorService = registry.find(name(id, "RetryPolicy", ExecutorService.class));
430-
var executorServiceRef = executorService.map(Registry::ref).orElse(null);
431430

431+
final String pluginId = registry.registerIfAbsent(name(id, "RetryPolicy", FailsafePlugin.class), () -> {
432432
log.debug("Client [{}]: Registering [RetryPolicyFailsafePlugin]", id);
433433
return genericBeanDefinition(FailsafePluginFactory.class)
434434
.setFactoryMethod("createRetryFailsafePlugin")
435435
.addConstructorArgValue(client)
436436
.addConstructorArgValue(createTaskDecorators(id, client))
437-
.addConstructorArgValue(executorServiceRef);
437+
.addConstructorArgValue(createExecutor(id + "-retry-policy", "failsafe.retry.executor", client, client.getRetry().getThreads()));
438438
});
439439
return Optional.of(pluginId);
440440
}
@@ -457,15 +457,12 @@ private Optional<String> registerBackupRequestFailsafePlugin(final String id, fi
457457
if (client.getBackupRequest().getEnabled()) {
458458
final String pluginId = registry.registerIfAbsent(name(id, BackupRequest.class, FailsafePlugin.class),
459459
() -> {
460-
var executorService = registry.find(name(id, "BackupRequest", ExecutorService.class));
461-
var executorServiceRef = executorService.map(Registry::ref).orElse(null);
462-
463460
log.debug("Client [{}]: Registering [BackupRequestFailsafePlugin]", id);
464461
return genericBeanDefinition(FailsafePluginFactory.class)
465462
.setFactoryMethod("createBackupRequestPlugin")
466463
.addConstructorArgValue(client)
467464
.addConstructorArgValue(createTaskDecorators(id, client))
468-
.addConstructorArgValue(executorServiceRef);
465+
.addConstructorArgValue(createExecutor(id + "-backup-request", "failsafe.backuprequest.executor", client, client.getBackupRequest().getThreads()));
469466
});
470467
return Optional.of(pluginId);
471468
}
@@ -475,15 +472,12 @@ private Optional<String> registerBackupRequestFailsafePlugin(final String id, fi
475472
private Optional<String> registerTimeoutFailsafePlugin(final String id, final Client client) {
476473
if (client.getTimeouts().getEnabled()) {
477474
final String pluginId = registry.registerIfAbsent(name(id, Timeout.class, FailsafePlugin.class), () -> {
478-
var executorService = registry.find(name(id, "Timeout", ExecutorService.class));
479-
var executorServiceRef = executorService.map(Registry::ref).orElse(null);
480-
481475
log.debug("Client [{}]: Registering [TimeoutFailsafePlugin]", id);
482476
return genericBeanDefinition(FailsafePluginFactory.class)
483477
.setFactoryMethod("createTimeoutPlugin")
484478
.addConstructorArgValue(client)
485479
.addConstructorArgValue(createTaskDecorators(id, client))
486-
.addConstructorArgValue(executorServiceRef);
480+
.addConstructorArgValue(createExecutor(id + "-timeout","failsafe.timeout.executor", client, client.getTimeouts().getThreads()));
487481
});
488482
return Optional.of(pluginId);
489483
}

riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Defaulting.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ private static Retry merge(final Retry base, final Retry defaults) {
204204
either(base.getMaxRetries(), defaults.getMaxRetries()),
205205
either(base.getMaxDuration(), defaults.getMaxDuration()),
206206
either(base.getJitterFactor(), defaults.getJitterFactor()),
207-
either(base.getJitter(), defaults.getJitter())
207+
either(base.getJitter(), defaults.getJitter()),
208+
either(base.getThreads(), defaults.getThreads())
208209
);
209210
}
210211

@@ -223,21 +224,24 @@ private static CircuitBreaker merge(final CircuitBreaker base, final CircuitBrea
223224
either(base.getFailureThreshold(), defaults.getFailureThreshold()),
224225
either(base.getFailureRateThreshold(), defaults.getFailureRateThreshold()),
225226
either(base.getDelay(), defaults.getDelay()),
226-
either(base.getSuccessThreshold(), defaults.getSuccessThreshold())
227+
either(base.getSuccessThreshold(), defaults.getSuccessThreshold()),
228+
either(base.getThreads(), defaults.getThreads())
227229
);
228230
}
229231

230232
private static BackupRequest merge(final BackupRequest base, final BackupRequest defaults) {
231233
return new BackupRequest(
232234
either(base.getEnabled(), defaults.getEnabled()),
233-
either(base.getDelay(), defaults.getDelay())
235+
either(base.getDelay(), defaults.getDelay()),
236+
either(base.getThreads(), defaults.getThreads())
234237
);
235238
}
236239

237240
private static Timeouts merge(final Timeouts base, final Timeouts defaults) {
238241
return new Timeouts(
239242
either(base.getEnabled(), defaults.getEnabled()),
240-
either(base.getGlobal(), defaults.getGlobal())
243+
either(base.getGlobal(), defaults.getGlobal()),
244+
either(base.getThreads(), defaults.getThreads())
241245
);
242246
}
243247

riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RiptideProperties.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,16 @@ public static final class Defaults {
9797

9898
@NestedConfigurationProperty
9999
private Retry retry = new Retry(false, null,
100-
new Backoff(false, null, null, null), -1, TimeSpan.of(5, SECONDS), null, null);
100+
new Backoff(false, null, null, null), -1, TimeSpan.of(5, SECONDS), null, null,null);
101101

102102
@NestedConfigurationProperty
103-
private CircuitBreaker circuitBreaker = new CircuitBreaker(false, null, null, TimeSpan.of(0, SECONDS), null);
103+
private CircuitBreaker circuitBreaker = new CircuitBreaker(false, null, null, TimeSpan.of(0, SECONDS), null, null);
104104

105105
@NestedConfigurationProperty
106-
private BackupRequest backupRequest = new BackupRequest(false, null);
106+
private BackupRequest backupRequest = new BackupRequest(false, null, null);
107107

108108
@NestedConfigurationProperty
109-
private Timeouts timeouts = new Timeouts(false, null);
109+
private Timeouts timeouts = new Timeouts(false, null, null);
110110

111111
@NestedConfigurationProperty
112112
private RequestCompression requestCompression = new RequestCompression(false);
@@ -306,6 +306,7 @@ public static final class Retry {
306306
private TimeSpan maxDuration;
307307
private Double jitterFactor;
308308
private TimeSpan jitter;
309+
private Threads threads;
309310

310311
@Getter
311312
@Setter
@@ -329,6 +330,7 @@ public static final class CircuitBreaker {
329330
private RatioInTimeSpan failureRateThreshold;
330331
private TimeSpan delay;
331332
private Ratio successThreshold;
333+
private Threads threads;
332334
}
333335

334336
@Getter
@@ -338,6 +340,7 @@ public static final class CircuitBreaker {
338340
public static final class BackupRequest {
339341
private Boolean enabled;
340342
private TimeSpan delay;
343+
private Threads threads;
341344
}
342345

343346
@Getter
@@ -347,6 +350,7 @@ public static final class BackupRequest {
347350
public static final class Timeouts {
348351
private Boolean enabled;
349352
private TimeSpan global;
353+
private Threads threads;
350354
}
351355

352356
@Getter

0 commit comments

Comments
 (0)