Skip to content

Commit d1a7ced

Browse files
authored
Merge branch 'master' into updateDoc
2 parents da269bb + 8ba734c commit d1a7ced

80 files changed

Lines changed: 7903 additions & 1055 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/integration.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ on:
1111
branches:
1212
- master
1313
- '[0-9].*'
14+
- 'topic/**'
1415
pull_request:
1516
branches:
1617
- master

.github/workflows/test-on-docker.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ on:
1313
- master
1414
- '[0-9].*'
1515
- 'feature/**'
16+
- 'topic/**'
1617
pull_request:
1718
branches:
1819
- master

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,8 @@
568568
<include>**/resps/LibraryInfoTest.java</include>
569569
<include>**/*Matchers.java</include>
570570
<include>**/*TestUtil.java</include>
571+
<include>**/executors/aggregators/*.java</include>
572+
<include>**/*MapMatcher.java</include>
571573
</includes>
572574
</configuration>
573575
<executions>

src/main/java/redis/clients/jedis/ClusterCommandArguments.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

src/main/java/redis/clients/jedis/ClusterCommandObjects.java

Lines changed: 545 additions & 43 deletions
Large diffs are not rendered by default.

src/main/java/redis/clients/jedis/ClusterPipeline.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,58 @@
33
import java.time.Duration;
44
import java.util.Set;
55
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
6+
import redis.clients.jedis.exceptions.JedisClusterOperationException;
67
import redis.clients.jedis.providers.ClusterConnectionProvider;
78
import redis.clients.jedis.util.IOUtils;
89

10+
/**
11+
* Pipeline implementation for Redis Cluster mode.
12+
* <p>
13+
* ClusterPipeline allows batching multiple commands for efficient execution in a Redis Cluster
14+
* environment. Commands are automatically routed to the appropriate cluster nodes based on
15+
* key hash slots.
16+
* </p>
17+
* <p>
18+
* <strong>Important Limitations:</strong>
19+
* </p>
20+
* <ul>
21+
* <li><strong>Single-node commands only:</strong> Only commands that can be routed to a single
22+
* node are supported. Commands requiring execution on multiple nodes (ALL_SHARDS, MULTI_SHARD,
23+
* ALL_NODES, or SPECIAL request policies) will throw {@link UnsupportedOperationException}.</li>
24+
* <li><strong>Examples of unsupported commands:</strong>
25+
* <ul>
26+
* <li>{@code KEYS} - requires execution on all master shards</li>
27+
* <li>{@code MGET} with keys in different slots - requires execution on multiple shards</li>
28+
* <li>{@code SCRIPT LOAD} - requires execution on all nodes</li>
29+
* </ul>
30+
* </li>
31+
* <li>For multi-node commands, use the non-pipelined mode
32+
* of {@link RedisClusterClient} instead.</li>
33+
* </ul>
34+
* <p>
35+
* <strong> Usage Pattern:</strong>
36+
* </p>
37+
* <pre>{@code
38+
* try (RedisCluster cluster = new RedisCluster(nodes, config)) {
39+
* // For single-node commands, use pipelined mode
40+
* try (ClusterPipeline pipeline = cluster.pipelined()) {
41+
* Response<String> r1 = pipeline.set("key1", "value1");
42+
* Response<String> r2 = pipeline.get("key1");
43+
* pipeline.sync();
44+
*
45+
* System.out.println(r1.get()); // "OK"
46+
* System.out.println(r2.get()); // "value1"
47+
* }
48+
*
49+
* // For multi-node commands, use non-pipelined mode
50+
* Set<String> allKeys = cluster.keys("*"); // Executes on all master shards
51+
* List<String> values = cluster.mget("key1", "key2", "key3"); // Cross-slot keys
52+
* }
53+
* }</pre>
54+
*
55+
* @see MultiNodePipelineBase
56+
* @see redis.clients.jedis.RedisClusterClient
57+
*/
958
public class ClusterPipeline extends MultiNodePipelineBase {
1059

1160
private final ClusterConnectionProvider provider;
@@ -40,6 +89,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects
4089
this.provider = provider;
4190
}
4291

92+
ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects,
93+
CommandFlagsRegistry commandFlagsRegistry) {
94+
super(commandObjects, commandFlagsRegistry);
95+
this.provider = provider;
96+
}
97+
4398
private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
4499
ClusterCommandObjects cco = new ClusterCommandObjects();
45100
if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol);
@@ -57,7 +112,17 @@ public void close() {
57112

58113
@Override
59114
protected HostAndPort getNodeKey(CommandArguments args) {
60-
return provider.getNode(((ClusterCommandArguments) args).getCommandHashSlot());
115+
Set<Integer> slots = args.getKeyHashSlots();
116+
117+
if (slots.size() > 1) {
118+
throw new JedisClusterOperationException("Cannot get NodeKey for command with multiple hash slots");
119+
}
120+
121+
if (slots.isEmpty()) {
122+
return null; // Let getConnection(null) handle it by using a random node
123+
}
124+
125+
return provider.getNode(slots.iterator().next());
61126
}
62127

63128
@Override

src/main/java/redis/clients/jedis/CommandArguments.java

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
package redis.clients.jedis;
22

3-
import java.util.ArrayList;
4-
import java.util.Arrays;
5-
import java.util.Collection;
6-
import java.util.Collections;
7-
import java.util.Iterator;
8-
import java.util.List;
3+
import java.util.*;
94

105
import redis.clients.jedis.annots.Experimental;
116
import redis.clients.jedis.annots.Internal;
@@ -14,13 +9,30 @@
149
import redis.clients.jedis.commands.ProtocolCommand;
1510
import redis.clients.jedis.params.IParams;
1611
import redis.clients.jedis.search.RediSearchUtil;
12+
import redis.clients.jedis.util.JedisClusterCRC16;
1713

1814
public class CommandArguments implements Iterable<Rawable> {
1915

16+
/**
17+
* Default initial capacity for the keys list. Most Redis commands have 1-3 keys,
18+
* so a small initial capacity avoids reallocations for common cases.
19+
*/
20+
private static final int DEFAULT_KEYS_CAPACITY = 4;
21+
2022
private CommandKeyArgumentPreProcessor keyPreProc = null;
2123
private final ArrayList<Rawable> args;
2224

23-
private List<Object> keys;
25+
/**
26+
* Pre-allocated list for storing keys. Using ArrayList directly avoids the
27+
* memory reallocation overhead of transitioning from emptyList -> singletonList -> ArrayList.
28+
*/
29+
private final ArrayList<Object> keys;
30+
31+
/**
32+
* Cached hash slots computed from keys. Null indicates the cache is invalid
33+
* and needs to be recomputed. The cache is invalidated when keys are added.
34+
*/
35+
private Set<Integer> cachedHashSlots;
2436

2537
private boolean blocking;
2638

@@ -32,7 +44,8 @@ public CommandArguments(ProtocolCommand command) {
3244
args = new ArrayList<>();
3345
args.add(command);
3446

35-
keys = Collections.emptyList();
47+
keys = new ArrayList<>(DEFAULT_KEYS_CAPACITY);
48+
cachedHashSlots = null;
3649
}
3750

3851
public ProtocolCommand getCommand() {
@@ -120,36 +133,36 @@ public CommandArguments key(Object key) {
120133

121134
if (key instanceof Rawable) {
122135
Rawable raw = (Rawable) key;
123-
processKey(raw.getRaw());
124136
args.add(raw);
137+
// Extract raw bytes for hash slot computation to avoid ClassCastException in getKeyHashSlots()
138+
addHashSlotKey(raw.getRaw());
125139
} else if (key instanceof byte[]) {
126140
byte[] raw = (byte[]) key;
127-
processKey(raw);
128141
args.add(RawableFactory.from(raw));
142+
addHashSlotKey(raw);
129143
} else if (key instanceof String) {
130144
String raw = (String) key;
131-
processKey(raw);
132145
args.add(RawableFactory.from(raw));
146+
addHashSlotKey(raw);
133147
} else {
134148
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
135149
}
136150

137-
addKeyInKeys(key);
151+
return this;
152+
}
138153

154+
final CommandArguments addHashSlotKey(String key) {
155+
keys.add(key);
156+
// Invalidate cached hash slots since keys have changed
157+
cachedHashSlots = null;
139158
return this;
140159
}
141160

142-
private void addKeyInKeys(Object key) {
143-
if (keys.isEmpty()) {
144-
keys = Collections.singletonList(key);
145-
} else if (keys.size() == 1) {
146-
List oldKeys = keys;
147-
keys = new ArrayList();
148-
keys.addAll(oldKeys);
149-
keys.add(key);
150-
} else {
151-
keys.add(key);
152-
}
161+
final CommandArguments addHashSlotKey(byte[] key) {
162+
keys.add(key);
163+
// Invalidate cached hash slots since keys have changed
164+
cachedHashSlots = null;
165+
return this;
153166
}
154167

155168
public final CommandArguments keys(Object... keys) {
@@ -167,26 +180,16 @@ public final CommandArguments addParams(IParams params) {
167180
return this;
168181
}
169182

170-
protected CommandArguments processKey(byte[] key) {
171-
// do nothing
172-
return this;
173-
}
174-
175-
protected final CommandArguments processKeys(byte[]... keys) {
183+
protected final CommandArguments addHashSlotKeys(byte[]... keys) {
176184
for (byte[] key : keys) {
177-
processKey(key);
185+
addHashSlotKey(key);
178186
}
179187
return this;
180188
}
181189

182-
protected CommandArguments processKey(String key) {
183-
// do nothing
184-
return this;
185-
}
186-
187-
protected final CommandArguments processKeys(String... keys) {
190+
protected final CommandArguments addHashSlotKeys(String... keys) {
188191
for (String key : keys) {
189-
processKey(key);
192+
addHashSlotKey(key);
190193
}
191194
return this;
192195
}
@@ -210,9 +213,57 @@ public Iterator<Rawable> iterator() {
210213
return args.iterator();
211214
}
212215

216+
/**
217+
* Returns the keys used in this command.
218+
* <p>
219+
* <b>Internal API:</b> This method is internal and should not be used by external code.
220+
* It is exposed for internal use by caching ({@link redis.clients.jedis.csc.CacheKey#getRedisKeys()})
221+
* and cluster operations.
222+
* <p>
223+
* <b>Supported types:</b> Keys are stored as either {@link String} or {@code byte[]} depending on
224+
* how they were added via {@link #key(Object)} or {@link #addHashSlotKey(String)}/{@link #addHashSlotKey(byte[])}.
225+
* Only {@link String} and {@code byte[]} are guaranteed to be supported by downstream consumers.
226+
* <p>
227+
* <b>Type safety:</b> Consumers must handle both {@link String} and {@code byte[]} types.
228+
* Passing other types may cause {@link IllegalArgumentException} when used with caching
229+
* (see {@link redis.clients.jedis.csc.AbstractCache#makeKeyForRedisKeysToCacheKeys(Object)})
230+
* or cluster operations.
231+
* <p>
232+
* The returned list is unmodifiable to prevent external modification of the internal key tracking.
233+
*
234+
* @return unmodifiable list of keys ({@link String} or {@code byte[]})
235+
*/
213236
@Internal
214237
public List<Object> getKeys() {
215-
return keys;
238+
return Collections.unmodifiableList(keys);
239+
}
240+
241+
@Internal
242+
public Set<Integer> getKeyHashSlots() {
243+
// Return cached slots if available (cache is invalidated when keys are added)
244+
if (cachedHashSlots != null) {
245+
return cachedHashSlots;
246+
}
247+
248+
// Compute hash slots and cache the result
249+
Set<Integer> slots = new HashSet<>();
250+
for (Object key : keys) {
251+
if (key instanceof byte[]) {
252+
slots.add(JedisClusterCRC16.getSlot((byte[]) key));
253+
} else {
254+
slots.add(JedisClusterCRC16.getSlot((String) key));
255+
}
256+
}
257+
// Cache as unmodifiable set to prevent external modification
258+
cachedHashSlots = Collections.unmodifiableSet(slots);
259+
return cachedHashSlots;
260+
}
261+
262+
/**
263+
* @return true if this command has no keys, false otherwise
264+
*/
265+
public boolean isKeyless() {
266+
return keys.isEmpty();
216267
}
217268

218269
public boolean isBlocking() {

0 commit comments

Comments
 (0)