Skip to content

Commit 57264bf

Browse files
committed
https://issues.apache.org/jira/browse/AMQ-6206 - ensure properties are marshalled before dispatch to broker so that their values are reflected in the memory usage
1 parent 521c4fd commit 57264bf

2 files changed

Lines changed: 58 additions & 0 deletions

File tree

activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ protected void onStompSend(StompFrame command) throws IOException, JMSException
341341
}
342342

343343
message.onSend();
344+
message.beforeMarshall(null);
344345
sendToActiveMQ(message, createResponseHandler(command));
345346
}
346347

activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.net.Socket;
24+
import java.util.Arrays;
2425
import java.util.concurrent.TimeUnit;
2526

2627
import javax.jms.Connection;
@@ -543,4 +544,60 @@ public void testSubscribeWithNoId() throws Exception {
543544
frame = stompConnection.receiveFrame();
544545
assertTrue(frame.startsWith("ERROR"));
545546
}
547+
548+
@Test(timeout = 60000)
549+
public void testSizeAndBrokerUsage() throws Exception {
550+
final int MSG_COUNT = 10;
551+
final int numK = 4;
552+
553+
final byte[] bigPropContent = new byte[numK*1024];
554+
// fill so we don't fall foul to trimming in v<earlier than 1.2>
555+
Arrays.fill(bigPropContent, Byte.MAX_VALUE);
556+
final String bigProp = new String(bigPropContent);
557+
558+
String connectFrame = "STOMP\n" +
559+
"login:system\n" +
560+
"passcode:manager\n" +
561+
"accept-version:1.2\n" +
562+
"host:localhost\n" +
563+
"\n" + Stomp.NULL;
564+
565+
stompConnection.sendFrame(connectFrame);
566+
567+
String f = stompConnection.receiveFrame();
568+
LOG.debug("Broker sent: " + f);
569+
570+
assertTrue(f.startsWith("CONNECTED"));
571+
572+
long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
573+
574+
for(int i = 0; i < MSG_COUNT; ++i) {
575+
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
576+
"receipt:0\n" +
577+
"myXkProp:" + bigProp + "\n"+
578+
"\n" + "Hello World {" + i + "}" + Stomp.NULL;
579+
stompConnection.sendFrame(message);
580+
StompFrame repsonse = stompConnection.receive();
581+
LOG.info("response:" + repsonse);
582+
assertEquals("0", repsonse.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
583+
}
584+
585+
// verify usage accounts for our numK
586+
long usageEnd = brokerService.getSystemUsage().getMemoryUsage().getUsage();
587+
588+
long usageDiff = usageEnd - usageStart;
589+
LOG.info("usageDiff:" + usageDiff);
590+
assertTrue(usageDiff > MSG_COUNT * numK * 1024);
591+
592+
String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
593+
"id:12345\n" + "browser:true\n\n" + Stomp.NULL;
594+
stompConnection.sendFrame(subscribe);
595+
596+
for(int i = 0; i < MSG_COUNT; ++i) {
597+
StompFrame message = stompConnection.receive();
598+
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
599+
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
600+
}
601+
602+
}
546603
}

0 commit comments

Comments
 (0)