Skip to content

Commit 23f7def

Browse files
authored
[req/resp] Reject commands with ALL_NODES,ALL_SHARDS,MULTI_SHARD request policy in Cluster Pipeline (#4466)
* reject commands with ALL_NODES,ALL_SHARDS,MULTI_SHARD request policy in cluster pipeline * run test on topic branches * api docs fix * If the command has keys that route to a single slot, allow it
1 parent a05f1bc commit 23f7def

7 files changed

Lines changed: 225 additions & 7 deletions

File tree

.github/workflows/test-on-docker.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ on:
1313
- master
1414
- '[0-9].*'
1515
- 'feature/**'
16+
- 'topic/**'
1617
pull_request:
1718
branches:
1819
- master

src/main/java/redis/clients/jedis/ClusterPipeline.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,54 @@
77
import redis.clients.jedis.providers.ClusterConnectionProvider;
88
import redis.clients.jedis.util.IOUtils;
99

10+
/**
11+
* Pipeline implementation for Redis Cluster mode.
12+
* <p>
13+
* ClusterPipeline allows batching multiple commands for efficient execution in a Redis Cluster
14+
* environment. Commands are automatically routed to the appropriate cluster nodes based on
15+
* key hash slots.
16+
* </p>
17+
* <p>
18+
* <strong>Important Limitations:</strong>
19+
* </p>
20+
* <ul>
21+
* <li><strong>Single-node commands only:</strong> Only commands that can be routed to a single
22+
* node are supported. Commands requiring execution on multiple nodes (ALL_SHARDS, MULTI_SHARD,
23+
* ALL_NODES, or SPECIAL request policies) will throw {@link UnsupportedOperationException}.</li>
24+
* <li><strong>Examples of unsupported commands:</strong>
25+
* <ul>
26+
* <li>{@code KEYS} - requires execution on all master shards</li>
27+
* <li>{@code MGET} with keys in different slots - requires execution on multiple shards</li>
28+
* <li>{@code SCRIPT LOAD} - requires execution on all nodes</li>
29+
* </ul>
30+
* </li>
31+
* <li>For multi-node commands, use the non-pipelined mode
32+
* of {@link RedisClusterClient} instead.</li>
33+
* </ul>
34+
* <p>
35+
* <strong> Usage Pattern:</strong>
36+
* </p>
37+
* <pre>{@code
38+
* try (RedisCluster cluster = new RedisCluster(nodes, config)) {
39+
* // For single-node commands, use pipelined mode
40+
* try (ClusterPipeline pipeline = cluster.pipelined()) {
41+
* Response<String> r1 = pipeline.set("key1", "value1");
42+
* Response<String> r2 = pipeline.get("key1");
43+
* pipeline.sync();
44+
*
45+
* System.out.println(r1.get()); // "OK"
46+
* System.out.println(r2.get()); // "value1"
47+
* }
48+
*
49+
* // For multi-node commands, use non-pipelined mode
50+
* Set<String> allKeys = cluster.keys("*"); // Executes on all master shards
51+
* List<String> values = cluster.mget("key1", "key2", "key3"); // Cross-slot keys
52+
* }
53+
* }</pre>
54+
*
55+
* @see MultiNodePipelineBase
56+
* @see redis.clients.jedis.RedisClusterClient
57+
*/
1058
public class ClusterPipeline extends MultiNodePipelineBase {
1159

1260
private final ClusterConnectionProvider provider;
@@ -41,6 +89,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects
4189
this.provider = provider;
4290
}
4391

92+
ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects,
93+
CommandFlagsRegistry commandFlagsRegistry) {
94+
super(commandObjects, commandFlagsRegistry);
95+
this.provider = provider;
96+
}
97+
4498
private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
4599
ClusterCommandObjects cco = new ClusterCommandObjects();
46100
if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol);

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class JedisCluster extends UnifiedJedis {
4040
*/
4141
public static final int DEFAULT_MAX_ATTEMPTS = 5;
4242

43+
private final CommandFlagsRegistry commandFlagsRegistry;
44+
4345
/**
4446
* Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster.
4547
* <p>
@@ -291,11 +293,13 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
291293
// Uses a fetched connection to process protocol. Should be avoided if possible.
292294
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
293295
super(provider, maxAttempts, maxTotalRetriesDuration);
296+
this.commandFlagsRegistry = StaticCommandFlagsRegistry.registry();
294297
}
295298

296299
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
297300
RedisProtocol protocol) {
298301
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
302+
this.commandFlagsRegistry = StaticCommandFlagsRegistry.registry();
299303
}
300304

301305
@Experimental
@@ -343,10 +347,14 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
343347
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
344348
RedisProtocol protocol, Cache clientSideCache) {
345349
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
350+
this.commandFlagsRegistry = StaticCommandFlagsRegistry.registry();
346351
}
347352

348-
private JedisCluster(CommandExecutor commandExecutor, ConnectionProvider connectionProvider, CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache) {
353+
private JedisCluster(CommandExecutor commandExecutor, ConnectionProvider connectionProvider,
354+
CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache,
355+
CommandFlagsRegistry commandFlagsRegistry) {
349356
super(commandExecutor, connectionProvider, commandObjects, redisProtocol, cache);
357+
this.commandFlagsRegistry = commandFlagsRegistry;
350358
}
351359

352360
/**
@@ -359,8 +367,8 @@ static public class Builder extends ClusterClientBuilder<JedisCluster> {
359367

360368
@Override
361369
protected JedisCluster createClient() {
362-
return new JedisCluster(commandExecutor, connectionProvider, commandObjects, clientConfig.getRedisProtocol(),
363-
cache);
370+
return new JedisCluster(commandExecutor, connectionProvider, commandObjects,
371+
clientConfig.getRedisProtocol(), cache, getCommandFlags());
364372
}
365373
}
366374

@@ -414,7 +422,8 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha
414422

415423
@Override
416424
public ClusterPipeline pipelined() {
417-
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
425+
return new ClusterPipeline((ClusterConnectionProvider) provider,
426+
(ClusterCommandObjects) commandObjects, commandFlagsRegistry);
418427
}
419428

420429
/**

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.List;
77
import java.util.Map;
88
import java.util.Queue;
9+
import java.util.Set;
910
import java.util.concurrent.CountDownLatch;
1011
import java.util.concurrent.Executor;
1112
import java.util.concurrent.ExecutorService;
@@ -31,9 +32,15 @@ public abstract class MultiNodePipelineBase extends AbstractPipeline {
3132
private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
3233
private final Map<HostAndPort, Connection> connections;
3334
private volatile boolean syncing = false;
35+
protected final CommandFlagsRegistry commandFlagsRegistry;
3436

3537
public MultiNodePipelineBase(CommandObjects commandObjects) {
38+
this(commandObjects, StaticCommandFlagsRegistry.registry());
39+
}
40+
41+
protected MultiNodePipelineBase(CommandObjects commandObjects, CommandFlagsRegistry commandFlagsRegistry) {
3642
super(commandObjects);
43+
this.commandFlagsRegistry = commandFlagsRegistry;
3744
pipelinedResponses = new LinkedHashMap<>();
3845
connections = new LinkedHashMap<>();
3946
}
@@ -44,6 +51,9 @@ public MultiNodePipelineBase(CommandObjects commandObjects) {
4451

4552
@Override
4653
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
54+
// Validate that the command is supported in pipeline mode
55+
validatePipelineCommand(commandObject.getArguments());
56+
4757
HostAndPort nodeKey = getNodeKey(commandObject.getArguments());
4858

4959
Queue<Response<?>> queue;
@@ -140,6 +150,52 @@ public final void sync() {
140150
syncing = false;
141151
}
142152

153+
/**
154+
* Validates that a command can be executed in a multi-node pipeline.
155+
* <p>
156+
* Commands with multi-node request policies (ALL_SHARDS, MULTI_SHARD, ALL_NODES, SPECIAL)
157+
* are rejected UNLESS they have keys that route to a single slot, in which case they can
158+
* be executed on that single node.
159+
* </p>
160+
*
161+
* @param args the command arguments
162+
* @throws UnsupportedOperationException if the command requires multi-node execution
163+
*/
164+
private void validatePipelineCommand(CommandArguments args) {
165+
CommandFlagsRegistry.RequestPolicy policy =
166+
commandFlagsRegistry.getRequestPolicy(args);
167+
168+
// For multi-node policies, check if the command can be routed to a single slot
169+
switch (policy) {
170+
case ALL_SHARDS:
171+
case MULTI_SHARD:
172+
case ALL_NODES:
173+
case SPECIAL:
174+
// If the command has keys that route to a single slot, allow it
175+
Set<Integer> slots = args.getKeyHashSlots();
176+
if (slots.size() == 1) {
177+
// Command can be routed to a single slot - allow it
178+
return;
179+
}
180+
181+
// Command cannot be routed to a single slot - reject it
182+
String policyName = policy.name();
183+
throw new UnsupportedOperationException(
184+
"Command '" + args.getCommand() + "' with " + policyName + " request policy "
185+
+ "cannot be executed in pipeline mode because it cannot be routed to a single slot. "
186+
+ (slots.isEmpty()
187+
? "This command has no keys to determine routing. "
188+
: "This command's keys map to multiple slots (" + slots.size() + " slots). ")
189+
+ "Use non-pipeline cluster client for this command.");
190+
191+
case DEFAULT:
192+
default:
193+
// DEFAULT policy and unknown policies - allow standard command execution
194+
// Routes to single node based on key hash
195+
break;
196+
}
197+
}
198+
143199
@Deprecated
144200
public Response<Long> waitReplicas(int replicas, long timeout) {
145201
return appendCommand(commandObjects.waitReplicas(replicas, timeout));

src/main/java/redis/clients/jedis/RedisClusterClient.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,13 @@ public class RedisClusterClient extends UnifiedJedis {
5757
*/
5858
public static final int DEFAULT_MAX_ATTEMPTS = 5;
5959

60+
private final CommandFlagsRegistry commandFlagsRegistry;
61+
6062
private RedisClusterClient(CommandExecutor commandExecutor, ConnectionProvider connectionProvider,
61-
CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache) {
63+
CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache,
64+
CommandFlagsRegistry commandFlagsRegistry) {
6265
super(commandExecutor, connectionProvider, commandObjects, redisProtocol, cache);
66+
this.commandFlagsRegistry = commandFlagsRegistry;
6367
}
6468

6569
/**
@@ -133,7 +137,7 @@ public static class Builder extends ClusterClientBuilder<RedisClusterClient> {
133137
@Override
134138
protected RedisClusterClient createClient() {
135139
return new RedisClusterClient(commandExecutor, connectionProvider, commandObjects,
136-
clientConfig.getRedisProtocol(), cache);
140+
clientConfig.getRedisProtocol(), cache, getCommandFlags());
137141
}
138142
}
139143

@@ -189,7 +193,7 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha
189193
@Override
190194
public ClusterPipeline pipelined() {
191195
return new ClusterPipeline((ClusterConnectionProvider) provider,
192-
(ClusterCommandObjects) commandObjects);
196+
(ClusterCommandObjects) commandObjects, commandFlagsRegistry);
193197
}
194198

195199
/**

src/main/java/redis/clients/jedis/builders/ClusterClientBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,17 @@ public ClusterClientBuilder<C> commandFlags(CommandFlagsRegistry commandFlags) {
8888
return this;
8989
}
9090

91+
/**
92+
* Gets the command flags registry, initializing it if necessary.
93+
* @return the command flags registry
94+
*/
95+
protected CommandFlagsRegistry getCommandFlags() {
96+
if (this.commandFlags == null) {
97+
this.commandFlags = createDefaultCommandFlagsRegistry();
98+
}
99+
return this.commandFlags;
100+
}
101+
91102
@Override
92103
protected ClusterClientBuilder<C> self() {
93104
return this;

src/test/java/redis/clients/jedis/ClusterPipeliningTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,4 +1171,87 @@ private static void assertThreadsCount() {
11711171
.count();
11721172
MatcherAssert.assertThat(count, Matchers.lessThanOrEqualTo(20));
11731173
}
1174+
1175+
@Test
1176+
public void testAllShardsCommandRejected() {
1177+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1178+
// KEYS has ALL_SHARDS policy with pattern argument (not a key), should be rejected
1179+
UnsupportedOperationException ex = assertThrows(
1180+
UnsupportedOperationException.class,
1181+
() -> pipeline.keys("*"));
1182+
1183+
assertTrue(ex.getMessage().contains("ALL_SHARDS"),
1184+
"Error message should mention ALL_SHARDS policy");
1185+
assertTrue(ex.getMessage().contains("KEYS"),
1186+
"Error message should mention the command name");
1187+
assertTrue(ex.getMessage().contains("no keys"),
1188+
"Error message should mention that command has no keys");
1189+
}
1190+
}
1191+
1192+
@Test
1193+
public void testAllShardsPolicyWithSampleKeyAllowed() {
1194+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1195+
// SCRIPT EXISTS has ALL_SHARDS policy but with sampleKey it routes to single slot, should work
1196+
String dummySha1 = "0000000000000000000000000000000000000000";
1197+
Response<List<Boolean>> existsResponse = pipeline.scriptExists("samplekey", dummySha1);
1198+
pipeline.sync();
1199+
1200+
// The response should be valid (list with one boolean indicating if script exists)
1201+
assertNotNull(existsResponse.get(), "SCRIPT EXISTS with sampleKey should be allowed in pipeline");
1202+
assertEquals(1, existsResponse.get().size(), "SCRIPT EXISTS should return list with one element");
1203+
assertFalse(existsResponse.get().get(0), "Dummy script should not exist");
1204+
}
1205+
}
1206+
1207+
@Test
1208+
public void testMultiShardCommandRejected() {
1209+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1210+
// MGET with keys in different slots should be rejected
1211+
UnsupportedOperationException ex = assertThrows(
1212+
UnsupportedOperationException.class,
1213+
() -> pipeline.mget("key1", "key2", "key3"));
1214+
1215+
assertTrue(ex.getMessage().contains("MULTI_SHARD"),
1216+
"Error message should mention MULTI_SHARD policy");
1217+
}
1218+
}
1219+
1220+
@Test
1221+
public void testDefaultPolicyCommandAllowed() {
1222+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1223+
// SET with single key - DEFAULT policy, should work
1224+
Response<String> setResponse = pipeline.set("testkey", "testvalue");
1225+
Response<String> getResponse = pipeline.get("testkey");
1226+
pipeline.sync();
1227+
1228+
assertEquals("OK", setResponse.get());
1229+
assertEquals("testvalue", getResponse.get());
1230+
}
1231+
}
1232+
1233+
@Test
1234+
public void testMultiShardPolicyWithSingleKeyAllowed() {
1235+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1236+
// EXISTS with single key has MULTI_SHARD policy but routes to single slot, should work
1237+
pipeline.set("existskey", "value");
1238+
Response<Boolean> existsResponse = pipeline.exists("existskey");
1239+
pipeline.sync();
1240+
1241+
assertTrue(existsResponse.get(), "EXISTS with single key should be allowed in pipeline");
1242+
}
1243+
}
1244+
1245+
@Test
1246+
public void testMultiShardPolicyWithMultipleKeysRejected() {
1247+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1248+
// EXISTS with multiple keys in different slots has MULTI_SHARD policy, should be rejected
1249+
UnsupportedOperationException ex = assertThrows(
1250+
UnsupportedOperationException.class,
1251+
() -> pipeline.exists("key1", "key2", "key3"));
1252+
1253+
assertTrue(ex.getMessage().contains("MULTI_SHARD") || ex.getMessage().contains("multiple slots"),
1254+
"Error message should mention MULTI_SHARD policy or multiple slots");
1255+
}
1256+
}
11741257
}

0 commit comments

Comments
 (0)