Showing posts with label jms. Show all posts

Wednesday, July 31, 2013

Easy Messaging with STOMP over WebSockets using Apollo

In my previous post I covered a couple of interesting use cases implementing STOMP messaging over WebSockets using two well-known message brokers, HornetQ and ActiveMQ. The one I didn't cover is Apollo because, in my own opinion, its API is verbose and not expressive enough for a Java developer. Nevertheless, the more time I spent playing with Apollo, the more convinced I became that it has quite a lot of potential. So this post is all about Apollo.

The problem we're trying to solve stays the same: a simple publish/subscribe solution where a JavaScript web client sends messages to a specific topic and listens on it. Whenever a message is received, the client shows an alert window (please note that we need a modern browser which supports WebSockets, such as Google Chrome or Mozilla Firefox).
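To make the receiving side a bit more concrete: on the wire, every message delivered to a subscribed client arrives as a STOMP MESSAGE frame, which is plain text (a command line, header lines, a blank line, the body and a terminating NUL character). Below is a minimal Java sketch of parsing such a frame. The StompFrame class and the sample header values are my own illustration and are not part of stomp.js or Apollo; the sketch also ignores header escaping and the content-length header.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StompFrame {
    final String command;
    final Map<String, String> headers;
    final String body;

    private StompFrame(String command, Map<String, String> headers, String body) {
        this.command = command;
        this.headers = headers;
        this.body = body;
    }

    // Parses a single text frame: COMMAND, header lines, blank line, body, NUL.
    static StompFrame parse(String raw) {
        // Drop the trailing NUL terminator if it is present.
        String text = raw.endsWith("\0") ? raw.substring(0, raw.length() - 1) : raw;

        // The first blank line separates the command and headers from the body.
        int blank = text.indexOf("\n\n");
        String head = blank >= 0 ? text.substring(0, blank) : text;
        String body = blank >= 0 ? text.substring(blank + 2) : "";

        String[] lines = head.split("\n");
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                // If a header is repeated, the first occurrence wins.
                headers.putIfAbsent(lines[i].substring(0, colon), lines[i].substring(colon + 1));
            }
        }
        return new StompFrame(lines[0], headers, body);
    }

    public static void main(String[] args) {
        // Hypothetical frame, shaped like what a subscriber of /topic/test could receive.
        StompFrame frame = parse(
            "MESSAGE\ndestination:/topic/test\nmessage-id:1\nsubscription:sub-0\n\nHello\0");
        System.out.println(frame.command + " on " + frame.headers.get("destination") + ": " + frame.body);
    }
}
```

In the browser, stomp.js does this parsing for us and hands the result to the subscription callback, so the sketch is only meant to show what travels over the WebSocket.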

Let's get our hands dirty by starting off with index.html (which imports the awesome stomp.js JavaScript library):





The client part is not that different, except for the topic name, which is now /topic/test. The server side, however, differs a lot. Apollo is written in Scala and embraces an asynchronous, non-blocking programming model, which I think is a very good thing. What it brings, though, is a new paradigm to program against, and that is not necessarily a bad thing either. The AppConfig class is the one which configures the embedded Apollo broker:

package com.example.messaging;

import java.io.File;

import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.jmx.dto.JmxDTO;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.BrokerDTO;
import org.apache.activemq.apollo.dto.TopicDTO;
import org.apache.activemq.apollo.dto.VirtualHostDTO;
import org.apache.activemq.apollo.dto.WebAdminDTO;
import org.apache.activemq.apollo.stomp.dto.StompDTO;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean
    public Broker broker() throws Exception {
        final Broker broker = new Broker();
    
        // Configure STOMP over WebSockets connector
        final AcceptingConnectorDTO ws = new AcceptingConnectorDTO();
        ws.id = "ws";
        ws.bind = "ws://localhost:61614";  
        ws.protocols.add( new StompDTO() );

        // Create a topic with name 'test'
        final TopicDTO topic = new TopicDTO();
        topic.id = "test";
  
        // Create virtual host (based on localhost)
        final VirtualHostDTO host = new VirtualHostDTO();
        host.id = "localhost";  
        host.topics.add( topic );
        host.host_names.add( "localhost" );
        host.host_names.add( "127.0.0.1" );
        host.auto_create_destinations = false;
  
        // Create a web admin UI (REST) accessible at: http://localhost:61680/api/index.html#!/
        final WebAdminDTO webadmin = new WebAdminDTO();
        webadmin.bind = "http://localhost:61680";

        // Create JMX instrumentation 
        final JmxDTO jmxService = new JmxDTO();
        jmxService.enabled = true;
  
        // Finally, glue all together inside broker configuration
        final BrokerDTO config = new BrokerDTO();
        config.connectors.add( ws );
        config.virtual_hosts.add( host );
        config.web_admins.add( webadmin );
        config.services.add( jmxService );
  
        broker.setConfig( config );
        broker.setTmp( new File( System.getProperty( "java.io.tmpdir" ) ) );
  
        broker.start( new Runnable() {   
            @Override
            public void run() {  
                System.out.println("The broker has been started.");
            }
        } );
  
        return broker;
    }
}

I guess it is now clear what I meant by verbose and not expressive enough, but at least it's easy to follow. First, we create a WebSockets connector at ws://localhost:61614 and ask it to support the STOMP protocol. Then we create a simple topic named test (which we refer to as /topic/test on the client side). The next important step is to create a virtual host and bind topics (and queues, if any) to it. The host names list is very important, as the destination resolution logic relies heavily on it. In the following step we configure the web admin UI and JMX instrumentation, which give us access to configuration, statistics and monitoring. To check it out, open http://localhost:61680/api/index.html#!/ in your web browser once the Apollo broker is started. And finally, by applying the configuration and starting the broker, we are good to go! As you can see, the asynchronous programming model leads to callbacks and anonymous functions (where are you, Java 8?).

Now that the configuration is done, it's time to look at the start-up logic placed in the Starter class (again, callbacks and anonymous functions are used to perform a graceful shutdown):

package com.example.messaging;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.apollo.broker.Broker;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) throws Exception {
        try( ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class ) ) {
            final Broker broker = context.getBean( Broker.class );  
            System.out.println( "Press any key to terminate ..." );
            System.in.read();    
         
            final CountDownLatch latch = new CountDownLatch( 1 );
            broker.stop( new Runnable() { 
                @Override
                public void run() {  
                    System.out.println("The broker has been stopped.");
                    latch.countDown();
                }
            } );
         
            // Gracefully stop the broker
            if( !latch.await( 1, TimeUnit.SECONDS ) ) {
                System.out.println("The broker hasn't been stopped, exiting anyway ...");
            }
        }
    }
}
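For what it's worth, here is a rough sketch of how the "stop and wait for the callback" part could look once Java 8 lambdas are available. The GracefulStop class, its stopAndWait helper and the fake stop operation in main are hypothetical stand-ins of mine, written so that the snippet runs without Apollo on the classpath; only the latch-plus-callback idea comes from the Starter class above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class GracefulStop {
    // Triggers an asynchronous stop operation and blocks until its completion
    // callback fires or the timeout elapses. Returns true if the callback fired in time.
    static boolean stopAndWait(Consumer<Runnable> asyncStop, long timeout, TimeUnit unit) {
        CountDownLatch latch = new CountDownLatch(1);
        asyncStop.accept(latch::countDown);
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the stop was not confirmed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // Hypothetical stand-in for an asynchronous stop: it simply runs the
        // completion callback on another thread.
        Consumer<Runnable> fakeStop = worker::execute;

        boolean stopped = stopAndWait(fakeStop, 1, TimeUnit.SECONDS);
        System.out.println(stopped
            ? "The broker has been stopped."
            : "The broker hasn't been stopped, exiting anyway ...");
        worker.shutdown();
    }
}
```

With the real broker, and assuming stop accepts a Runnable as in the Starter class above, the call would shrink to stopAndWait( broker::stop, 1, TimeUnit.SECONDS ).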

As with the previous examples, after running the Starter class and opening index.html in the browser, we should see something like this:

Great, it works just fine! I am pretty sure that if the code were rewritten in Scala, this Apollo API usage example would look much more compact and concise. In any case, I think the Apollo message broker is definitely worth considering if you are looking for a prominent messaging architecture.

All sources are available on GitHub: Apollo example.

Saturday, June 29, 2013

Easy Messaging with STOMP over WebSockets using ActiveMQ and HornetQ

Messaging is an extremely powerful tool for building distributed software systems of different levels. Typically, at least in the Java ecosystem, the client (front-end) never interacts with the message broker (or exchange) directly but does so by invoking server-side (back-end) services. The client may not even be aware that there's a messaging solution in place.

With WebSockets gaining more and more adoption, and with wide support for text-oriented protocols like STOMP (used to communicate with a message broker or exchange), this is going to change. Today's post will try to explain how simple it is to expose two very popular JMS implementations, Apache ActiveMQ and JBoss HornetQ, to a web front-end (JavaScript) using STOMP over WebSockets.

Before digging into the code, one might argue that it's not a good idea to do that. So what's the purpose? The answer really depends:

  • you are developing a prototype / proof of concept and need an easy way to integrate publish/subscribe or peer-to-peer messaging
  • you don't want / need to build a sophisticated architecture, and the simplest solution that works is just enough

Scalability, fail-over and a lot of other very important decisions are not taken into consideration here, but they definitely should be if you are developing a robust and resilient architecture.

So let's get started. As always, it's better to start with the problem we're trying to solve: we would like to develop a simple publish/subscribe solution where a web client written in JavaScript is able to send messages to a specific topic and listen on it. Whenever a message is received, the client just shows a simple alert window. Please note that we need a modern browser which supports WebSockets, such as Google Chrome or Mozilla Firefox.

For both our examples the client's code remains the same, so let's start with that. A great starting point is the STOMP Over WebSocket article, which introduces the stomp.js module. Here is our index.html:




It is extremely simple code, but a few details are worth explaining. First, we are looking for the WebSockets endpoint at ws://localhost:61614/stomp. That is sufficient for a local deployment, but it is better to replace localhost with a real IP address or host name. Secondly, once connected, the client subscribes to the topic (it is only interested in messages with priority: 9) and publishes a message to this topic immediately after. From the client's perspective, we are done.
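To illustrate what the subscribe and publish steps amount to at the protocol level, here is a small Java sketch which builds the two STOMP frames as plain text. It is only an approximation under a few assumptions: the destination jms.topic.test is the topic name used in the broker configuration in this post, the priority header mirrors the priority: 9 mentioned above, and the subscription id sub-0, the message body and the StompFrames class itself are made up for the example. Header escaping is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StompFrames {
    // Builds a STOMP frame: command line, header lines, blank line, body, NUL terminator.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    // SUBSCRIBE frame; the id header is required by STOMP 1.1.
    static String subscribe(String id, String destination, int priority) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", id);
        headers.put("destination", destination);
        headers.put("priority", String.valueOf(priority));
        return frame("SUBSCRIBE", headers, "");
    }

    // SEND frame carrying a text body to the given destination.
    static String send(String destination, int priority, String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", destination);
        headers.put("priority", String.valueOf(priority));
        return frame("SEND", headers, body);
    }

    public static void main(String[] args) {
        // Replace the NUL terminator with a newline so the frames print readably.
        System.out.print(subscribe("sub-0", "jms.topic.test", 9).replace('\0', '\n'));
        System.out.print(send("jms.topic.test", 9, "Hello").replace('\0', '\n'));
    }
}
```

The stomp.js library produces and consumes such frames for us; the point of the sketch is just that STOMP is simple enough to read and write by hand, which is what makes it a good fit for the browser.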

Let's move on to the message broker; the first one on our list is Apache ActiveMQ. To keep the example simple, we will embed the Apache ActiveMQ broker into a simple Spring application without using XML configuration files. As the source code is available on GitHub, I will skip the POM file snippet and just show the code:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();    
        broker.addConnector( "ws://localhost:61614" ); 
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );
  
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );
  
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );
  
        return broker;
    }
}

As we can see, the ActiveMQ broker is configured with the ws://localhost:61614 connector, which assumes the STOMP protocol. We also create a JMS topic named jms.topic.test and enable JMX management instrumentation. To run it, we need a simple Starter class:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}
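Since the configuration enables a JMX connector, one quick way to verify that the broker and the topic are really there is to list the broker's MBeans from another JVM. The sketch below assumes ActiveMQ's default JMX settings (RMI registry on port 1099, connector path /jmxrmi, domain org.apache.activemq), none of which are set explicitly in the AppConfig above, so adjust them if your setup differs. The JmxPeek class is my own helper and not part of ActiveMQ.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class JmxPeek {
    // Returns the names of all MBeans matching the given ObjectName pattern.
    static Set<ObjectName> find(MBeanServerConnection connection, String pattern) {
        try {
            return connection.queryNames(new ObjectName(pattern), null);
        } catch (Exception e) {
            throw new IllegalStateException("JMX query failed: " + pattern, e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the broker runs locally with the default JMX connector address.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            // Assumption: the broker registers its MBeans under the default domain.
            for (ObjectName name : find(connector.getMBeanServerConnection(), "org.apache.activemq:*")) {
                System.out.println(name);
            }
        }
    }
}
```

If everything is wired as expected, the printed names should include entries for the broker and its destinations. A tool like JConsole shows the same information with less typing.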

Now, with the broker up and running, let's open the index.html file in the browser. We should see something like this:

Simple! For curious readers: ActiveMQ uses Jetty 7.6.7.v20120910 for WebSockets support and won't work with the latest Jetty distributions.

Moving on to HornetQ, the implementation looks a bit different, though it is not very complicated either. As the Starter class remains the same, the only change is the configuration:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );
  
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );
  
        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );
  
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );
  
        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );
  
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );
  
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );
  
        return jmsServer;
    }
}

The complete source code is on GitHub. After running the Starter class and opening index.html in the browser, we should see very similar results:

The HornetQ configuration looks a bit more verbose; however, there are no additional dependencies involved except the brilliant Netty framework.

Out of curiosity, I replaced the ActiveMQ broker with the Apollo implementation. Though I succeeded in making it work as expected, I found the API to be very cumbersome, at least in the current version 1.6, so I haven't covered it in this post.

All sources are available on GitHub: Apache ActiveMQ example and JBoss HornetQ example