|
1 | 1 | package redis.clients.jedis; |
2 | 2 |
|
| 3 | +import static java.util.concurrent.TimeUnit.SECONDS; |
3 | 4 | import static org.hamcrest.Matchers.contains; |
4 | 5 | import static org.hamcrest.MatcherAssert.assertThat; |
5 | 6 | import static org.junit.jupiter.api.Assertions.assertArrayEquals; |
|
13 | 14 | import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS; |
14 | 15 |
|
15 | 16 | import java.util.*; |
| 17 | +import java.util.concurrent.CountDownLatch; |
| 18 | +import java.util.concurrent.ExecutorService; |
| 19 | +import java.util.concurrent.Executors; |
| 20 | +import java.util.concurrent.Future; |
16 | 21 |
|
17 | 22 | import org.hamcrest.MatcherAssert; |
18 | 23 | import org.hamcrest.Matchers; |
@@ -1171,4 +1176,142 @@ private static void assertThreadsCount() { |
1171 | 1176 | .count(); |
1172 | 1177 | MatcherAssert.assertThat(count, Matchers.lessThanOrEqualTo(20)); |
1173 | 1178 | } |
| 1179 | + |
| 1180 | + @Test |
| 1181 | + public void sharedExecutorPipelineDoesNotShutdownSharedExecutor() { |
| 1182 | + ExecutorService executorService = Executors.newFixedThreadPool(3); |
| 1183 | + try ( RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(DEFAULT_CLIENT_CONFIG).build()){ |
| 1184 | + try (ClusterPipeline pipeline = cluster.pipelined(executorService)) { |
| 1185 | + // multiple keys at different slots, to ensure multi-node pipeline |
| 1186 | + pipeline.set("key1", "value1"); |
| 1187 | + pipeline.set("key2", "value2"); |
| 1188 | + pipeline.set("key3", "value3"); |
| 1189 | + pipeline.sync(); |
| 1190 | + } |
| 1191 | + } finally { |
| 1192 | + assertFalse(executorService.isShutdown()); |
| 1193 | + executorService.shutdown(); |
| 1194 | + } |
| 1195 | + } |
| 1196 | + @Test |
| 1197 | + public void sharedExecutorPipelineKeysAtSameNode() { |
| 1198 | + try (RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(DEFAULT_CLIENT_CONFIG).build()) { |
| 1199 | + ExecutorService executorService = Executors.newFixedThreadPool(3); |
| 1200 | + // test simple key |
| 1201 | + cluster.set("foo", "bar"); |
| 1202 | + |
| 1203 | + try (ClusterPipeline pipeline = cluster.pipelined(executorService)) { |
| 1204 | + Response<String> foo = pipeline.get("foo"); |
| 1205 | + pipeline.sync(); |
| 1206 | + |
| 1207 | + assertEquals("bar", foo.get()); |
| 1208 | + } |
| 1209 | + |
| 1210 | + // test multi key but at same node |
| 1211 | + int cnt = 3; |
| 1212 | + String prefix = "{foo}:"; |
| 1213 | + for (int i = 0; i < cnt; i++) { |
| 1214 | + String key = prefix + i; |
| 1215 | + cluster.set(key, String.valueOf(i)); |
| 1216 | + } |
| 1217 | + |
| 1218 | + try (ClusterPipeline pipeline = cluster.pipelined(executorService)) { |
| 1219 | + List<Response<String>> results = new ArrayList<>(); |
| 1220 | + for (int i = 0; i < cnt; i++) { |
| 1221 | + String key = prefix + i; |
| 1222 | + results.add(pipeline.get(key)); |
| 1223 | + } |
| 1224 | + |
| 1225 | + pipeline.sync(); |
| 1226 | + int idx = 0; |
| 1227 | + for (Response<String> res : results) { |
| 1228 | + assertEquals(String.valueOf(idx), res.get()); |
| 1229 | + idx++; |
| 1230 | + } |
| 1231 | + } |
| 1232 | + } |
| 1233 | + } |
| 1234 | + |
  @Test
  @Timeout(10)
  public void sharedExecutorConcurrentPipelinesNoDeadlock() throws Exception {
    // This test verifies that multiple pipelines using the same shared executor
    // concurrently do not deadlock, even when the executor has fewer threads
    // than concurrent pipelines
    final int EXECUTOR_THREADS = 3;
    final int CONCURRENT_PIPELINES = 10; // More pipelines than executor threads
    final int OPERATIONS_PER_PIPELINE = 20;

    // sharedExecutor is handed to every pipeline; testExecutor merely drives
    // the concurrent submitters and is an implementation detail of the test.
    ExecutorService sharedExecutor = Executors.newFixedThreadPool(EXECUTOR_THREADS);
    ExecutorService testExecutor = Executors.newFixedThreadPool(CONCURRENT_PIPELINES);

    try (RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(DEFAULT_CLIENT_CONFIG).build()) {

      // startLatch releases all workers at once; completionLatch lets the main
      // thread bound how long it waits for the whole group to finish.
      CountDownLatch startLatch = new CountDownLatch(1);
      CountDownLatch completionLatch = new CountDownLatch(CONCURRENT_PIPELINES);
      List<Future<Integer>> futures = new ArrayList<>();

      // Launch multiple pipelines concurrently
      for (int pipelineId = 0; pipelineId < CONCURRENT_PIPELINES; pipelineId++) {
        final int id = pipelineId; // effectively-final copy for capture by the lambda
        Future<Integer> future = testExecutor.submit(() -> {
          try {
            // Wait for all threads to be ready before starting
            startLatch.await();

            int successCount = 0;
            try (ClusterPipeline pipeline = cluster.pipelined(sharedExecutor)) {
              List<Response<String>> responses = new ArrayList<>();

              // Perform operations on different keys to ensure multi-node pipeline
              for (int i = 0; i < OPERATIONS_PER_PIPELINE; i++) {
                String key = "pipeline" + id + "_key" + i;
                String value = "value" + i;
                pipeline.set(key, value);
                responses.add(pipeline.get(key));
              }

              // This sync() will use the shared executor
              pipeline.sync();

              // Verify all responses
              for (int i = 0; i < OPERATIONS_PER_PIPELINE; i++) {
                String expected = "value" + i;
                String actual = responses.get(i).get();
                if (expected.equals(actual)) {
                  successCount++;
                }
              }
            }
            return successCount;
          } finally {
            // Counted down even on failure/exception so the timed await below
            // can never hang on a crashed worker.
            completionLatch.countDown();
          }
        });
        futures.add(future);
      }

      // Release all threads to start concurrently
      startLatch.countDown();

      boolean completed = completionLatch.await(5, SECONDS);
      assertTrue(completed, "All concurrent pipelines should complete without deadlock");

      // Verify all operations succeeded. Future.get() also rethrows any
      // exception a worker raised, surfacing per-pipeline failures here.
      for (int i = 0; i < CONCURRENT_PIPELINES; i++) {
        Integer successCount = futures.get(i).get();
        assertEquals(OPERATIONS_PER_PIPELINE, successCount.intValue(),
            "Pipeline " + i + " should have all operations succeed");
      }

      // Verify shared executor is still active
      assertFalse(sharedExecutor.isShutdown(), "Shared executor should not be shut down");

    } finally {
      // Drain the test driver first, then the shared executor, each with a
      // bounded wait so a wedged pool cannot hang the test suite teardown.
      testExecutor.shutdown();
      testExecutor.awaitTermination(5, SECONDS);
      sharedExecutor.shutdown();
      sharedExecutor.awaitTermination(5, SECONDS);
    }
  }
1174 | 1317 | } |
0 commit comments