66import java .util .List ;
77import java .util .Map ;
88import java .util .Queue ;
9+ import java .util .Set ;
910import java .util .concurrent .CountDownLatch ;
1011import java .util .concurrent .Executor ;
1112import java .util .concurrent .ExecutorService ;
@@ -151,8 +152,11 @@ public final void sync() {
151152
152153 /**
153154 * Validates that a command can be executed in a multi-node pipeline.
154- * Commands with ALL_SHARDS, MULTI_SHARD, ALL_NODES, or SPECIAL request policies
155- * require execution on multiple nodes and cannot be properly handled in pipelines.
155+ * <p>
156+ * Commands with multi-node request policies (ALL_SHARDS, MULTI_SHARD, ALL_NODES, SPECIAL)
157+ * are rejected UNLESS they have keys that route to a single slot, in which case they can
158+ * be executed on that single node.
159+ * </p>
156160 *
157161 * @param args the command arguments
158162 * @throws UnsupportedOperationException if the command requires multi-node execution
@@ -161,33 +165,27 @@ private void validatePipelineCommand(CommandArguments args) {
161165 CommandFlagsRegistry .RequestPolicy policy =
162166 commandFlagsRegistry .getRequestPolicy (args );
163167
168+ // For multi-node policies, check if the command can be routed to a single slot
164169 switch (policy ) {
165170 case ALL_SHARDS :
166- throw new UnsupportedOperationException (
167- "Command '" + args .getCommand () + "' with ALL_SHARDS request policy "
168- + "cannot be executed in pipeline mode. This command requires execution on all "
169- + "master shards but pipelines route to a single node. "
170- + "Use non-pipeline cluster client for this command." );
171-
172171 case MULTI_SHARD :
173- throw new UnsupportedOperationException (
174- "Command '" + args .getCommand () + "' with MULTI_SHARD request policy "
175- + "cannot be executed in pipeline mode. This command requires execution on "
176- + "multiple shards but pipelines route to a single node. "
177- + "Use non-pipeline cluster client for this command." );
178-
179172 case ALL_NODES :
180- throw new UnsupportedOperationException (
181- "Command '" + args .getCommand () + "' with ALL_NODES request policy "
182- + "cannot be executed in pipeline mode. This command requires execution on all "
183- + "nodes (masters and replicas) but pipelines route to a single node. "
184- + "Use non-pipeline cluster client for this command." );
185-
186173 case SPECIAL :
174+ // If the command has keys that route to a single slot, allow it
175+ Set <Integer > slots = args .getKeyHashSlots ();
176+ if (slots .size () == 1 ) {
177+ // Command can be routed to a single slot - allow it
178+ return ;
179+ }
180+
181+ // Command cannot be routed to a single slot - reject it
182+ String policyName = policy .name ();
187183 throw new UnsupportedOperationException (
188- "Command '" + args .getCommand () + "' with SPECIAL request policy "
189- + "cannot be executed in pipeline mode. This command has non-trivial routing "
190- + "requirements that cannot be handled in pipelines. "
184+ "Command '" + args .getCommand () + "' with " + policyName + " request policy "
185+ + "cannot be executed in pipeline mode because it cannot be routed to a single slot. "
186+ + (slots .isEmpty ()
187+ ? "This command has no keys to determine routing. "
188+ : "This command's keys map to multiple slots (" + slots .size () + " slots). " )
191189 + "Use non-pipeline cluster client for this command." );
192190
193191 case DEFAULT :
0 commit comments