Skip to content

Commit 4551bda

Browse files
authored
Merge e9efd7d into 4a8b9e7
2 parents 4a8b9e7 + e9efd7d commit 4551bda

File tree

1 file changed

+35
-11
lines changed

1 file changed

+35
-11
lines changed

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
import java.util.Map;
99
import java.util.Queue;
1010
import java.util.Set;
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
1115
import org.json.JSONArray;
1216
import org.slf4j.Logger;
1317
import org.slf4j.LoggerFactory;
@@ -40,13 +44,22 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin
4044

4145
private final Logger log = LoggerFactory.getLogger(getClass());
4246

47+
/**
48+
* default number of processes for sync, if you got enough cores for client
49+
* or your cluster nodes more than 3 nodes, you may increase this workers number.
50+
* suggest <= cluster nodes
51+
*/
52+
public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;
53+
4354
private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
4455
private final Map<HostAndPort, Connection> connections;
4556
private volatile boolean syncing = false;
4657

4758
private final CommandObjects commandObjects;
4859
private GraphCommandObjects graphCommandObjects;
4960

61+
private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
62+
5063
public MultiNodePipelineBase(CommandObjects commandObjects) {
5164
pipelinedResponses = new LinkedHashMap<>();
5265
connections = new LinkedHashMap<>();
@@ -106,25 +119,36 @@ public final void sync() {
106119
}
107120
syncing = true;
108121

122+
CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
109123
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
110124
= pipelinedResponses.entrySet().iterator();
111125
while (pipelinedResponsesIterator.hasNext()) {
112126
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
113127
HostAndPort nodeKey = entry.getKey();
114128
Queue<Response<?>> queue = entry.getValue();
115129
Connection connection = connections.get(nodeKey);
116-
try {
117-
List<Object> unformatted = connection.getMany(queue.size());
118-
for (Object o : unformatted) {
119-
queue.poll().set(o);
130+
executorService.submit(() -> {
131+
try {
132+
List<Object> unformatted = connection.getMany(queue.size());
133+
for (Object o : unformatted) {
134+
queue.poll().set(o);
135+
}
136+
} catch (JedisConnectionException jce) {
137+
log.error("Error with connection to " + nodeKey, jce);
138+
// cleanup the connection
139+
pipelinedResponsesIterator.remove();
140+
connections.remove(nodeKey);
141+
IOUtils.closeQuietly(connection);
142+
} finally {
143+
countDownLatch.countDown();
120144
}
121-
} catch (JedisConnectionException jce) {
122-
log.error("Error with connection to " + nodeKey, jce);
123-
// cleanup the connection
124-
pipelinedResponsesIterator.remove();
125-
connections.remove(nodeKey);
126-
IOUtils.closeQuietly(connection);
127-
}
145+
});
146+
}
147+
148+
try {
149+
countDownLatch.await();
150+
} catch (InterruptedException e) {
151+
log.error("Thread is interrupted during sync.", e);
128152
}
129153

130154
syncing = false;

0 commit comments

Comments
 (0)