Skip to content

Commit 7744cd4

Browse files
authored
fix : [ClusterPipeline] ExecutorService/thread is created and destroyed too frequently in ClusterPipeline (#4479)
* fix : [ClusterPipeline] ExecutorService/thread is created and destroyed too frequently in ClusterPipeline * acquireExecutorService based on sharedExecutor flag * clean up * address co-pilot review comments
1 parent a3cfede commit 7744cd4

5 files changed

Lines changed: 287 additions & 8 deletions

File tree

src/main/java/redis/clients/jedis/ClusterPipeline.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.time.Duration;
44
import java.util.Set;
5+
import java.util.concurrent.ExecutorService;
6+
57
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
68
import redis.clients.jedis.exceptions.JedisClusterOperationException;
79
import redis.clients.jedis.providers.ClusterConnectionProvider;
@@ -90,8 +92,8 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects
9092
}
9193

9294
ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects,
93-
CommandFlagsRegistry commandFlagsRegistry) {
94-
super(commandObjects, commandFlagsRegistry);
95+
CommandFlagsRegistry commandFlagsRegistry, ExecutorService executorService) {
96+
super(commandObjects, commandFlagsRegistry, executorService);
9597
this.provider = provider;
9698
}
9799

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Collections;
55
import java.util.Map;
66
import java.util.Set;
7+
import java.util.concurrent.ExecutorService;
78

89
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
910

@@ -420,10 +421,49 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha
420421
}
421422
// commands
422423

424+
/**
425+
* Creates a new pipeline for executing commands in a Redis Cluster.
426+
*
427+
* <p>Pipelining allows batching multiple commands for more efficient execution
428+
* by reducing network round-trips. In a cluster environment, commands are routed
429+
* to the appropriate nodes based on key hash slots.</p>
430+
*
431+
* <p>If the pipeline spans multiple nodes, a dedicated {@link ExecutorService} is
432+
* created internally to execute requests in parallel and shutdown when the pipeline
433+
* is synced.</p>
434+
*
435+
* @return a new {@link ClusterPipeline} instance
436+
* @see #pipelined(ExecutorService)
437+
*/
423438
@Override
424439
public ClusterPipeline pipelined() {
425-
return new ClusterPipeline((ClusterConnectionProvider) provider,
426-
(ClusterCommandObjects) commandObjects, commandFlagsRegistry);
440+
return pipelined(null);
441+
}
442+
443+
/**
444+
* Creates a new pipeline for executing commands in a Redis Cluster using the provided executor.
445+
*
446+
* <p>Pipelining allows batching multiple commands for more efficient execution
447+
* by reducing network round-trips. In a cluster environment, commands are routed
448+
* to the appropriate nodes based on key hash slots.</p>
449+
*
450+
* <p>If the pipeline spans multiple nodes, the provided {@link ExecutorService} is
451+
* used to execute requests in parallel. The caller is responsible for managing
452+
* the lifecycle of this executor (creation, shutdown, etc.).</p>
453+
*
454+
* <p>If {@code null} is provided, a dedicated executor is created and managed
455+
* internally, similar to {@link #pipelined()}.</p>
456+
*
457+
* @param executorService the executor to use for multi-node execution, or {@code null}
458+
* @return a new {@link ClusterPipeline} instance
459+
*/
460+
public ClusterPipeline pipelined(ExecutorService executorService) {
461+
return new ClusterPipeline(
462+
(ClusterConnectionProvider) provider,
463+
(ClusterCommandObjects) commandObjects,
464+
commandFlagsRegistry,
465+
executorService
466+
);
427467
}
428468

429469
/**

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,29 @@ public abstract class MultiNodePipelineBase extends AbstractPipeline {
3434
private volatile boolean syncing = false;
3535
protected final CommandFlagsRegistry commandFlagsRegistry;
3636

37+
/**
38+
* External executor service to use for {@code sync()}. If not set, a new executor service will be
39+
* created for each {@code sync()} call.
40+
*/
41+
private final ExecutorService sharedExecutorService;
42+
3743
public MultiNodePipelineBase(CommandObjects commandObjects) {
3844
this(commandObjects, StaticCommandFlagsRegistry.registry());
3945
}
4046

4147
protected MultiNodePipelineBase(CommandObjects commandObjects, CommandFlagsRegistry commandFlagsRegistry) {
48+
this(commandObjects, commandFlagsRegistry, null);
49+
}
50+
51+
MultiNodePipelineBase(CommandObjects commandObjects, CommandFlagsRegistry commandFlagsRegistry, ExecutorService executorService) {
4252
super(commandObjects);
4353
this.commandFlagsRegistry = commandFlagsRegistry;
4454
pipelinedResponses = new LinkedHashMap<>();
4555
connections = new LinkedHashMap<>();
56+
this.sharedExecutorService = executorService;
4657
}
4758

59+
4860
protected abstract HostAndPort getNodeKey(CommandArguments args);
4961

5062
protected abstract Connection getConnection(HostAndPort nodeKey);
@@ -100,7 +112,7 @@ public final void sync() {
100112
Executor executor;
101113
ExecutorService executorService = null;
102114
if (multiNode) {
103-
executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
115+
executorService = getPipelineExecutor();
104116
executor = executorService;
105117
} else {
106118
executor = Runnable::run;
@@ -144,12 +156,53 @@ public final void sync() {
144156
log.error("Thread is interrupted during sync.", e);
145157
}
146158

147-
executorService.shutdownNow();
159+
releasePipelineExecutor(executorService);
148160
}
149161

150162
syncing = false;
151163
}
152164

165+
/**
166+
* Acquires the executor service to run multi-node pipeline commands.
167+
* <p>
168+
* If a shared executor is provided by the user, it is returned.
169+
* Otherwise, a new dedicated executor is created for this pipeline.
170+
* </p>
171+
*/
172+
private ExecutorService getPipelineExecutor() {
173+
return isUsingSharedExecutor()
174+
? this.sharedExecutorService
175+
: createDedicatedPipelineExecutor();
176+
}
177+
178+
/**
179+
* Releases the executor service used by the pipeline.
180+
* <p>
181+
* Dedicated executors are shut down after use.
182+
* Shared executors are managed externally and not shut down.
183+
* </p>
184+
*/
185+
private void releasePipelineExecutor(ExecutorService executorService) {
186+
if (!isUsingSharedExecutor()) {
187+
executorService.shutdownNow();
188+
}
189+
}
190+
191+
/**
192+
* Returns true if this pipeline is using a shared executor service
193+
* provided externally.
194+
*/
195+
private boolean isUsingSharedExecutor() {
196+
return this.sharedExecutorService != null;
197+
}
198+
199+
/**
200+
* Creates a new dedicated executor for multi-node pipeline execution.
201+
*/
202+
private ExecutorService createDedicatedPipelineExecutor() {
203+
return Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
204+
}
205+
153206
/**
154207
* Validates that a command can be executed in a multi-node pipeline.
155208
* <p>

src/main/java/redis/clients/jedis/RedisClusterClient.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.List;
66
import java.util.Map;
77
import java.util.Set;
8+
import java.util.concurrent.ExecutorService;
89

910
import redis.clients.jedis.builders.ClusterClientBuilder;
1011
import redis.clients.jedis.executors.ClusterCommandExecutor;
@@ -190,12 +191,50 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha
190191
}
191192
// commands
192193

194+
/**
195+
* Creates a new pipeline for executing commands in a Redis Cluster.
196+
*
197+
* <p>Pipelining allows batching multiple commands for more efficient execution
198+
* by reducing network round-trips. In a cluster environment, commands are routed
199+
* to the appropriate nodes based on key hash slots.</p>
200+
*
201+
* <p>If the pipeline spans multiple nodes, a dedicated {@link ExecutorService} is
202+
* created internally to execute requests in parallel and shutdown when the pipeline
203+
* is synced.</p>
204+
*
205+
* @return a new {@link ClusterPipeline} instance
206+
* @see #pipelined(ExecutorService)
207+
*/
193208
@Override
194209
public ClusterPipeline pipelined() {
195-
return new ClusterPipeline((ClusterConnectionProvider) provider,
196-
(ClusterCommandObjects) commandObjects, commandFlagsRegistry);
210+
return pipelined(null);
197211
}
198212

213+
/**
214+
* Creates a new pipeline for executing commands in a Redis Cluster using the provided executor.
215+
*
216+
* <p>Pipelining allows batching multiple commands for more efficient execution
217+
* by reducing network round-trips. In a cluster environment, commands are routed
218+
* to the appropriate nodes based on key hash slots.</p>
219+
*
220+
* <p>If the pipeline spans multiple nodes, the provided {@link ExecutorService} is
221+
* used to execute requests in parallel. The caller is responsible for managing
222+
* the lifecycle of this executor (creation, shutdown, etc.).</p>
223+
*
224+
* <p>If {@code null} is provided, a dedicated executor is created and managed
225+
* internally, similar to {@link #pipelined()}.</p>
226+
*
227+
* @param executorService the executor to use for multi-node execution, or {@code null}
228+
* @return a new {@link ClusterPipeline} instance
229+
*/
230+
public ClusterPipeline pipelined(ExecutorService executorService) {
231+
return new ClusterPipeline(
232+
(ClusterConnectionProvider) provider,
233+
(ClusterCommandObjects) commandObjects,
234+
commandFlagsRegistry,
235+
executorService
236+
);
237+
}
199238
/**
200239
* @param doMulti param
201240
* @return nothing

src/test/java/redis/clients/jedis/ClusterPipeliningTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package redis.clients.jedis;
22

3+
import static java.util.concurrent.TimeUnit.SECONDS;
34
import static org.hamcrest.Matchers.contains;
45
import static org.hamcrest.MatcherAssert.assertThat;
56
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -13,6 +14,10 @@
1314
import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS;
1415

1516
import java.util.*;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.Future;
1621

1722
import org.hamcrest.MatcherAssert;
1823
import org.hamcrest.Matchers;
@@ -1254,4 +1259,144 @@ public void testMultiShardPolicyWithMultipleKeysRejected() {
12541259
"Error message should mention MULTI_SHARD policy or multiple slots");
12551260
}
12561261
}
1262+
1263+
1264+
@Test
1265+
public void sharedExecutorPipelineDoesNotShutdownSharedExecutor() {
1266+
ExecutorService executorService = Executors.newFixedThreadPool(3);
1267+
try ( RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(DEFAULT_CLIENT_CONFIG).build()){
1268+
try (ClusterPipeline pipeline = cluster.pipelined(executorService)) {
1269+
// multiple keys at different slots, to ensure multi-node pipeline
1270+
pipeline.set("key1", "value1");
1271+
pipeline.set("key2", "value2");
1272+
pipeline.set("key3", "value3");
1273+
pipeline.sync();
1274+
}
1275+
} finally {
1276+
assertFalse(executorService.isShutdown());
1277+
executorService.shutdown();
1278+
}
1279+
}
1280+
@Test
1281+
public void sharedExecutorPipelineKeysAtSameNode() {
1282+
try (RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(DEFAULT_CLIENT_CONFIG).build()) {
1283+
ExecutorService executorService = Executors.newFixedThreadPool(3);
1284+
// test simple key
1285+
cluster.set("foo", "bar");
1286+
1287+
try (ClusterPipeline pipeline = cluster.pipelined(executorService)) {
1288+
Response<String> foo = pipeline.get("foo");
1289+
pipeline.sync();
1290+
1291+
assertEquals("bar", foo.get());
1292+
}
1293+
1294+
// test multi key but at same node
1295+
int cnt = 3;
1296+
String prefix = "{foo}:";
1297+
for (int i = 0; i < cnt; i++) {
1298+
String key = prefix + i;
1299+
cluster.set(key, String.valueOf(i));
1300+
}
1301+
1302+
try (ClusterPipeline pipeline = cluster.pipelined(executorService)) {
1303+
List<Response<String>> results = new ArrayList<>();
1304+
for (int i = 0; i < cnt; i++) {
1305+
String key = prefix + i;
1306+
results.add(pipeline.get(key));
1307+
}
1308+
1309+
pipeline.sync();
1310+
int idx = 0;
1311+
for (Response<String> res : results) {
1312+
assertEquals(String.valueOf(idx), res.get());
1313+
idx++;
1314+
}
1315+
}
1316+
}
1317+
}
1318+
1319+
@Test
1320+
@Timeout(10)
1321+
public void sharedExecutorConcurrentPipelinesNoDeadlock() throws Exception {
1322+
// This test verifies that multiple pipelines using the same shared executor
1323+
// concurrently do not deadlock, even when the executor has fewer threads
1324+
// than concurrent pipelines
1325+
final int EXECUTOR_THREADS = 3;
1326+
final int CONCURRENT_PIPELINES = 10; // More pipelines than executor threads
1327+
final int OPERATIONS_PER_PIPELINE = 20;
1328+
1329+
ExecutorService sharedExecutor = Executors.newFixedThreadPool(EXECUTOR_THREADS);
1330+
ExecutorService testExecutor = Executors.newFixedThreadPool(CONCURRENT_PIPELINES);
1331+
1332+
try (RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(DEFAULT_CLIENT_CONFIG).build()) {
1333+
1334+
CountDownLatch startLatch = new CountDownLatch(1);
1335+
CountDownLatch completionLatch = new CountDownLatch(CONCURRENT_PIPELINES);
1336+
List<Future<Integer>> futures = new ArrayList<>();
1337+
1338+
// Launch multiple pipelines concurrently
1339+
for (int pipelineId = 0; pipelineId < CONCURRENT_PIPELINES; pipelineId++) {
1340+
final int id = pipelineId;
1341+
Future<Integer> future = testExecutor.submit(() -> {
1342+
try {
1343+
// Wait for all threads to be ready before starting
1344+
startLatch.await();
1345+
1346+
int successCount = 0;
1347+
try (ClusterPipeline pipeline = cluster.pipelined(sharedExecutor)) {
1348+
List<Response<String>> responses = new ArrayList<>();
1349+
1350+
// Perform operations on different keys to ensure multi-node pipeline
1351+
for (int i = 0; i < OPERATIONS_PER_PIPELINE; i++) {
1352+
String key = "pipeline" + id + "_key" + i;
1353+
String value = "value" + i;
1354+
pipeline.set(key, value);
1355+
responses.add(pipeline.get(key));
1356+
}
1357+
1358+
// This sync() will use the shared executor
1359+
pipeline.sync();
1360+
1361+
// Verify all responses
1362+
for (int i = 0; i < OPERATIONS_PER_PIPELINE; i++) {
1363+
String expected = "value" + i;
1364+
String actual = responses.get(i).get();
1365+
if (expected.equals(actual)) {
1366+
successCount++;
1367+
}
1368+
}
1369+
}
1370+
return successCount;
1371+
} finally {
1372+
completionLatch.countDown();
1373+
}
1374+
});
1375+
futures.add(future);
1376+
}
1377+
1378+
// Release all threads to start concurrently
1379+
startLatch.countDown();
1380+
1381+
boolean completed = completionLatch.await(5, SECONDS);
1382+
assertTrue(completed, "All concurrent pipelines should complete without deadlock");
1383+
1384+
// Verify all operations succeeded
1385+
for (int i = 0; i < CONCURRENT_PIPELINES; i++) {
1386+
Integer successCount = futures.get(i).get();
1387+
assertEquals(OPERATIONS_PER_PIPELINE, successCount.intValue(),
1388+
"Pipeline " + i + " should have all operations succeed");
1389+
}
1390+
1391+
// Verify shared executor is still active
1392+
assertFalse(sharedExecutor.isShutdown(), "Shared executor should not be shut down");
1393+
1394+
} finally {
1395+
testExecutor.shutdown();
1396+
testExecutor.awaitTermination(5, SECONDS);
1397+
sharedExecutor.shutdown();
1398+
sharedExecutor.awaitTermination(5, SECONDS);
1399+
}
1400+
}
1401+
12571402
}

0 commit comments

Comments
 (0)