Skip to content

Commit d9e4b85

Browse files
committed
If the command has keys that route to a single slot, allow it
1 parent 2cba7ef commit d9e4b85

2 files changed

Lines changed: 64 additions & 23 deletions

File tree

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.List;
77
import java.util.Map;
88
import java.util.Queue;
9+
import java.util.Set;
910
import java.util.concurrent.CountDownLatch;
1011
import java.util.concurrent.Executor;
1112
import java.util.concurrent.ExecutorService;
@@ -151,8 +152,11 @@ public final void sync() {
151152

152153
/**
153154
* Validates that a command can be executed in a multi-node pipeline.
154-
* Commands with ALL_SHARDS, MULTI_SHARD, ALL_NODES, or SPECIAL request policies
155-
* require execution on multiple nodes and cannot be properly handled in pipelines.
155+
* <p>
156+
* Commands with multi-node request policies (ALL_SHARDS, MULTI_SHARD, ALL_NODES, SPECIAL)
157+
* are rejected UNLESS they have keys that route to a single slot, in which case they can
158+
* be executed on that single node.
159+
* </p>
156160
*
157161
* @param args the command arguments
158162
* @throws UnsupportedOperationException if the command requires multi-node execution
@@ -161,33 +165,27 @@ private void validatePipelineCommand(CommandArguments args) {
161165
CommandFlagsRegistry.RequestPolicy policy =
162166
commandFlagsRegistry.getRequestPolicy(args);
163167

168+
// For multi-node policies, check if the command can be routed to a single slot
164169
switch (policy) {
165170
case ALL_SHARDS:
166-
throw new UnsupportedOperationException(
167-
"Command '" + args.getCommand() + "' with ALL_SHARDS request policy "
168-
+ "cannot be executed in pipeline mode. This command requires execution on all "
169-
+ "master shards but pipelines route to a single node. "
170-
+ "Use non-pipeline cluster client for this command.");
171-
172171
case MULTI_SHARD:
173-
throw new UnsupportedOperationException(
174-
"Command '" + args.getCommand() + "' with MULTI_SHARD request policy "
175-
+ "cannot be executed in pipeline mode. This command requires execution on "
176-
+ "multiple shards but pipelines route to a single node. "
177-
+ "Use non-pipeline cluster client for this command.");
178-
179172
case ALL_NODES:
180-
throw new UnsupportedOperationException(
181-
"Command '" + args.getCommand() + "' with ALL_NODES request policy "
182-
+ "cannot be executed in pipeline mode. This command requires execution on all "
183-
+ "nodes (masters and replicas) but pipelines route to a single node. "
184-
+ "Use non-pipeline cluster client for this command.");
185-
186173
case SPECIAL:
174+
// If the command has keys that route to a single slot, allow it
175+
Set<Integer> slots = args.getKeyHashSlots();
176+
if (slots.size() == 1) {
177+
// Command can be routed to a single slot - allow it
178+
return;
179+
}
180+
181+
// Command cannot be routed to a single slot - reject it
182+
String policyName = policy.name();
187183
throw new UnsupportedOperationException(
188-
"Command '" + args.getCommand() + "' with SPECIAL request policy "
189-
+ "cannot be executed in pipeline mode. This command has non-trivial routing "
190-
+ "requirements that cannot be handled in pipelines. "
184+
"Command '" + args.getCommand() + "' with " + policyName + " request policy "
185+
+ "cannot be executed in pipeline mode because it cannot be routed to a single slot. "
186+
+ (slots.isEmpty()
187+
? "This command has no keys to determine routing. "
188+
: "This command's keys map to multiple slots (" + slots.size() + " slots). ")
191189
+ "Use non-pipeline cluster client for this command.");
192190

193191
case DEFAULT:

src/test/java/redis/clients/jedis/ClusterPipeliningTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,6 +1175,7 @@ private static void assertThreadsCount() {
11751175
@Test
11761176
public void testAllShardsCommandRejected() {
11771177
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1178+
// KEYS has ALL_SHARDS policy with pattern argument (not a key), should be rejected
11781179
UnsupportedOperationException ex = assertThrows(
11791180
UnsupportedOperationException.class,
11801181
() -> pipeline.keys("*"));
@@ -1183,6 +1184,23 @@ public void testAllShardsCommandRejected() {
11831184
"Error message should mention ALL_SHARDS policy");
11841185
assertTrue(ex.getMessage().contains("KEYS"),
11851186
"Error message should mention the command name");
1187+
assertTrue(ex.getMessage().contains("no keys"),
1188+
"Error message should mention that command has no keys");
1189+
}
1190+
}
1191+
1192+
@Test
1193+
public void testAllShardsPolicyWithSampleKeyAllowed() {
1194+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1195+
// SCRIPT EXISTS has ALL_SHARDS policy but with sampleKey it routes to single slot, should work
1196+
String dummySha1 = "0000000000000000000000000000000000000000";
1197+
Response<List<Boolean>> existsResponse = pipeline.scriptExists("samplekey", dummySha1);
1198+
pipeline.sync();
1199+
1200+
// The response should be valid (list with one boolean indicating if script exists)
1201+
assertNotNull(existsResponse.get(), "SCRIPT EXISTS with sampleKey should be allowed in pipeline");
1202+
assertEquals(1, existsResponse.get().size(), "SCRIPT EXISTS should return list with one element");
1203+
assertFalse(existsResponse.get().get(0), "Dummy script should not exist");
11861204
}
11871205
}
11881206

@@ -1211,4 +1229,29 @@ public void testDefaultPolicyCommandAllowed() {
12111229
assertEquals("testvalue", getResponse.get());
12121230
}
12131231
}
1232+
1233+
@Test
1234+
public void testMultiShardPolicyWithSingleKeyAllowed() {
1235+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1236+
// EXISTS with single key has MULTI_SHARD policy but routes to single slot, should work
1237+
pipeline.set("existskey", "value");
1238+
Response<Boolean> existsResponse = pipeline.exists("existskey");
1239+
pipeline.sync();
1240+
1241+
assertTrue(existsResponse.get(), "EXISTS with single key should be allowed in pipeline");
1242+
}
1243+
}
1244+
1245+
@Test
1246+
public void testMultiShardPolicyWithMultipleKeysRejected() {
1247+
try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
1248+
// EXISTS with multiple keys in different slots has MULTI_SHARD policy, should be rejected
1249+
UnsupportedOperationException ex = assertThrows(
1250+
UnsupportedOperationException.class,
1251+
() -> pipeline.exists("key1", "key2", "key3"));
1252+
1253+
assertTrue(ex.getMessage().contains("MULTI_SHARD") || ex.getMessage().contains("multiple slots"),
1254+
"Error message should mention MULTI_SHARD policy or multiple slots");
1255+
}
1256+
}
12141257
}

0 commit comments

Comments
 (0)