Showing posts with label websockets. Show all posts

Saturday, November 30, 2013

Java WebSockets (JSR-356) on Jetty 9.1

Jetty 9.1 is finally released, bringing Java WebSockets (JSR-356) to non-EE environments. It's awesome news and today's post will be about using this great new API along with Spring Framework.

JSR-356 defines a concise, annotation-based model that lets modern Java web applications easily create bidirectional communication channels using the WebSockets API. It covers not only the server side but the client side as well, making this API really simple to use everywhere.

Let's get started! Our goal is to build a WebSockets server which accepts messages from clients and broadcasts them to all clients currently connected. To begin with, let's define the message format which the server and client will be exchanging, as this simple Message class. We could limit ourselves to something like a String, but I would like to introduce you to the power of another new API: the Java API for JSON Processing (JSR-353).

package com.example.services;

public class Message {
    private String username;
    private String message;
 
    public Message() {
    }
 
    public Message( final String username, final String message ) {
        this.username = username;
        this.message = message;
    }
 
    public String getMessage() {
        return message;
    }
 
    public String getUsername() {
        return username;
    }

    public void setMessage( final String message ) {
        this.message = message;
    }
 
    public void setUsername( final String username ) {
        this.username = username;
    }
}

To separate the declarations related to the server and the client, JSR-356 defines two basic annotations: @ServerEndpoint and @ClientEndpoint respectively. Our client endpoint, let's call it BroadcastClientEndpoint, will simply listen for the messages the server sends:

package com.example.services;

import java.io.IOException;
import java.util.logging.Logger;

import javax.websocket.ClientEndpoint;
import javax.websocket.EncodeException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

@ClientEndpoint
public class BroadcastClientEndpoint {
    private static final Logger log = Logger.getLogger( 
        BroadcastClientEndpoint.class.getName() );
 
    @OnOpen
    public void onOpen( final Session session ) throws IOException, EncodeException  {
        session.getBasicRemote().sendObject( new Message( "Client", "Hello!" ) );
    }

    @OnMessage
    public void onMessage( final Message message ) {
        log.info( String.format( "Received message '%s' from '%s'",
            message.getMessage(), message.getUsername() ) );
    }
}

That's literally it! A very clean, self-explanatory piece of code: the @OnOpen method is called when the client connects to the server, and the @OnMessage method is called every time the server sends a message to the client. Yes, it's very simple, but there is a caveat: a JSR-356 implementation can handle simple objects out of the box, but not complex ones like Message. To manage that, JSR-356 introduces the concept of encoders and decoders.

We all love JSON, so why don't we define our own JSON encoder and decoder? It's an easy task which Java API for JSON Processing (JSR-353) can handle for us. To create an encoder, you only need to implement Encoder.Text< Message > and basically serialize your object to some string, in our case to JSON string, using JsonObjectBuilder.

package com.example.services;

import javax.json.Json;
import javax.json.JsonReaderFactory;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

public class Message {
    public static class MessageEncoder implements Encoder.Text< Message > {
        @Override
        public void init( final EndpointConfig config ) {
        }
  
        @Override
        public String encode( final Message message ) throws EncodeException {
            return Json.createObjectBuilder()
                .add( "username", message.getUsername() )
                .add( "message", message.getMessage() )
                .build()
                .toString();
        }
  
        @Override
        public void destroy() {
        }
    }
}

For the decoder part, everything looks very similar: we have to implement Decoder.Text< Message > and deserialize our object from a string, this time using JsonReader.

package com.example.services;

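import java.io.StringReader;
import java.util.Collections;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonReaderFactory;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;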

public class Message {
    public static class MessageDecoder implements Decoder.Text< Message > {
        private JsonReaderFactory factory = Json.createReaderFactory( Collections.< String, Object >emptyMap() );
  
        @Override
        public void init( final EndpointConfig config ) {
        }
  
        @Override
        public Message decode( final String str ) throws DecodeException {
            final Message message = new Message();
   
            try( final JsonReader reader = factory.createReader( new StringReader( str ) ) ) {
                final JsonObject json = reader.readObject();
                message.setUsername( json.getString( "username" ) );
                message.setMessage( json.getString( "message" ) );
            }
   
            return message;
        }
  
        @Override
        public boolean willDecode( final String str ) {
            return true;
        }
  
        @Override
        public void destroy() {
        }
    }
}

And as a final step, we need to tell the client (and the server, as they share the same decoders and encoders) that we have an encoder and a decoder for our messages. The easiest way to do that is to declare them as part of the @ServerEndpoint and @ClientEndpoint annotations.


import com.example.services.Message.MessageDecoder;
import com.example.services.Message.MessageEncoder;

@ClientEndpoint( encoders = { MessageEncoder.class }, decoders = { MessageDecoder.class } )
public class BroadcastClientEndpoint {
}

To make client's example complete, we need some way to connect to the server using BroadcastClientEndpoint and basically exchange messages. The ClientStarter class finalizes the picture:

package com.example.ws;

import java.net.URI;
import java.util.UUID;

import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.websocket.jsr356.ClientContainer;

import com.example.services.BroadcastClientEndpoint;
import com.example.services.Message;

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        final String client = UUID.randomUUID().toString().substring( 0, 8 );
  
        final WebSocketContainer container = ContainerProvider.getWebSocketContainer();    
        final String uri = "ws://localhost:8080/broadcast";  
  
        try( Session session = container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) ) {
            for( int i = 1; i <= 10; ++i ) {
                session.getBasicRemote().sendObject( new Message( client, "Message #" + i ) );
                Thread.sleep( 1000 );
            }
        }
  
        // Application doesn't exit if container's threads are still running
        ( ( ClientContainer )container ).stop();
    }
}

Just a couple of comments on what this code does: we connect to the WebSockets endpoint at ws://localhost:8080/broadcast, pick a random client name (from a UUID) and send 10 messages, one per second (just to be sure we have time to receive them all back).
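For the curious, here is a tiny sketch of the client name generation on its own. The class and method names (ClientNameSketch, shortName) are mine, just for illustration; the expression itself is the one used in ClientStarter. Because a UUID's text form starts with a group of 8 hexadecimal digits, the result is always 8 lowercase hex characters (32 random bits), which is plenty for a handful of demo clients but not a guarantee of uniqueness.

```java
import java.util.UUID;

// Illustration only: isolates the name-picking expression from ClientStarter.
class ClientNameSketch {
    // Takes the first group of a random UUID's text form (8 lowercase hex digits).
    static String shortName() {
        return UUID.randomUUID().toString().substring( 0, 8 );
    }

    public static void main( final String[] args ) {
        // Prints a different 8-character name on (almost) every run.
        System.out.println( shortName() );
    }
}
```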

The server part doesn't look very different and at this point can be understood without any additional comments (except maybe the fact that the server just broadcasts every message it receives to all connected clients). Important to mention here: a new instance of the server endpoint is created every time a new client connects to it (that's why the sessions collection is static); this is the default behavior and can easily be changed.

package com.example.services;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import javax.websocket.EncodeException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import com.example.services.Message.MessageDecoder;
import com.example.services.Message.MessageEncoder;

@ServerEndpoint( 
    value = "/broadcast", 
    encoders = { MessageEncoder.class }, 
    decoders = { MessageDecoder.class } 
) 
public class BroadcastServerEndpoint {
    private static final Set< Session > sessions = 
        Collections.synchronizedSet( new HashSet< Session >() ); 
   
    @OnOpen
    public void onOpen( final Session session ) {
        sessions.add( session );
    }

    @OnClose
    public void onClose( final Session session ) {
        sessions.remove( session );
    }

    @OnMessage
    public void onMessage( final Message message, final Session client ) 
            throws IOException, EncodeException {
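        // Iterating over a synchronizedSet requires holding its lock manually
        synchronized( sessions ) {
            for( final Session session: sessions ) {
                session.getBasicRemote().sendObject( message );
            }
        }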
    }
}
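To see why the session set has to be static when the container creates one endpoint instance per connection, here is a minimal plain-Java sketch with no WebSockets container involved. PerConnectionEndpoint, its methods and the string session ids are hypothetical stand-ins, not part of JSR-356. I also swapped in CopyOnWriteArraySet, which is one possible alternative to a synchronized set because it can be iterated safely while other threads add or remove entries.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for an endpoint class: imagine the container creating one per connection.
class PerConnectionEndpoint {
    // Shared by every instance, like the static 'sessions' set of the server endpoint.
    private static final Set< String > SHARED = new CopyOnWriteArraySet<>();
    // Visible to a single instance only, so it would only ever see its own connection.
    private final Set< String > own = new CopyOnWriteArraySet<>();

    void onOpen( final String sessionId ) {
        SHARED.add( sessionId );
        own.add( sessionId );
    }

    static int sharedCount() {
        return SHARED.size();
    }

    int ownCount() {
        return own.size();
    }

    public static void main( final String[] args ) {
        final PerConnectionEndpoint first = new PerConnectionEndpoint();
        final PerConnectionEndpoint second = new PerConnectionEndpoint();
        first.onOpen( "session-1" );
        second.onOpen( "session-2" );
        // The static set knows both sessions, the instance field only one.
        System.out.println( "shared=" + sharedCount() + ", own=" + first.ownCount() );
    }
}
```

Run on its own, main prints shared=2, own=1: with an instance field, each endpoint would broadcast only to itself.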

For this endpoint to be available for connections, we should start the WebSockets container and register the endpoint inside it. As always, Jetty 9.1 runs in embedded mode effortlessly:

package com.example.ws;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

import com.example.config.AppConfig;

public class ServerStarter  {
    public static void main( String[] args ) throws Exception {
        Server server = new Server( 8080 );
        
        // Create the 'root' Spring application context
        final ServletHolder servletHolder = new ServletHolder( new DefaultServlet() );
        final ServletContextHandler context = new ServletContextHandler();

        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/*" );
        context.addEventListener( new ContextLoaderListener() );   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );
   
        server.setHandler( context );
        WebSocketServerContainerInitializer.configureContext( context );        
        
        server.start();
        server.join(); 
    }
}

The most important part of the snippet above is WebSocketServerContainerInitializer.configureContext: it actually creates the instance of the WebSockets container. Because we haven't added any endpoints yet, the container basically sits there and does nothing. Spring Framework and the AppConfig configuration class will do this last wiring for us.

package com.example.config;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;

import org.eclipse.jetty.websocket.jsr356.server.AnnotatedServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.WebApplicationContext;

import com.example.services.BroadcastServerEndpoint;

@Configuration
public class AppConfig  {
    @Inject private WebApplicationContext context;
    private ServerContainer container;
 
    public class SpringServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
        @Override
        public < T > T getEndpointInstance( Class< T > endpointClass ) 
                throws InstantiationException {
            return context.getAutowireCapableBeanFactory().createBean( endpointClass );   
        }
    }
 
    @Bean
    public ServerEndpointConfig.Configurator configurator() {
        return new SpringServerEndpointConfigurator();
    }
 
    @PostConstruct
    public void init() throws DeploymentException {
        container = ( ServerContainer )context.getServletContext().
            getAttribute( javax.websocket.server.ServerContainer.class.getName() );
  
        container.addEndpoint( 
            new AnnotatedServerEndpointConfig( 
                BroadcastServerEndpoint.class, 
                BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class )  
            ) {
                @Override
                public Configurator getConfigurator() {
                    return configurator();
                }
            }
        );
    }  
}

As we mentioned earlier, by default the container will create a new instance of the server endpoint every time a new client connects, and it does so by calling the constructor, in our case BroadcastServerEndpoint.class.newInstance(). It might be the desired behavior, but because we are using Spring Framework and dependency injection, such new objects are basically unmanaged beans. Thanks to the very well-thought-out (in my opinion) design of JSR-356, it's actually quite easy to provide your own way of creating endpoint instances by implementing ServerEndpointConfig.Configurator. The SpringServerEndpointConfigurator is an example of such an implementation: it creates a new managed bean every time a new endpoint instance is requested (if you want a single instance, you can create a singleton of the endpoint as a bean in AppConfig and return it every time).

The way we retrieve the WebSockets container is Jetty-specific: from the attribute of the context with the name "javax.websocket.server.ServerContainer" (it might change in the future). Once the container is there, we just add a new (managed!) endpoint by providing our own ServerEndpointConfig (based on AnnotatedServerEndpointConfig, which Jetty kindly provides already).
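The choice between "new instance per connection" and "one shared instance" boils down to who creates the object. Here is a minimal plain-Java sketch of that idea, without Spring or a WebSockets container. InstanceFactory, NewInstanceFactory and SingletonFactory are hypothetical names of mine that only mimic the role ServerEndpointConfig.Configurator plays; they are not JSR-356 or Jetty classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FactoryDemo {
    public static void main( final String[] args ) {
        final InstanceFactory fresh = new NewInstanceFactory();
        final InstanceFactory shared = new SingletonFactory();
        // A fresh factory hands out distinct objects, a singleton factory the same one.
        System.out.println( fresh.getInstance( StringBuilder.class ) != fresh.getInstance( StringBuilder.class ) );
        System.out.println( shared.getInstance( StringBuilder.class ) == shared.getInstance( StringBuilder.class ) );
    }
}

// Hypothetical counterpart of a configurator's getEndpointInstance() hook.
interface InstanceFactory {
    < T > T getInstance( Class< T > type );
}

// Default-like behaviour: call the no-argument constructor on every request.
class NewInstanceFactory implements InstanceFactory {
    @Override
    public < T > T getInstance( final Class< T > type ) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch( final ReflectiveOperationException ex ) {
            throw new IllegalStateException( "Cannot instantiate " + type, ex );
        }
    }
}

// Alternative: create each type once and keep returning the same object.
class SingletonFactory implements InstanceFactory {
    private final Map< Class< ? >, Object > cache = new ConcurrentHashMap<>();
    private final InstanceFactory delegate = new NewInstanceFactory();

    @Override
    public < T > T getInstance( final Class< T > type ) {
        Object existing = cache.get( type );
        if( existing == null ) {
            final Object created = delegate.getInstance( type );
            // putIfAbsent returns the earlier value if another thread won the race.
            existing = cache.putIfAbsent( type, created );
            if( existing == null ) {
                existing = created;
            }
        }
        return type.cast( existing );
    }
}
```

In the real application the factory body is where Spring comes in: instead of calling the constructor, the configurator asks the bean factory for the instance, so dependencies get injected.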

To build and run our server and clients, we just need to do this:

mvn clean package
java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar // run server
java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar // run yet another client

As an example, by running the server and a couple of clients (I ran 4 of them: '392f68ef', '8e3a869d', 'ca3a06d0', '6cb82119') you can see from the console output that each client receives all the messages from all other clients (including its own messages):

Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Hello!' from 'Client'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #1' from '392f68ef'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #2' from '8e3a869d'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from 'ca3a06d0'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '6cb82119'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #2' from '392f68ef'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #3' from '8e3a869d'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from 'ca3a06d0'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '6cb82119'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #3' from '392f68ef'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '8e3a869d'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from 'ca3a06d0'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #6' from '6cb82119'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '392f68ef'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '8e3a869d'
Nov 29, 2013 9:21:32 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from 'ca3a06d0'
Nov 29, 2013 9:21:33 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from '6cb82119'
Nov 29, 2013 9:21:33 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '392f68ef'
Nov 29, 2013 9:21:33 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #6' from '8e3a869d'
Nov 29, 2013 9:21:34 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from '6cb82119'
Nov 29, 2013 9:21:34 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #6' from '392f68ef'
Nov 29, 2013 9:21:34 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from '8e3a869d'
Nov 29, 2013 9:21:35 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from '6cb82119'
Nov 29, 2013 9:21:35 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from '392f68ef'
Nov 29, 2013 9:21:35 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from '8e3a869d'
Nov 29, 2013 9:21:36 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from '6cb82119'
Nov 29, 2013 9:21:36 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from '392f68ef'
Nov 29, 2013 9:21:36 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from '8e3a869d'
Nov 29, 2013 9:21:37 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #9' from '392f68ef'
Nov 29, 2013 9:21:37 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from '8e3a869d'
Nov 29, 2013 9:21:38 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #10' from '392f68ef'
2013-11-29 21:21:39.260:INFO:oejwc.WebSocketClient:main: Stopped org.eclipse.jetty.websocket.client.WebSocketClient@3af5f6dc

Awesome! I hope this introductory blog post shows how easy it became to use modern web communication protocols in Java, thanks to Java WebSockets (JSR-356), Java API for JSON Processing (JSR-353) and great projects such as Jetty 9.1!

As always, complete project is available on GitHub.

Wednesday, July 31, 2013

Easy Messaging with STOMP over WebSockets using Apollo

In my previous post I covered a couple of interesting use cases implementing STOMP messaging over Websockets using well-known message brokers, HornetQ and ActiveMQ. But the one I didn't cover is Apollo, as in my own opinion its API is verbose and not expressive enough for a Java developer. Nevertheless, the more time I spent playing with Apollo, the more convinced I became that there is quite a lot of potential there. So this post is all about Apollo.

The problem we're trying to solve stays the same: a simple publish/subscribe solution where a JavaScript web client sends messages and listens for a specific topic. Whenever any message is received, the client shows an alert window (please note that we need to use a modern browser which supports Websockets, such as Google Chrome or Mozilla Firefox).

Let's get our hands dirty by starting off with index.html (which imports the awesome stomp.js JavaScript library):





The client part is not that different except for the topic name, which is now /topic/test. The server side, however, differs a lot. Apollo is written in Scala and embraces an asynchronous, non-blocking programming model. I think it's a very good thing. What it brings, though, is a new paradigm to program against, and that is not necessarily a bad thing either. The AppConfig class is the one which configures the embedded Apollo broker:

package com.example.messaging;

import java.io.File;

import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.jmx.dto.JmxDTO;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.BrokerDTO;
import org.apache.activemq.apollo.dto.TopicDTO;
import org.apache.activemq.apollo.dto.VirtualHostDTO;
import org.apache.activemq.apollo.dto.WebAdminDTO;
import org.apache.activemq.apollo.stomp.dto.StompDTO;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean
    public Broker broker() throws Exception {
        final Broker broker = new Broker();
    
        // Configure STOMP over WebSockets connector
        final AcceptingConnectorDTO ws = new AcceptingConnectorDTO();
        ws.id = "ws";
        ws.bind = "ws://localhost:61614";  
        ws.protocols.add( new StompDTO() );

        // Create a topic with name 'test'
        final TopicDTO topic = new TopicDTO();
        topic.id = "test";
  
        // Create virtual host (based on localhost)
        final VirtualHostDTO host = new VirtualHostDTO();
        host.id = "localhost";  
        host.topics.add( topic );
        host.host_names.add( "localhost" );
        host.host_names.add( "127.0.0.1" );
        host.auto_create_destinations = false;
  
        // Create a web admin UI (REST) accessible at: http://localhost:61680/api/index.html#!/ 
        final WebAdminDTO webadmin = new WebAdminDTO();
        webadmin.bind = "http://localhost:61680";

        // Create JMX instrumentation 
        final JmxDTO jmxService = new JmxDTO();
        jmxService.enabled = true;
  
        // Finally, glue all together inside broker configuration
        final BrokerDTO config = new BrokerDTO();
        config.connectors.add( ws );
        config.virtual_hosts.add( host );
        config.web_admins.add( webadmin );
        config.services.add( jmxService );
  
        broker.setConfig( config );
        broker.setTmp( new File( System.getProperty( "java.io.tmpdir" ) ) );
  
        broker.start( new Runnable() {   
            @Override
            public void run() {  
                System.out.println("The broker has been started.");
            }
        } );
  
        return broker;
    }
}

I guess it becomes clear what I meant by verbose and not expressive enough, but at least it's easy to follow. Firstly, we create a Websockets connector at ws://localhost:61614 and ask it to support the STOMP protocol. Then we create a simple topic with the name test (which we refer to as /topic/test on the client side). The next important step is to create a virtual host and to bind topics (and queues, if any) to it. The host names list is very important as the destination resolution logic relies heavily on it. In the following step we configure the web admin UI and JMX instrumentation, which give us access to configuration, statistics and monitoring. To check it out, please open http://localhost:61680/api/index.html#!/ in your web browser once the Apollo broker is started. And finally, by applying the configuration and starting the broker we are good to go! As you can see, the asynchronous programming model leads to callbacks and anonymous functions (where are you, Java 8?).

Now that configuration is done, it's time to look at the start-up logic placed into the Starter class (again, callbacks and anonymous functions are used to perform the graceful shutdown logic):

package com.example.messaging;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.apollo.broker.Broker;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) throws Exception {
        try( ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class ) ) {
            final Broker broker = context.getBean( Broker.class );  
            System.out.println( "Press any key to terminate ..." );
            System.in.read();    
         
            final CountDownLatch latch = new CountDownLatch( 1 );
            broker.stop( new Runnable() { 
                @Override
                public void run() {  
                    System.out.println("The broker has been stopped.");
                    latch.countDown();
                }
            } );
         
            // Gracefully stop the broker
            if( !latch.await( 1, TimeUnit.SECONDS ) ) {
                System.out.println("The broker hasn't been stopped, exiting anyway ...");
            }
        }
    }
}
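The callback-plus-latch pattern used for the graceful shutdown reads more compactly once lambdas and method references are available. Below is a minimal sketch under stated assumptions: stopAsync is a hypothetical stand-in for a callback-style method such as the broker's stop, implemented here with a plain thread so the example runs without Apollo, and the timeout is passed in milliseconds purely for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AsyncStopSketch {
    // Hypothetical stand-in for a broker's asynchronous stop: runs the callback on another thread.
    static void stopAsync( final Runnable onStopped ) {
        final Thread worker = new Thread( onStopped, "fake-broker-stop" );
        worker.setDaemon( true );
        worker.start();
    }

    // Returns true if the callback fired before the timeout elapsed, false otherwise.
    static boolean stopAndWait( final long timeoutMillis ) {
        final CountDownLatch latch = new CountDownLatch( 1 );
        // A method reference replaces the anonymous Runnable.
        stopAsync( latch::countDown );
        try {
            return latch.await( timeoutMillis, TimeUnit.MILLISECONDS );
        } catch( final InterruptedException ex ) {
            // Preserve the interrupt flag and report that we did not see the callback.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main( final String[] args ) {
        System.out.println( stopAndWait( 1000 ) ? "stopped" : "timed out, exiting anyway" );
    }
}
```

The latch turns the asynchronous callback back into something the main thread can wait on, while the timeout keeps the application from hanging if the callback never arrives.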

As with the previous examples, after running Starter class and opening index.html in the browser, we should see something like that:

Great, it works just fine! I am pretty sure that if the code were rewritten in Scala, this Apollo API usage example would look much more compact and concise. In any case, I think the Apollo message broker is definitely worth considering if you are looking for a promising messaging architecture.

All sources are available on GitHub: Apollo example.

Saturday, June 29, 2013

Easy Messaging with STOMP over WebSockets using ActiveMQ and HornetQ

Messaging is an extremely powerful tool for building distributed software systems of different levels. Typically, at least in Java ecosystem, the client (front-end) never interacts with message broker (or exchange) directly but does it by invoking server-side (back-end) services. Or client may not even be aware that there's messaging solution in place.

Websockets gaining more and more adoption, together with wide support for text-oriented protocols like STOMP (used to communicate with a message broker or exchange), is going to make a difference. Today's post will try to explain how simple it is to expose two very popular JMS implementations, Apache ActiveMQ and JBoss HornetQ, to a web front-end (JavaScript) using STOMP over Websockets.

Before digging into the code, one might argue that it's not a good idea to do that. So what's the purpose? The answer really depends:

  • you are developing prototype / proof of concept and need easy way to integrate publish/subscribe or peer-to-peer messaging
  • you don't want / need to build sophisticated architecture and the simplest solution which works is just enough
The scalability, fail-over and a lot of other very important decisions are not taken into consideration here but definitely should be if you are developing robust and resilient architecture.

So let's get started. As always, it's better to start with the problem we're trying to solve: we would like to develop a simple publish/subscribe solution where a web client written in JavaScript will be able to send messages and listen for a specific topic. Whenever a message is received, the client just shows a simple alert window. Please note that we need to use a modern browser which supports Websockets, such as Google Chrome or Mozilla Firefox.

For both our examples the client's code remains the same, so let's start with that. A great starting point is the STOMP Over WebSocket article, which introduces the stomp.js module, and here is our index.html:




Extremely simple code, but a few details are worth explaining. First, we are looking for the Websockets endpoint at ws://localhost:61614/stomp. It's sufficient for local deployment, but it is better to replace localhost with a real IP address or host name. Secondly, once connected, the client subscribes to the topic (it is only interested in messages with priority: 9) and publishes a message to this topic immediately after. From the client's perspective, we are done.
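To get a feeling for what actually travels over the Websockets connection, here is a minimal sketch that assembles a STOMP frame by hand: a command line, header lines in key:value form, an empty line, the body and a terminating NUL character. The class and method names are mine, the destination jms.topic.test and the priority header are assumptions chosen to match this post's examples, and header value escaping (required by newer STOMP versions) is deliberately left out. In the browser, stomp.js builds such frames for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: hand-assembles the text form of a STOMP frame.
class StompFrameSketch {
    static String frame( final String command, final Map< String, String > headers, final String body ) {
        final StringBuilder sb = new StringBuilder( command ).append( '\n' );
        for( final Map.Entry< String, String > header: headers.entrySet() ) {
            sb.append( header.getKey() ).append( ':' ).append( header.getValue() ).append( '\n' );
        }
        // An empty line separates headers from the body, a NUL character ends the frame.
        return sb.append( '\n' ).append( body ).append( '\0' ).toString();
    }

    public static void main( final String[] args ) {
        // LinkedHashMap keeps the headers in insertion order.
        final Map< String, String > headers = new LinkedHashMap<>();
        headers.put( "destination", "jms.topic.test" );  // assumed topic name
        headers.put( "priority", "9" );                  // assumed custom header
        // Replace the NUL with a visible marker just for printing.
        System.out.println( frame( "SEND", headers, "Pub/Sub over STOMP!" ).replace( "\0", "^@" ) );
    }
}
```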

Let's move on to the message broker, and the first one on our list is Apache ActiveMQ. To keep the example simple, we will embed the Apache ActiveMQ broker into a simple Spring application without using XML configuration files. As the source code is available on GitHub, I will skip the POM file snippet and just show the code:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();    
        broker.addConnector( "ws://localhost:61614" ); 
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );
  
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );
  
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );
  
        return broker;
    }
}

As we can see, the ActiveMQ broker is configured with a ws://localhost:61614 connector, which assumes the STOMP protocol. Also, we are creating a JMS topic with the name jms.topic.test and enabling JMX management instrumentation. And to run it, a simple Starter class:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

Now, having it up and running, let's open the index.html file in a browser. We should see something like this:

Simple! For curious readers, ActiveMQ uses Jetty 7.6.7.v20120910 for Websockets support and won't work with the latest Jetty distributions.

Moving on, with respect to HornetQ the implementation looks a bit different, though not very complicated either. As the Starter class remains the same, the only change is the configuration:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );
  
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );
  
        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );
  
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );
  
        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );
  
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );
  
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );
  
        return jmsServer;
    }
}

The complete source code is on GitHub. After running the Starter class and opening index.html in a browser, we should see very similar results:

The HornetQ configuration looks a bit more verbose; however, there are no additional dependencies involved except the brilliant Netty framework.

Out of curiosity, I replaced the ActiveMQ broker with the Apollo implementation. Though I succeeded in making it work as expected, I found the API to be very cumbersome, at least in the current version 1.6, so I haven't covered it in this post.

All sources are available on GitHub: Apache ActiveMQ example and JBoss HornetQ example

Monday, May 27, 2013

Real-time charts with Play Framework and Scala: extreme productivity on JVM for web

Being a hardcore back-end developer, whenever I think about building a web application with some UI on the JVM platform, I feel scared. And there are reasons for that: having experience with JSF, Liferay, Grails, ... I don't want to go down this road anymore. But if the need comes, is there a choice, really? I found one which I think is awesome: Play Framework.

Built on top of the JVM, Play Framework makes it possible to create web applications using Java or Scala with very little effort. The valuable and distinguishing features it provides: static compilation (even for page templates), a gentle learning curve, and conciseness (more about it here).

To demonstrate how amazing Play Framework is, I would like to share my experience of developing a simple web application. Let's assume we have a couple of hosts and we would like to watch the CPU usage on each one in real time (on a chart). "Real-time" may mean different things, but in the context of our application it means using WebSockets to push data from the server to the client. Though Play Framework supports a pure Java API, I will use some Scala instead, as it makes the code very compact and clear.

Let's get started! After downloading Play Framework (the latest version at the moment of writing was 2.1.1), let's create our app by typing

play new play-websockets-example
and selecting Scala as the primary language. No surprises here: it's a pretty standard way nowadays, right?

Having our application ready, the next step is to create a starting web page. Play Framework uses its own type-safe template engine based on Scala; it has a couple of extremely simple rules and is very easy to get started with. Here is an example of views/dashboard.scala.html:

@(title: String, hosts: List[Host])

<!DOCTYPE html>
<html>
  <head>
    <title>@title</title>
    <link rel="stylesheet" media="screen" href="@routes.Assets.at("stylesheets/main.css")">
    <link rel="shortcut icon" type="image/png" href="@routes.Assets.at("images/favicon.png")">
    <script src="@routes.Assets.at("javascripts/jquery-1.9.0.min.js")" type="text/javascript"></script>
    <script src="@routes.Assets.at("javascripts/highcharts.js")" type="text/javascript"></script>
  </head>
    
  <body>
    <div id="hosts">
      <ul class="hosts">
        @hosts.map { host =>
        <li>               
          <a href="#" onclick="javascript:show( '@host.id' )"><b>@host.name</b></a>
        </li>
        }
      </ul>
    </div>
  
    <div id="content">
    </div>
  </body>
</html>

<script type="text/javascript">
function show( hostid ) {
  $("#content").load( "/host/" + hostid,
    function( response, status, xhr ) {
      if (status == "error") {
        $("#content").html( "Sorry but there was an error:" + xhr.status + " " + xhr.statusText);
      }
    }
  )
}
</script>

Aside from a couple of interesting constructs (which are very well described here), it looks pretty much like regular HTML with a bit of JavaScript. The result of this web page is a simple list of hosts in the browser. Whenever the user clicks on a particular host, another view is fetched from the server (using our old buddy AJAX) and displayed to the right of the host list. Here is the second (and the last) template, views/host.scala.html:

@(host: Host)( implicit request: RequestHeader )

<div id="content">
  <div id="chart">
  <script type="text/javascript">
    var charts = []   
    charts[ '@host.id' ] = new Highcharts.Chart({                 
      chart: {
        renderTo: 'chart',
        defaultSeriesType: 'spline'            
      },           
      xAxis: {
        type: 'datetime'
      },   
      series: [{
        name: "CPU",
        data: []
      }]
    }); 
  </script>
  </div>
</div>

<script type="text/javascript">
var socket = new WebSocket("@routes.Application.stats( host.id ).webSocketURL()")
socket.onmessage = function( event ) { 
  var datapoint = jQuery.parseJSON( event.data );
  var chart = charts[ '@host.id' ]
  
  chart.series[ 0 ].addPoint({
    x: datapoint.cpu.timestamp,
    y: datapoint.cpu.load
  }, true, chart.series[ 0 ].data.length >= 50 );
}
</script>

It looks more like a fragment than a complete HTML page: it contains only a chart and opens the WebSockets connection with a listener. With the enormous help of Highcharts and jQuery, JavaScript programming has never been so easy for back-end developers as it is now. At this point, the UI part is completely done. Let's move on to the back-end side.

First, let's define the routing table, which includes only three URLs and by default is located at conf/routes:
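One detail of the listener above deserves a closer look: the third argument of addPoint asks the chart to shift, that is, to drop the oldest point, once the series already holds 50 points, so the chart behaves like a sliding window. Here is a rough sketch of the same idea in plain Java. The SlidingSeries class is hypothetical and only imitates that behaviour; it is not Highcharts code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical class imitating the shift behaviour of a chart series; not Highcharts code.
final class SlidingSeries {
    private final int capacity;
    private final Deque< long[] > points = new ArrayDeque<>();

    SlidingSeries( final int capacity ) {
        this.capacity = capacity;
    }

    // Appends a point; once the window is full, the oldest point is dropped first.
    void addPoint( final long timestamp, final long load ) {
        if( points.size() >= capacity ) {
            points.removeFirst();
        }
        points.addLast( new long[] { timestamp, load } );
    }

    int size() {
        return points.size();
    }

    // Timestamp of the oldest point still kept; throws NoSuchElementException when empty.
    long oldestTimestamp() {
        return points.getFirst()[ 0 ];
    }
}
```

With a capacity of 50 and one data point every 3 seconds, such a window would cover roughly the last two and a half minutes of measurements.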

GET     /                           controllers.Application.index
GET     /host/:id                   controllers.Application.host( id: String )
GET     /stats/:id                  controllers.Application.stats( id: String )
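To make the :id notation less magical: a segment starting with a colon captures one path segment and passes it to the action as a parameter. The following is a minimal plain-Java sketch of that matching idea. The RoutePattern class is hypothetical; Play compiles conf/routes into its own router, so this is not how the framework implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical matcher for illustration only; Play generates its own router from conf/routes.
final class RoutePattern {
    private final Pattern regex;

    // Turns a template such as "/host/:id" into a regex where ":id" captures one path segment.
    RoutePattern( final String template ) {
        final StringBuilder sb = new StringBuilder();
        for( final String segment: template.split( "/" ) ) {
            if( segment.isEmpty() ) {
                continue;
            }
            sb.append( '/' );
            sb.append( segment.startsWith( ":" ) ? "([^/]+)" : Pattern.quote( segment ) );
        }
        this.regex = Pattern.compile( sb.length() == 0 ? "/" : sb.toString() );
    }

    // Returns the captured parameter values if the path matches, or empty otherwise.
    Optional< List< String > > match( final String path ) {
        final Matcher matcher = regex.matcher( path );
        if( !matcher.matches() ) {
            return Optional.empty();
        }
        final List< String > values = new ArrayList<>();
        for( int i = 1; i <= matcher.groupCount(); ++i ) {
            values.add( matcher.group( i ) );
        }
        return Optional.of( values );
    }
}
```

For example, new RoutePattern( "/host/:id" ).match( "/host/h1" ) captures the single value h1, while the same pattern does not match /stats/h1.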

Having views and routes defined, it's time to fill in the last and most interesting part: the controllers which glue all the parts together (actually, only one controller, controllers/Application.scala). Here is a snippet which maps the index action to the view templated by views/dashboard.scala.html; it's as easy as that:

def index = Action {
  Ok( views.html.dashboard( "Dashboard", Hosts.hosts() ) )
}

This action can be read as follows: return a successful response code and render the template views/dashboard.scala.html with two parameters, title and hosts, as the response body. The action that handles /host/:id looks much the same:

def host( id: String ) = Action { implicit request =>
  Hosts.hosts.find( _.id == id ) match {
    case Some( host ) => Ok( views.html.host( host ) )
    case None => NoContent
  }    
}

And here is the Hosts object defined in models/Hosts.scala. For simplicity, the list of hosts is hard-coded:

package models

case class Host( id: String, name: String )

object Hosts {  
  def hosts(): List[ Host ] =
    List( Host( "h1", "Host 1" ), Host( "h2", "Host 2" ) )
}
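For readers more at home in Java, the find / match combination used in the host action maps quite naturally onto Optional. The sketch below is a hypothetical plain-Java counterpart of the Hosts object and the lookup logic. It does not use the Play Java API, and the status codes simply mirror Ok (200) and NoContent (204).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical Java counterpart of the Hosts object and the host action; not Play API code.
final class HostLookup {
    static final class Host {
        final String id;
        final String name;

        Host( final String id, final String name ) {
            this.id = id;
            this.name = name;
        }
    }

    // Hard-coded list of hosts, same as in models/Hosts.scala.
    static final List< Host > HOSTS = Arrays.asList(
        new Host( "h1", "Host 1" ),
        new Host( "h2", "Host 2" )
    );

    // Equivalent of Hosts.hosts.find( _.id == id ).
    static Optional< Host > find( final String id ) {
        return HOSTS.stream().filter( host -> host.id.equals( id ) ).findFirst();
    }

    // 200 when the host exists, 204 otherwise, mirroring Ok and NoContent in the Scala action.
    static int statusFor( final String id ) {
        return find( id ).map( host -> 200 ).orElse( 204 );
    }
}
```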

The boring part is over, so let's move on to the last but not least piece of the implementation: server push of the host's CPU statistics using WebSockets. As you can see, the /stats/:id URL is already mapped to a controller action, so let's take a look at its implementation:

def stats( id: String ) = WebSocket.async[JsValue] { request =>
  Hosts.hosts.find( _.id == id ) match {
    case Some( host ) => Statistics.attach( host )
    case None => {
      val enumerator = Enumerator
        .generateM[JsValue]( Promise.timeout( None, 1.second ) )
        .andThen( Enumerator.eof )
      Promise.pure( ( Iteratee.ignore[JsValue], enumerator ) )
    }
  }
}

There is not much code here, but in case you are curious about WebSockets in Play Framework, please follow this link. These few lines may look a bit weird at first, but once you read the documentation and understand the basic design principles behind Play Framework, they will look much more familiar and friendly. The Statistics object is the one that does the real job, so let's take a look at the code:

package models

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.actor.ActorRef
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import play.api.Play.current
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Iteratee
import play.api.libs.json.JsValue

case object Refresh
case class Connect( host: Host )
case class Connected( enumerator: Enumerator[ JsValue ] )

object Statistics {
  implicit val timeout = Timeout( 5.seconds )
  var actors: Map[ String, ActorRef ] = Map()

  // Lock on the Statistics object itself: the actors var is reassigned on every
  // update, so actors.synchronized would lock a different Map instance each time.
  def actor( id: String ) = synchronized {
    actors.get( id ) match {
      case Some( actor ) => actor
      case None => {
        val actor = Akka.system.actorOf( Props( new StatisticsActor(id) ), name = s"host-$id" )
        Akka.system.scheduler.schedule( 0.seconds, 3.seconds, actor, Refresh )
        actors += ( id -> actor )
        actor
      }
    }
  }
 
  def attach( host: Host ): Future[ ( Iteratee[ JsValue, _ ], Enumerator[ JsValue ] ) ] = {
    ( actor( host.id ) ? Connect( host ) ).map {      
      case Connected( enumerator ) => ( Iteratee.ignore[JsValue], enumerator )
    }
  }
}

As always, thanks to Scala's conciseness, there is not much code but a lot is going on. As we may have hundreds of hosts, it is reasonable to dedicate to each host its own worker (not a thread) or, more precisely, its own actor. For that, we use another amazing library called Akka. The code snippet above just creates an actor for the host or reuses an existing one from the registry of already created actors. Please note that the implementation is quite simplified and leaves out important details. A step in the right direction would be to use supervisors and other advanced concepts instead of a synchronized block. It is also worth mentioning that we make our actor a scheduled task: we ask the actor system to send the actor a Refresh message every 3 seconds. That means the charts will be updated with new values every three seconds as well.

So, once the actor for a host is available, we send it a Connect message notifying it that a new connection is being established. When the Connected response message is received, we return from the method, and at this point the connection over WebSockets is about to be established. Please note that we intentionally ignore any input from the client by using Iteratee.ignore[JsValue].
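The "create once, then reuse" registry combined with a periodic refresh can also be sketched without Akka. The example below is a plain-Java analogue under stated assumptions: WorkerRegistry and Worker are hypothetical names, a Worker only counts refresh signals instead of being a real actor, and ConcurrentHashMap.computeIfAbsent takes the place of the synchronized block.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-Java analogue of the actor registry; it uses no Akka classes.
final class WorkerRegistry {
    // Stand-in for an actor: it only counts the refresh signals it receives.
    static final class Worker {
        final String hostId;
        final AtomicInteger refreshes = new AtomicInteger();

        Worker( final String hostId ) {
            this.hostId = hostId;
        }

        void refresh() {
            refreshes.incrementAndGet();
        }
    }

    private final ConcurrentHashMap< String, Worker > workers = new ConcurrentHashMap<>();

    // Daemon thread, so the scheduler never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor( runnable -> {
            final Thread thread = new Thread( runnable, "refresh-scheduler" );
            thread.setDaemon( true );
            return thread;
        } );

    // Returns the worker for a host, creating and scheduling it at most once per id.
    Worker workerFor( final String hostId ) {
        return workers.computeIfAbsent( hostId, id -> {
            final Worker worker = new Worker( id );
            // Same cadence as in the post: first signal immediately, then every 3 seconds.
            scheduler.scheduleAtFixedRate( worker::refresh, 0, 3, TimeUnit.SECONDS );
            return worker;
        } );
    }

    int size() {
        return workers.size();
    }
}
```

Calling workerFor twice with the same id returns the same Worker instance, which is the property the registry in the Statistics object provides for actors.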

And here is the StatisticsActor implementation:

package models

import java.util.Date

import scala.util.Random

import akka.actor.Actor
import play.api.libs.iteratee.Concurrent
import play.api.libs.json.JsNumber
import play.api.libs.json.JsObject
import play.api.libs.json.JsString
import play.api.libs.json.JsValue

class StatisticsActor( hostid: String ) extends Actor {
  val ( enumerator, channel ) = Concurrent.broadcast[JsValue]
  
  def receive = {
    case Connect( host ) => sender ! Connected( enumerator )       
    case Refresh => broadcast( new Date().getTime(), hostid )
  }
  
  def broadcast( timestamp: Long, id: String ) {
    val msg = JsObject( 
      Seq( 
        "id" -> JsString( id ),
        "cpu" -> JsObject( 
          Seq( 
            ( "timestamp" -> JsNumber( timestamp ) ), 
            ( "load" -> JsNumber( Random.nextInt( 100 ) ) ) 
          ) 
        )
      )
    )
     
    channel.push( msg )
  }
}
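The key idea in the actor above is fan-out: all clients connected to the same host share one stream, and every pushed message reaches each of them. As a rough analogy only, here is a plain-Java sketch of that idea together with the JSON shape being sent. StatsBroadcaster is a hypothetical class that imitates the concept behind Concurrent.broadcast, not its API, and the hand-built JSON assumes the host id contains no characters that need escaping.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical fan-out helper; it imitates the idea of Concurrent.broadcast, not its API.
final class StatsBroadcaster {
    private final List< Consumer< String > > subscribers = new CopyOnWriteArrayList<>();

    // Registers one more receiver, comparable to one more connected WebSocket client.
    void subscribe( final Consumer< String > subscriber ) {
        subscribers.add( subscriber );
    }

    // Delivers one message to every subscriber registered so far.
    void push( final String message ) {
        for( final Consumer< String > subscriber: subscribers ) {
            subscriber.accept( message );
        }
    }

    // Builds the same JSON shape the actor sends: id plus a nested cpu object.
    // Assumption: the id needs no JSON escaping.
    static String toJson( final String id, final long timestamp, final int load ) {
        return "{\"id\":\"" + id + "\",\"cpu\":{\"timestamp\":" + timestamp + ",\"load\":" + load + "}}";
    }
}
```

A message built with toJson has exactly the fields the client-side listener reads: datapoint.cpu.timestamp and datapoint.cpu.load.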

The CPU statistics are randomly generated, and the actor just broadcasts them every 3 seconds as a simple JSON object. On the client side, the JavaScript code parses this JSON and updates the chart. Here is how it looks for two hosts, Host 1 and Host 2, in Mozilla Firefox:

To finish up, I am personally very excited about what I've done so far with Play Framework. It took just a couple of hours to get started and another couple of hours to make things work as expected. The error reporting and feedback cycle from the running application are absolutely terrific; thanks a lot to the Play Framework guys and the community around it. There are still a lot of things for me to learn, but it is worth doing.

Please find the complete source code on GitHub.