Skip to content

Commit 990a8d6

Browse files
committed
AMQ-9726 - Fix FilePendingMessageCursor clear() method (#1452)
This fixes the clear() method so that when clearing the memory map it will decrement memory usage, and when clearing the disk list it will destroy and reset the list for future writes. (cherry picked from commit b1e8441)
1 parent cdf8e40 commit 990a8d6

File tree

2 files changed

+94
-4
lines changed

2 files changed

+94
-4
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,7 @@ public synchronized void release() {
162162
@Override
163163
public synchronized void destroy() throws Exception {
164164
stop();
165-
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
166-
MessageReference node = i.next();
165+
for (MessageReference node : memoryList) {
167166
node.decrementReferenceCount();
168167
}
169168
memoryList.clear();
@@ -365,11 +364,19 @@ public synchronized long messageSize() {
365364
*/
366365
@Override
367366
public synchronized void clear() {
367+
// AMQ-9726 - Iterate over all nodes to decrement the ref count
368+
// to decrement the memory usage tracker
369+
for (MessageReference node : memoryList) {
370+
node.decrementReferenceCount();
371+
}
368372
memoryList.clear();
369373
if (!isDiskListEmpty()) {
370374
try {
371-
getDiskList().destroy();
372-
} catch (IOException e) {
375+
// AMQ-9726 - This method will destroy the list and
376+
// set the reference to null so it will be reset
377+
// for future writes
378+
destroyDiskList();
379+
} catch (Exception e) {
373380
throw new RuntimeException(e);
374381
}
375382
}

activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
import static org.junit.Assert.assertEquals;
3131
import static org.junit.Assert.assertFalse;
32+
import static org.junit.Assert.assertTrue;
33+
import static org.junit.Assert.fail;
3234

3335
/**
3436
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -94,5 +96,86 @@ public void testAddRemoveAddIndexSize() throws Exception {
9496
assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
9597
}
9698

99+
// Test for AMQ-9726
100+
@Test
101+
public void testClearCursor() throws Exception {
102+
brokerService = new BrokerService();
103+
brokerService.setUseJmx(false);
104+
SystemUsage usage = brokerService.getSystemUsage();
105+
usage.getMemoryUsage().setLimit(1024*150);
106+
Destination dest = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
107+
dest.setMemoryUsage(usage.getMemoryUsage());
108+
brokerService.start();
109+
110+
underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
111+
underTest.setSystemUsage(usage);
112+
113+
// Add 10 messages to the cursor in memory
114+
addTestMessages(dest);
115+
116+
// Verify memory usage was increased and cache is enabled
117+
assertTrue(dest.getMemoryUsage().getUsage() > 0);
118+
assertEquals(10, underTest.size());
119+
assertTrue(underTest.isCacheEnabled());
120+
assertEquals(0, dest.getTempUsage().getUsage());
121+
122+
// Clear, this will verify memory usage is correctly decremented
123+
// and the memory map is cleared as well. Memory was previously
124+
// incorrectly not being cleared.
125+
underTest.clear();
126+
assertEquals(0, underTest.size());
127+
assertEquals(0, dest.getMemoryUsage().getUsage());
128+
129+
// Now test the disk cursor
130+
// set the memory usage limit very small so messages will go to
131+
// the disk list and not memory and send 10 more messages
132+
usage.getMemoryUsage().setLimit(1);
133+
addTestMessages(dest);
134+
135+
// confirm the cache is false and the memory is 0 because
136+
// the messages exist on disk and not in the memory map
137+
// also very temp usage is greater than 0 now
138+
assertFalse(underTest.isCacheEnabled());
139+
assertEquals(0, dest.getMemoryUsage().getUsage());
140+
assertTrue(dest.getTempUsage().getUsage() > 0);
141+
assertEquals(10, underTest.size());
142+
143+
// Test clearing the disk list shows a size of 0
144+
underTest.clear();
145+
assertEquals(0, underTest.size());
146+
147+
// Send 10 more messages to verify that we can send again
148+
// to the disk list after clear. Previously clear did not
149+
// correctly destroy/reset the disk cursor so an exception
150+
// was thrown when adding messages again after calling clear()
151+
addTestMessages(dest);
152+
assertFalse(underTest.isCacheEnabled());
153+
assertEquals(0, dest.getMemoryUsage().getUsage());
154+
assertTrue(dest.getTempUsage().getUsage() > 0);
155+
assertEquals(10, underTest.size());
156+
157+
// one final clear() and reset limit to make sure we can send to
158+
// memory again
159+
underTest.clear();
160+
usage.getMemoryUsage().setLimit(1024*150);
161+
assertEquals(0, underTest.size());
162+
assertEquals(0, dest.getMemoryUsage().getUsage());
163+
164+
// Verify memory usage was increased and cache is enabled
165+
addTestMessages(dest);
166+
assertTrue(dest.getMemoryUsage().getUsage() > 0);
167+
assertEquals(10, underTest.size());
168+
assertTrue(underTest.isCacheEnabled());
169+
}
170+
171+
private void addTestMessages(Destination dest) throws Exception {
172+
for (int i = 0; i< 10; i++) {
173+
ActiveMQMessage mqMessage = new ActiveMQMessage();
174+
mqMessage.setMessageId(new MessageId("1:2:3:" + i));
175+
mqMessage.setMemoryUsage(dest.getMemoryUsage());
176+
mqMessage.setRegionDestination(dest);
177+
underTest.addMessageLast(new IndirectMessageReference(mqMessage));
178+
}
179+
}
97180

98181
}

0 commit comments

Comments
 (0)