Skip to content

Commit 2e18da2

Browse files
authored
Merge branch 'master' into patch-3
2 parents 832963d + f9c69db commit 2e18da2

File tree

3 files changed

+143
-4
lines changed

3 files changed

+143
-4
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Apollo 1.9.0
6060
* [Fix possiable NPE](https://github.com/ctripcorp/apollo/pull/3832)
6161
* [Reduce bootstrap time in the situation with large properties](https://github.com/ctripcorp/apollo/pull/3816)
6262
* [docs: English catalog in sidebar](https://github.com/ctripcorp/apollo/pull/3831)
63+
* [fix the issue that release messages might be missed in certain scenarios](https://github.com/ctripcorp/apollo/pull/3819)
6364
------------------
6465
All issues and pull requests are [here](https://github.com/ctripcorp/apollo/milestone/6?closed=1)
6566

apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/ReleaseMessageScanner.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
*/
1717
package com.ctrip.framework.apollo.biz.message;
1818

19+
import com.google.common.collect.Maps;
20+
import java.util.Iterator;
1921
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Map.Entry;
24+
import java.util.Set;
2025
import java.util.concurrent.Executors;
2126
import java.util.concurrent.ScheduledExecutorService;
2227
import java.util.concurrent.TimeUnit;
@@ -40,19 +45,22 @@
4045
*/
4146
public class ReleaseMessageScanner implements InitializingBean {
4247
private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
48+
private static final int missingReleaseMessageMaxAge = 10; // hardcoded to 10, could be configured via BizConfig if necessary
4349
@Autowired
4450
private BizConfig bizConfig;
4551
@Autowired
4652
private ReleaseMessageRepository releaseMessageRepository;
4753
private int databaseScanInterval;
48-
private List<ReleaseMessageListener> listeners;
49-
private ScheduledExecutorService executorService;
54+
private final List<ReleaseMessageListener> listeners;
55+
private final ScheduledExecutorService executorService;
56+
private final Map<Long, Integer> missingReleaseMessages; // missing release message id => age counter
5057
private long maxIdScanned;
5158

5259
public ReleaseMessageScanner() {
5360
listeners = Lists.newCopyOnWriteArrayList();
5461
executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
5562
.create("ReleaseMessageScanner", true));
63+
missingReleaseMessages = Maps.newHashMap();
5664
}
5765

5866
@Override
@@ -62,6 +70,7 @@ public void afterPropertiesSet() throws Exception {
6270
executorService.scheduleWithFixedDelay(() -> {
6371
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
6472
try {
73+
scanMissingMessages();
6574
scanMessages();
6675
transaction.setStatus(Transaction.SUCCESS);
6776
} catch (Throwable ex) {
@@ -108,10 +117,51 @@ private boolean scanAndSendMessages() {
108117
}
109118
fireMessageScanned(releaseMessages);
110119
int messageScanned = releaseMessages.size();
111-
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
120+
long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
121+
// check id gaps, possible reasons are release message not committed yet or already rolled back
122+
if (newMaxIdScanned - maxIdScanned > messageScanned) {
123+
recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
124+
}
125+
maxIdScanned = newMaxIdScanned;
112126
return messageScanned == 500;
113127
}
114128

129+
private void scanMissingMessages() {
130+
Set<Long> missingReleaseMessageIds = missingReleaseMessages.keySet();
131+
Iterable<ReleaseMessage> releaseMessages = releaseMessageRepository
132+
.findAllById(missingReleaseMessageIds);
133+
fireMessageScanned(releaseMessages);
134+
releaseMessages.forEach(releaseMessage -> {
135+
missingReleaseMessageIds.remove(releaseMessage.getId());
136+
});
137+
growAndCleanMissingMessages();
138+
}
139+
140+
private void growAndCleanMissingMessages() {
141+
Iterator<Entry<Long, Integer>> iterator = missingReleaseMessages.entrySet()
142+
.iterator();
143+
while (iterator.hasNext()) {
144+
Entry<Long, Integer> entry = iterator.next();
145+
if (entry.getValue() > missingReleaseMessageMaxAge) {
146+
iterator.remove();
147+
} else {
148+
entry.setValue(entry.getValue() + 1);
149+
}
150+
}
151+
}
152+
153+
private void recordMissingReleaseMessageIds(List<ReleaseMessage> messages, long startId) {
154+
for (ReleaseMessage message : messages) {
155+
long currentId = message.getId();
156+
if (currentId - startId > 1) {
157+
for (long i = startId + 1; i < currentId; i++) {
158+
missingReleaseMessages.putIfAbsent(i, 1);
159+
}
160+
}
161+
startId = currentId;
162+
}
163+
}
164+
115165
/**
116166
* find largest message id as the current start point
117167
* @return current largest message id
@@ -125,7 +175,7 @@ private long loadLargestMessageId() {
125175
* Notify listeners with messages loaded
126176
* @param messages
127177
*/
128-
private void fireMessageScanned(List<ReleaseMessage> messages) {
178+
private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
129179
for (ReleaseMessage message : messages) {
130180
for (ReleaseMessageListener listener : listeners) {
131181
try {

apollo-biz/src/test/java/com/ctrip/framework/apollo/biz/message/ReleaseMessageScannerTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,25 @@
1818

1919
import com.ctrip.framework.apollo.biz.config.BizConfig;
2020
import com.google.common.collect.Lists;
21+
import com.google.common.collect.Sets;
2122
import com.google.common.util.concurrent.SettableFuture;
2223

2324
import com.ctrip.framework.apollo.biz.AbstractUnitTest;
2425
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
2526
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
2627

28+
import java.util.ArrayList;
29+
import org.awaitility.Awaitility;
2730
import org.junit.Before;
2831
import org.junit.Test;
2932
import org.mockito.Mock;
3033
import org.springframework.test.util.ReflectionTestUtils;
3134

3235
import java.util.concurrent.TimeUnit;
3336

37+
import static org.awaitility.Awaitility.await;
3438
import static org.junit.Assert.assertEquals;
39+
import static org.junit.Assert.assertSame;
3540
import static org.mockito.Mockito.when;
3641

3742
/**
@@ -54,6 +59,10 @@ public void setUp() throws Exception {
5459
databaseScanInterval = 100; //100 ms
5560
when(bizConfig.releaseMessageScanIntervalInMilli()).thenReturn(databaseScanInterval);
5661
releaseMessageScanner.afterPropertiesSet();
62+
63+
Awaitility.reset();
64+
Awaitility.setDefaultTimeout(databaseScanInterval * 5, TimeUnit.MILLISECONDS);
65+
Awaitility.setDefaultPollInterval(databaseScanInterval, TimeUnit.MILLISECONDS);
5766
}
5867

5968
@Test
@@ -91,7 +100,86 @@ public void testScanMessageAndNotifyMessageListener() throws Exception {
91100

92101
assertEquals(anotherMessage, anotherListenerMessage.getMessage());
93102
assertEquals(anotherId, anotherListenerMessage.getId());
103+
}
104+
105+
@Test
106+
public void testScanMessageWithGapAndNotifyMessageListener() throws Exception {
107+
String someMessage = "someMessage";
108+
long someId = 1;
109+
ReleaseMessage someReleaseMessage = assembleReleaseMessage(someId, someMessage);
110+
111+
String someMissingMessage = "someMissingMessage";
112+
long someMissingId = 2;
113+
ReleaseMessage someMissingReleaseMessage = assembleReleaseMessage(someMissingId, someMissingMessage);
114+
115+
String anotherMessage = "anotherMessage";
116+
long anotherId = 3;
117+
ReleaseMessage anotherReleaseMessage = assembleReleaseMessage(anotherId, anotherMessage);
118+
119+
String anotherMissingMessage = "anotherMissingMessage";
120+
long anotherMissingId = 4;
121+
ReleaseMessage anotherMissingReleaseMessage = assembleReleaseMessage(anotherMissingId, anotherMissingMessage);
122+
123+
long someRolledBackId = 5;
124+
125+
String yetAnotherMessage = "yetAnotherMessage";
126+
long yetAnotherId = 6;
127+
ReleaseMessage yetAnotherReleaseMessage = assembleReleaseMessage(yetAnotherId, yetAnotherMessage);
128+
129+
ArrayList<ReleaseMessage> receivedMessage = Lists.newArrayList();
130+
SettableFuture<ReleaseMessage> someListenerFuture = SettableFuture.create();
131+
ReleaseMessageListener someListener = (message, channel) -> receivedMessage.add(message);
132+
releaseMessageScanner.addMessageListener(someListener);
133+
134+
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
135+
Lists.newArrayList(someReleaseMessage));
136+
137+
await().untilAsserted(() -> {
138+
assertEquals(1, receivedMessage.size());
139+
assertSame(someReleaseMessage, receivedMessage.get(0));
140+
});
141+
142+
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
143+
Lists.newArrayList(anotherReleaseMessage));
94144

145+
await().untilAsserted(() -> {
146+
assertEquals(2, receivedMessage.size());
147+
assertSame(someReleaseMessage, receivedMessage.get(0));
148+
assertSame(anotherReleaseMessage, receivedMessage.get(1));
149+
});
150+
151+
when(releaseMessageRepository.findAllById(Sets.newHashSet(someMissingId)))
152+
.thenReturn(Lists.newArrayList(someMissingReleaseMessage));
153+
154+
await().untilAsserted(() -> {
155+
assertEquals(3, receivedMessage.size());
156+
assertSame(someReleaseMessage, receivedMessage.get(0));
157+
assertSame(anotherReleaseMessage, receivedMessage.get(1));
158+
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
159+
});
160+
161+
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(anotherId)).thenReturn(
162+
Lists.newArrayList(yetAnotherReleaseMessage));
163+
164+
await().untilAsserted(() -> {
165+
assertEquals(4, receivedMessage.size());
166+
assertSame(someReleaseMessage, receivedMessage.get(0));
167+
assertSame(anotherReleaseMessage, receivedMessage.get(1));
168+
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
169+
assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
170+
});
171+
172+
when(releaseMessageRepository.findAllById(Sets.newHashSet(anotherMissingId, someRolledBackId)))
173+
.thenReturn(Lists.newArrayList(anotherMissingReleaseMessage));
174+
175+
await().untilAsserted(() -> {
176+
assertEquals(5, receivedMessage.size());
177+
assertSame(someReleaseMessage, receivedMessage.get(0));
178+
assertSame(anotherReleaseMessage, receivedMessage.get(1));
179+
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
180+
assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
181+
assertSame(anotherMissingReleaseMessage, receivedMessage.get(4));
182+
});
95183
}
96184

97185
private ReleaseMessage assembleReleaseMessage(long id, String message) {

0 commit comments

Comments
 (0)