|
8 | 8 | import java.util.Map; |
9 | 9 | import java.util.Queue; |
10 | 10 | import java.util.Set; |
| 11 | +import java.util.concurrent.CountDownLatch; |
| 12 | +import java.util.concurrent.ExecutorService; |
| 13 | +import java.util.concurrent.Executors; |
| 14 | + |
11 | 15 | import org.json.JSONArray; |
12 | 16 | import org.slf4j.Logger; |
13 | 17 | import org.slf4j.LoggerFactory; |
@@ -40,13 +44,22 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin |
40 | 44 |
|
41 | 45 | private final Logger log = LoggerFactory.getLogger(getClass()); |
42 | 46 |
|
| 47 | + /** |
| 48 | + * default number of processes for sync, if you got enough cores for client |
| 49 | + * or your cluster nodes more than 3 nodes, you may increase this workers number. |
| 50 | + * suggest <= cluster nodes |
| 51 | + */ |
| 52 | + public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3; |
| 53 | + |
43 | 54 | private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses; |
44 | 55 | private final Map<HostAndPort, Connection> connections; |
45 | 56 | private volatile boolean syncing = false; |
46 | 57 |
|
47 | 58 | private final CommandObjects commandObjects; |
48 | 59 | private GraphCommandObjects graphCommandObjects; |
49 | 60 |
|
| 61 | + private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); |
| 62 | + |
50 | 63 | public MultiNodePipelineBase(CommandObjects commandObjects) { |
51 | 64 | pipelinedResponses = new LinkedHashMap<>(); |
52 | 65 | connections = new LinkedHashMap<>(); |
@@ -106,25 +119,36 @@ public final void sync() { |
106 | 119 | } |
107 | 120 | syncing = true; |
108 | 121 |
|
| 122 | + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); |
109 | 123 | Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator |
110 | 124 | = pipelinedResponses.entrySet().iterator(); |
111 | 125 | while (pipelinedResponsesIterator.hasNext()) { |
112 | 126 | Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next(); |
113 | 127 | HostAndPort nodeKey = entry.getKey(); |
114 | 128 | Queue<Response<?>> queue = entry.getValue(); |
115 | 129 | Connection connection = connections.get(nodeKey); |
116 | | - try { |
117 | | - List<Object> unformatted = connection.getMany(queue.size()); |
118 | | - for (Object o : unformatted) { |
119 | | - queue.poll().set(o); |
| 130 | + executorService.submit(() -> { |
| 131 | + try { |
| 132 | + List<Object> unformatted = connection.getMany(queue.size()); |
| 133 | + for (Object o : unformatted) { |
| 134 | + queue.poll().set(o); |
| 135 | + } |
| 136 | + } catch (JedisConnectionException jce) { |
| 137 | + log.error("Error with connection to " + nodeKey, jce); |
| 138 | + // cleanup the connection |
| 139 | + pipelinedResponsesIterator.remove(); |
| 140 | + connections.remove(nodeKey); |
| 141 | + IOUtils.closeQuietly(connection); |
| 142 | + } finally { |
| 143 | + countDownLatch.countDown(); |
120 | 144 | } |
121 | | - } catch (JedisConnectionException jce) { |
122 | | - log.error("Error with connection to " + nodeKey, jce); |
123 | | - // cleanup the connection |
124 | | - pipelinedResponsesIterator.remove(); |
125 | | - connections.remove(nodeKey); |
126 | | - IOUtils.closeQuietly(connection); |
127 | | - } |
| 145 | + }); |
| 146 | + } |
| 147 | + |
| 148 | + try { |
| 149 | + countDownLatch.await(); |
| 150 | + } catch (InterruptedException e) { |
| 151 | + log.error("Thread is interrupted during sync.", e); |
128 | 152 | } |
129 | 153 |
|
130 | 154 | syncing = false; |
|
0 commit comments