
Sunday, February 23, 2014

Knowing how all your components work together: distributed tracing with Zipkin

In today's post we will try to cover a very interesting and important topic: distributed system tracing. In practice, this means we will try to trace a request from the point it was issued by the client to the point the response to this request was received. At first it looks quite straightforward, but in reality it may involve many calls to several other systems, databases, NoSQL stores, caches, you name it ...

In 2010 Google published a paper about Dapper, a large-scale distributed systems tracing infrastructure (very interesting reading, by the way). Later on, Twitter built its own implementation based on the Dapper paper, called Zipkin, and that's the one we are going to look at.

We will build a simple JAX-RS 2.0 server using the great Apache CXF library. For the client side, we will use the JAX-RS 2.0 client API, and by utilizing Zipkin we will trace all the interactions between the client and the server (as well as everything happening on the server side). To make the example a bit more illustrative, we will pretend that the server uses some kind of database to retrieve the data. Our code will be a mix of pure Java and a bit of Scala (the reason for choosing Scala will become clear soon).

One additional dependency Zipkin needs in order to work is Apache ZooKeeper. It is required for coordination and should be started in advance. Luckily, that is very easy to do:

Now back to Zipkin. Zipkin is written in Scala. It is still in active development, and the best way to start off with it is simply to clone its GitHub repository and build it from source:

git clone https://github.com/twitter/zipkin.git

From an architectural perspective, Zipkin consists of three main components:

  • collector: collects traces across the system
  • query: queries collected traces
  • web: provides web-based UI to show the traces

To run them, the Zipkin guys provide useful scripts in the bin folder; the only requirement is that JDK 1.7 is installed:

  • bin/collector
  • bin/query
  • bin/web
Let's execute these scripts and ensure that every component has started successfully, with no stack traces on the console (for curious readers: I was not able to make Zipkin work on Windows, so I assume we are running it on a Linux box). By default, the Zipkin web UI is available on port 8080. Traces are stored in an embedded SQLite engine. It works, but better storage options (like the awesome Redis) are available.

The preparation is over, so let's write some code. We will start with the JAX-RS 2.0 client part as it's very straightforward (ClientStarter.java):

package com.example.client;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.example.zipkin.Zipkin;
import com.example.zipkin.client.ZipkinRequestFilter;
import com.example.zipkin.client.ZipkinResponseFilter;

public class ClientStarter {
  public static void main( final String[] args ) throws Exception { 
    final Client client = ClientBuilder
      .newClient()
      .register( new ZipkinRequestFilter( "People", Zipkin.tracer() ), 1 )
      .register( new ZipkinResponseFilter( "People", Zipkin.tracer() ), 1 );        
                        
    final Response response = client
      .target( "http://localhost:8080/rest/api/people" )
      .request( MediaType.APPLICATION_JSON )
      .get();

    if( response.getStatus() == 200 ) {
      System.out.println( response.readEntity( String.class ) );
    }
        
    response.close();
    client.close();
        
    // Small delay to allow tracer to send the trace over the wire
    Thread.sleep( 1000 );
  }
}

Except for a couple of imports and classes with Zipkin in their names, everything should look simple. So what are ZipkinRequestFilter and ZipkinResponseFilter for? Zipkin is awesome, but it's not a magical tool. In order to trace any request in a distributed system, some context has to be passed along with it. In the REST/HTTP world, that is usually done with request/response headers. Let's take a look at ZipkinRequestFilter first (ZipkinRequestFilter.scala):

package com.example.zipkin.client

import javax.ws.rs.client.ClientRequestFilter
import javax.ws.rs.ext.Provider
import javax.ws.rs.client.ClientRequestContext
import com.twitter.finagle.http.HttpTracing
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.Annotation
import com.twitter.finagle.tracing.TraceId
import com.twitter.finagle.tracing.Tracer

@Provider
class ZipkinRequestFilter( val name: String, val tracer: Tracer ) extends ClientRequestFilter {
  def filter( requestContext: ClientRequestContext ): Unit = {      
    Trace.pushTracerAndSetNextId( tracer, true )
 
    requestContext.getHeaders().add( HttpTracing.Header.TraceId, Trace.id.traceId.toString )
    requestContext.getHeaders().add( HttpTracing.Header.SpanId, Trace.id.spanId.toString )
         
    Trace.id._parentId foreach { id => 
      requestContext.getHeaders().add( HttpTracing.Header.ParentSpanId, id.toString ) 
    }    

    Trace.id.sampled foreach { sampled => 
      requestContext.getHeaders().add( HttpTracing.Header.Sampled, sampled.toString ) 
    }

    requestContext.getHeaders().add( HttpTracing.Header.Flags, Trace.id.flags.toLong.toString )
             
    if( Trace.isActivelyTracing ) {
      Trace.recordRpcname( name,  requestContext.getMethod() )
      Trace.recordBinary( "http.uri", requestContext.getUri().toString()  )
      Trace.record( Annotation.ClientSend() )    
    }
  }
}

A bit of knowledge about Zipkin internals will make this code crystal clear. The central part of the API (which, as the imports show, lives in Finagle's com.twitter.finagle.tracing package) is the Trace class. Every time we would like to initiate tracing, we need a Trace Id and a tracer to actually trace it. This single line generates a new Trace Id and registers the tracer (internally this data is held in thread-local state).

Trace.pushTracerAndSetNextId( tracer, true )

Traces are hierarchical by nature, and so are Trace Ids: every Trace Id can be a root or part of another trace. In our example, we know for sure that we are the first and, as such, the root of the trace. Next, the Trace Id is written into HTTP headers and passed along with the request (we will see on the server side how it is used). The last three lines associate useful information with the trace: the name of our API (People), the HTTP method, the URI and, most importantly, the fact that it's the client sending the request to the server.
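To make the root/child relationship more tangible, here is a minimal sketch of how such identifiers could be modelled in plain Java. The SpanContext class is hypothetical and is not Finagle's TraceId: every span in a trace shares the trace id, a root span has no parent, and a child span remembers the span id of the span that created it. Rendering ids as 16 zero-padded hex digits is an assumption about how they look on the wire.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified model of a Dapper/Zipkin-style trace identifier.
// Not Finagle's TraceId: it only illustrates the root/child relationship.
final class SpanContext {
    final long traceId;   // shared by every span in one trace
    final Long parentId;  // null for the root span
    final long spanId;    // unique per span

    SpanContext(long traceId, Long parentId, long spanId) {
        this.traceId = traceId;
        this.parentId = parentId;
        this.spanId = spanId;
    }

    // A root span starts a new trace; here the trace id reuses the span id.
    static SpanContext root() {
        long id = ThreadLocalRandom.current().nextLong();
        return new SpanContext(id, null, id);
    }

    // A child span keeps the trace id and points back to its parent span.
    SpanContext child() {
        return new SpanContext(traceId, spanId, ThreadLocalRandom.current().nextLong());
    }

    boolean isRoot() {
        return parentId == null;
    }

    // Ids rendered as 16 lower-case hex digits, zero padded.
    static String hex(long id) {
        return String.format("%016x", id);
    }

    public static void main(String[] args) {
        SpanContext request = SpanContext.root();  // the client's request
        SpanContext db = request.child();          // the server calling the "DB"
        System.out.println("trace   " + hex(request.traceId));
        System.out.println("request " + hex(request.spanId) + " root=" + request.isRoot());
        System.out.println("db      " + hex(db.spanId) + " parent=" + hex(db.parentId));
    }
}
```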

Trace.recordRpcname( name,  requestContext.getMethod() )
Trace.recordBinary( "http.uri", requestContext.getUri().toString()  )
Trace.record( Annotation.ClientSend() ) 

The ZipkinResponseFilter does the reverse of ZipkinRequestFilter and extracts the Trace Id from the request headers (ZipkinResponseFilter.scala):

package com.example.zipkin.client

import javax.ws.rs.client.ClientResponseFilter
import javax.ws.rs.client.ClientRequestContext
import javax.ws.rs.client.ClientResponseContext
import javax.ws.rs.ext.Provider
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.Annotation
import com.twitter.finagle.tracing.SpanId
import com.twitter.finagle.http.HttpTracing
import com.twitter.finagle.tracing.TraceId
import com.twitter.finagle.tracing.Flags
import com.twitter.finagle.tracing.Tracer

@Provider
class ZipkinResponseFilter( val name: String, val tracer: Tracer ) extends ClientResponseFilter {  
  def filter( requestContext: ClientRequestContext, responseContext: ClientResponseContext ): Unit = {
    val spanId = SpanId.fromString( requestContext.getHeaders().getFirst( HttpTracing.Header.SpanId ).toString() )

    spanId foreach { sid =>
      val traceId = SpanId.fromString( requestContext.getHeaders().getFirst( HttpTracing.Header.TraceId ).toString() )
        
      val parentSpanId = requestContext.getHeaders().getFirst( HttpTracing.Header.ParentSpanId ) match {
        case s: String => SpanId.fromString( s.toString() )
        case _ => None
      }

      val sampled = requestContext.getHeaders().getFirst( HttpTracing.Header.Sampled ) match { 
        case s: String =>  s.toString.toBoolean
        case _ => true
      }
        
      val flags = Flags( requestContext.getHeaders().getFirst( HttpTracing.Header.Flags ).toString.toLong )        
      Trace.setId( TraceId( traceId, parentSpanId, sid, Option( sampled ), flags ) )
    }
      
    if( Trace.isActivelyTracing ) {
      Trace.record( Annotation.ClientRecv() )
    }
  }
}

Strictly speaking, in our example it's not necessary to extract the Trace Id from the request because both filters should be executed by the same thread. But the last line is very important: it marks the end of our trace by saying that the client has received the response.

Trace.record( Annotation.ClientRecv() )
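Why can both filters see the same Trace Id? Because the tracing state is kept per thread. The sketch below uses a hypothetical TraceContext class built on a plain ThreadLocal stack (it is not Finagle's implementation) to show the idea: whatever the "request filter" pushes is visible to later code on the same thread, but not to another thread. That is why the response filter could, in this single-threaded example, skip re-parsing the headers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-thread trace context, illustrating the thread-local idea
// behind the Trace object used above. Not the real implementation.
final class TraceContext {
    private static final ThreadLocal<Deque<String>> STACK =
        ThreadLocal.withInitial(ArrayDeque::new);

    static void push(String traceId) {
        STACK.get().push(traceId);
    }

    static String current() {
        return STACK.get().peek(); // null when nothing was pushed on this thread
    }

    static void pop() {
        STACK.get().pop();
    }

    public static void main(String[] args) throws InterruptedException {
        push("463ac35c9f6413ad");                  // "request filter" on the main thread
        System.out.println("same thread:  " + current());

        Thread other = new Thread(() ->
            System.out.println("other thread: " + current())); // sees nothing
        other.start();
        other.join();

        pop();                                     // "response filter" cleans up
    }
}
```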

What's left is actually the tracer itself (Zipkin.scala):

package com.example.zipkin

import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.finagle.zipkin.thrift.ZipkinTracer
import com.twitter.finagle.tracing.Trace
import javax.ws.rs.ext.Provider

object Zipkin {
  lazy val tracer = ZipkinTracer.mk( host = "localhost", port = 9410, DefaultStatsReceiver, 1 )
}
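The last argument passed to ZipkinTracer.mk is, as far as I can tell, the sample rate, where 1 means every trace is recorded. In production you would usually record only a fraction of requests. Here is a hypothetical sketch of a deterministic sampler in plain Java (not Finagle's sampler): because the decision is derived from the trace id alone, every service that sees the same trace id reaches the same decision.

```java
// Hypothetical sampler: decides from the trace id alone, so every hop
// that sees the same trace id reaches the same decision.
final class RateSampler {
    private final double rate; // 0.0 = never sample, 1.0 = always sample

    RateSampler(double rate) {
        if (rate < 0.0 || rate > 1.0) {
            throw new IllegalArgumentException("rate must be in [0, 1]");
        }
        this.rate = rate;
    }

    boolean sample(long traceId) {
        // Map the id onto [0, 10000) and compare against the threshold.
        long bucket = Math.floorMod(traceId, 10_000L);
        return bucket < (long) (rate * 10_000);
    }

    public static void main(String[] args) {
        RateSampler all = new RateSampler(1.0);
        RateSampler tenth = new RateSampler(0.1);
        int sampled = 0;
        for (long id = 0; id < 10_000; id++) {
            if (tenth.sample(id)) sampled++;
        }
        System.out.println("rate 1.0 samples id 42: " + all.sample(42));
        System.out.println("rate 0.1 sampled " + sampled + " of 10000 ids");
    }
}
```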

If at this point you are confused about what all those traces and spans mean, please look through this documentation page; it will give you a basic understanding of those concepts.

At this point, there is nothing left on the client side and we are ready to move to the server side. Our JAX-RS 2.0 server will expose a single endpoint (PeopleRestService.java):

package com.example.server.rs;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import com.example.model.Person;
import com.example.zipkin.Zipkin;

@Path( "/people" ) 
public class PeopleRestService {
  @Produces( { "application/json" } )
  @GET
  public Collection< Person > getPeople() {
    return Zipkin.invoke( "DB", "FIND ALL", new Callable< Collection< Person > >() {
      @Override
      public Collection< Person > call() throws Exception {
        // Simulated database query
        return Arrays.asList( new Person( "Tom", "Bombadil" ) );
      }   
    } );   
  }
}

As we mentioned before, we will simulate access to a database and generate a child trace using the Zipkin.invoke wrapper (which is very simple, Zipkin.scala):

package com.example.zipkin

import java.util.concurrent.Callable
import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.zipkin.thrift.ZipkinTracer
import com.twitter.finagle.tracing.Annotation

object Zipkin {
  lazy val tracer = ZipkinTracer.mk( host = "localhost", port = 9410, DefaultStatsReceiver, 1 )
    
  def invoke[ R ]( service: String, method: String, callable: Callable[ R ] ): R = Trace.unwind {
    Trace.pushTracerAndSetNextId( tracer, false )      
      
    Trace.recordRpcname( service, method );
    Trace.record( new Annotation.ClientSend() );
          
    try {
      callable.call()
    } finally {
      Trace.record( new Annotation.ClientRecv() );
    }
  }   
}

As we can see, in this case the server itself becomes a client of another service (the database).
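For readers who prefer Java, here is a rough, framework-free equivalent of the Zipkin.invoke idea. Everything in it is hypothetical: the AnnotatedCall class, the "cs"/"cr" strings and the in-memory list that stands in for the tracer. The span bookkeeping is reduced to annotation strings, but the shape matches the Scala wrapper: record client send, run the call, and record client receive in a finally block so the span is closed even when the database call throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for Zipkin.invoke: records "cs"/"cr" annotations
// around a call into an in-memory list instead of sending them to a collector.
final class AnnotatedCall {
    static final List<String> RECORDED = new ArrayList<>();

    static <R> R invoke(String service, String method, Callable<R> callable) throws Exception {
        RECORDED.add(service + " " + method + " cs"); // client send
        try {
            return callable.call();
        } finally {
            RECORDED.add(service + " " + method + " cr"); // client receive, even on failure
        }
    }

    public static void main(String[] args) throws Exception {
        String people = invoke("DB", "FIND ALL", () -> "Tom Bombadil");
        System.out.println(people);
        System.out.println(RECORDED);
    }
}
```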

The last and most important part of the server is intercepting all HTTP requests and extracting the Trace Id from them, so that more data can be associated with the trace (annotating the trace). In Apache CXF this is easy to do by providing our own invoker (ZipkinTracingInvoker.scala):

package com.example.zipkin.server

import org.apache.cxf.jaxrs.JAXRSInvoker
import com.twitter.finagle.tracing.TraceId
import org.apache.cxf.message.Exchange
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.Annotation
import org.apache.cxf.jaxrs.model.OperationResourceInfo
import org.apache.cxf.jaxrs.ext.MessageContextImpl
import com.twitter.finagle.tracing.SpanId
import com.twitter.finagle.http.HttpTracing
import com.twitter.finagle.tracing.Flags
import scala.collection.JavaConversions._
import com.twitter.finagle.tracing.Tracer
import javax.inject.Inject

class ZipkinTracingInvoker extends JAXRSInvoker {
  @Inject val tracer: Tracer = null
  
  def trace[ R ]( exchange: Exchange )( block: => R ): R = {
    val context = new MessageContextImpl( exchange.getInMessage() )
    Trace.pushTracer( tracer )
        
    val id = Option( exchange.get( classOf[ OperationResourceInfo ] ) ) map { ori =>
      context.getHttpHeaders().getRequestHeader( HttpTracing.Header.SpanId ).toList match {
        case x :: xs => SpanId.fromString( x ) map { sid => 
          val traceId = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.TraceId ).toList match {
            case x :: xs => SpanId.fromString( x )
            case _ => None
          }
          
          val parentSpanId = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.ParentSpanId ).toList match {
            case x :: xs => SpanId.fromString( x )
            case _ => None
          }
  
          val sampled = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.Sampled ).toList match { 
            case x :: xs =>  x.toBoolean
            case _ => true
          }
                    
          val flags = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.Flags ).toList match {
            case x :: xs =>  Flags( x.toLong )
            case _ => Flags()
          }
         
          val id = TraceId( traceId, parentSpanId, sid, Option( sampled ), flags )                     
          Trace.setId( id )
        
          if( Trace.isActivelyTracing ) {
            Trace.recordRpcname( context.getHttpServletRequest().getProtocol(), ori.getHttpMethod() )
            Trace.record( Annotation.ServerRecv() )
          }
        
          id
        }           
          
        case _ => None
      }
    }
    
    val result = block
    
    if( Trace.isActivelyTracing ) {
      id map { id => Trace.record( new Annotation.ServerSend() ) }
    }
    
    result
  }
  
  @Override
  override def invoke( exchange: Exchange, parametersList: AnyRef ): AnyRef = {
    trace( exchange )( super.invoke( exchange, parametersList ) )     
  }
}

Basically, all this code does is extract the Trace Id from the request and associate it with the current thread. Also, please notice that we associate additional data with the trace, marking the server's participation:

  Trace.recordRpcname( context.getHttpServletRequest().getProtocol(), ori.getHttpMethod() )
  Trace.record( Annotation.ServerRecv() )
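The header parsing in the invoker boils down to a few defaults: a missing Sampled header means "sample", a missing Flags header means 0, and no span id means there is no trace to continue. Stripped of CXF and Finagle, that logic can be sketched as below. The IncomingTrace class is hypothetical, the headers are assumed to be already collected into a Map, and the header names follow the B3 convention (X-B3-TraceId and so on) that, to the best of my knowledge, Finagle's HttpTracing.Header constants use; double-check them against your Finagle version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, framework-free version of the header parsing done in
// ZipkinTracingInvoker: returns empty when the request carries no span id.
final class IncomingTrace {
    final String traceId, spanId, parentSpanId; // parentSpanId may be null
    final boolean sampled;
    final long flags;

    private IncomingTrace(String traceId, String spanId, String parentSpanId,
                          boolean sampled, long flags) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
        this.sampled = sampled;
        this.flags = flags;
    }

    static Optional<IncomingTrace> fromHeaders(Map<String, String> headers) {
        String spanId = headers.get("X-B3-SpanId");
        if (spanId == null) {
            return Optional.empty(); // nothing to continue: not part of a trace
        }
        String sampled = headers.get("X-B3-Sampled");
        String flags = headers.get("X-B3-Flags");
        return Optional.of(new IncomingTrace(
            headers.get("X-B3-TraceId"),
            spanId,
            headers.get("X-B3-ParentSpanId"),
            sampled == null || Boolean.parseBoolean(sampled), // default: sample
            flags == null ? 0L : Long.parseLong(flags)));       // default: no flags
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("X-B3-TraceId", "463ac35c9f6413ad");
        headers.put("X-B3-SpanId", "463ac35c9f6413ad");
        fromHeaders(headers).ifPresent(t ->
            System.out.println(t.traceId + " sampled=" + t.sampled + " flags=" + t.flags));
    }
}
```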

To see tracing live, let's start our server (please note that sbt must be installed), assuming all Zipkin components and Apache ZooKeeper are already up and running:

sbt 'project server' 'run-main com.example.server.ServerStarter'

then the client:

sbt 'project client' 'run-main com.example.client.ClientStarter'

and finally open the Zipkin web UI at http://localhost:8080. We should see something like this (depending on how many times you have run the client):

Alternatively, we can build and run fat JARs using the sbt-assembly plugin:

sbt assembly
java -jar server/target/zipkin-jaxrs-2.0-server-assembly-0.0.1-SNAPSHOT.jar 
java -jar client/target/zipkin-jaxrs-2.0-client-assembly-0.0.1-SNAPSHOT.jar 

If we click on any particular trace, more detailed information is shown, closely resembling the client <-> server <-> database chain.

Even more details are shown when we click on a particular element in the tree.

Lastly, the bonus part: the component/service dependency graph.

As we can see, all the data associated with the trace is here and follows a hierarchical structure. The root and child traces are detected and shown, as well as the timelines for the client send/receive and server receive/send chains. Our example is quite naive and simple, but even so it demonstrates how powerful and useful distributed system tracing is. Thanks to the Zipkin guys.
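The four annotations recorded in this example (client send, server receive, server send and client receive, abbreviated cs, sr, ss and cr in Zipkin) are what make those timelines possible. As a worked example with made-up microsecond timestamps cs = 1000, sr = 1400, ss = 3900 and cr = 4500: the client waited cr - cs = 3500 µs, the server worked ss - sr = 2500 µs, and the remaining 1000 µs is roughly network and serialization overhead (assuming the clocks of both machines are comparable). The SpanTimings class below is purely illustrative.

```java
// Hypothetical helper: derives latencies from the four core annotations
// of one client/server span. Timestamps are in microseconds.
final class SpanTimings {
    final long cs, sr, ss, cr;

    SpanTimings(long cs, long sr, long ss, long cr) {
        this.cs = cs;
        this.sr = sr;
        this.ss = ss;
        this.cr = cr;
    }

    long clientDuration() { return cr - cs; }  // what the caller waited
    long serverDuration() { return ss - sr; }  // time spent inside the server
    long overhead()       { return clientDuration() - serverDuration(); } // network etc.

    public static void main(String[] args) {
        SpanTimings t = new SpanTimings(1_000, 1_400, 3_900, 4_500);
        System.out.println("client: " + t.clientDuration() + "us");
        System.out.println("server: " + t.serverDuration() + "us");
        System.out.println("overhead: " + t.overhead() + "us");
    }
}
```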

The complete source code is available on GitHub.

Monday, October 28, 2013

Coordination and service discovery with Apache Zookeeper

Service-oriented design has proven to be a successful solution for a huge variety of distributed systems. When used properly, it has a lot of benefits. But as the number of services grows, it becomes more difficult to understand what is deployed and where. And because we are building reliable and highly available systems, yet another question arises: how many instances of each service are currently available?

In today's post I would like to introduce you to the world of Apache ZooKeeper, a highly reliable distributed coordination service. The number of features ZooKeeper provides is just astonishing, so let us start with a very simple problem: we have a stateless JAX-RS service which we deploy across as many JVMs/hosts as we want. The clients of this service should be able to auto-discover all available instances and just pick one of them (or all) to perform a REST call.

Sounds like a very interesting challenge. There are many ways to solve it, but let me choose Apache ZooKeeper for that. The first step is to download Apache ZooKeeper (the current stable version at the moment of writing is 3.4.5) and unpack it. Next, we need to create a configuration file. The simplest way to do that is by copying conf/zoo_sample.cfg to conf/zoo.cfg. To run it, just execute:

Windows: bin\zkServer.cmd
Linux: bin/zkServer.sh start

Excellent, now Apache ZooKeeper is up and running, listening on port 2181 (the default). Apache ZooKeeper itself is worth a book to explain its capabilities, but a brief overview gives a high-level picture that is enough to get us started.

Apache ZooKeeper has a powerful Java API, but it's quite low-level and not an easy one to use. That's why Netflix developed and open-sourced a great library called Curator to wrap the native Apache ZooKeeper API into a more convenient, easy-to-integrate framework (it's now an Apache incubator project).

Now, let's write some code! We are developing a simple JAX-RS 2.0 service which returns a list of people. As it will be stateless, we are able to run many instances within a single host or across multiple hosts, depending on system load, for example. The awesome Apache CXF and Spring Framework will back our implementation. Below is the code snippet for PeopleRestService:

package com.example.rs;

import java.util.Arrays;
import java.util.Collection;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import com.example.model.Person;

@Path( PeopleRestService.PEOPLE_PATH ) 
public class PeopleRestService {
    public static final String PEOPLE_PATH = "/people";
 
    @PostConstruct
    public void init() throws Exception {
    }
 
    @Produces( { MediaType.APPLICATION_JSON } )
    @GET
    public Collection< Person > getPeople( @QueryParam( "page") @DefaultValue( "1" ) final int page ) {
        return Arrays.asList(
            new Person( "Tom", "Bombadil" ),
            new Person( "Jim", "Tommyknockers" )
        );
    }
}

A very basic and naive implementation. The init method is intentionally empty; it will become very helpful quite soon. Also, let us assume that every JAX-RS 2.0 service we're developing supports some notion of versioning; the RestServiceDetails class serves this purpose:

package com.example.config;

import org.codehaus.jackson.map.annotate.JsonRootName;

@JsonRootName( "serviceDetails" )
public class RestServiceDetails {
    private String version;
 
    public RestServiceDetails() {
    }
    
    public RestServiceDetails( final String version ) {
        this.version = version;
    }
    
    public void setVersion( final String version ) {
        this.version = version;
    }
    
    public String getVersion() {
        return version;
    }    
}

Our Spring configuration class AppConfig creates an instance of the JAX-RS 2.0 server with the people REST service, which will be hosted by the Jetty container:

package com.example.config;

import java.util.Arrays;

import javax.ws.rs.ext.RuntimeDelegate;

import org.apache.cxf.bus.spring.SpringBus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import com.example.rs.JaxRsApiApplication;
import com.example.rs.PeopleRestService;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    public static final String SERVER_PORT = "server.port";
    public static final String SERVER_HOST = "server.host";
    public static final String CONTEXT_PATH = "rest";
 
    @Bean( destroyMethod = "shutdown" )
    public SpringBus cxf() {
        return new SpringBus();
    }
 
    @Bean @DependsOn( "cxf" )
    public Server jaxRsServer() {
        JAXRSServerFactoryBean factory = RuntimeDelegate.getInstance().createEndpoint( jaxRsApiApplication(), JAXRSServerFactoryBean.class );
        factory.setServiceBeans( Arrays.< Object >asList( peopleRestService() ) );
        factory.setAddress( factory.getAddress() );
        factory.setProviders( Arrays.< Object >asList( jsonProvider() ) );
        return factory.create();
    } 

    @Bean 
    public JaxRsApiApplication jaxRsApiApplication() {
        return new JaxRsApiApplication();
    }
 
    @Bean 
    public PeopleRestService peopleRestService() {
        return new PeopleRestService();
    }
 
    @Bean
    public JacksonJsonProvider jsonProvider() {
        return new JacksonJsonProvider();
    } 
}

And here is the ServerStarter class, which runs an embedded Jetty server. As we would like to host many such servers per host, the port shouldn't be hard-coded but rather provided as an argument:

package com.example;

import org.apache.cxf.transport.servlet.CXFServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

import com.example.config.AppConfig;

public class ServerStarter {
    public static void main( final String[] args ) throws Exception {
        if( args.length != 1 ) {
            System.out.println( "Please provide port number" );
            return;
        }
  
        final int port = Integer.valueOf( args[ 0 ] );
        final Server server = new Server( port );
   
        System.setProperty( AppConfig.SERVER_PORT, Integer.toString( port ) );
        System.setProperty( AppConfig.SERVER_HOST, "localhost" );
  
        // Register and map the dispatcher servlet
        final ServletHolder servletHolder = new ServletHolder( new CXFServlet() );
        final ServletContextHandler context = new ServletContextHandler();   
        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/" + AppConfig.CONTEXT_PATH + "/*" );  
        context.addEventListener( new ContextLoaderListener() );
   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );
      
        server.setHandler( context );
        server.start();
        server.join(); 
    }
}

Nice, at this point the boring part is over. But where do Apache ZooKeeper and service discovery fit into this picture? Here is the answer: whenever a new PeopleRestService instance is deployed, it publishes (or registers) itself in the Apache ZooKeeper registry, including the URL it's accessible at and the service version it hosts. The clients can query Apache ZooKeeper in order to get the list of all available services and call them. The only thing services and their clients need to know is where Apache ZooKeeper is running. As I am deploying everything on my local machine, my instance is on localhost. Let's add this constant to the AppConfig class:

private static final String ZK_HOST = "localhost";

Every client maintains a persistent connection (a session) to the Apache ZooKeeper server. Whenever a client dies, the connection goes down as well, and Apache ZooKeeper can make a decision about the availability of that particular client. To connect to Apache ZooKeeper, we have to create an instance of the CuratorFramework class:

@Bean( initMethod = "start", destroyMethod = "close" )
public CuratorFramework curator() {
    return CuratorFrameworkFactory.newClient( ZK_HOST, new ExponentialBackoffRetry( 1000, 3 ) );
}
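ExponentialBackoffRetry( 1000, 3 ) means a base sleep time of 1000 ms and at most 3 retries, with the sleep growing roughly exponentially and randomized between attempts. The sketch below is a hypothetical illustration of such a policy, not Curator's source (the exact formula Curator uses may differ): for retry number n it sleeps the base time multiplied by a random factor between 1 and 2^(n+1) - 1, so the upper bound roughly doubles on every attempt.

```java
import java.util.Random;

// Hypothetical exponential backoff with jitter, similar in spirit to
// Curator's ExponentialBackoffRetry(baseSleepTimeMs, maxRetries).
final class Backoff {
    private final long baseSleepMs;
    private final int maxRetries;
    private final Random random;

    Backoff(long baseSleepMs, int maxRetries, Random random) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
        this.random = random;
    }

    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Sleep before retry number retryCount (0-based): base * [1, 2^(n+1) - 1].
    long sleepMs(int retryCount) {
        int multiplier = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return baseSleepMs * multiplier;
    }

    static long maxSleepMs(long baseSleepMs, int retryCount) {
        return baseSleepMs * ((1L << (retryCount + 1)) - 1);
    }

    public static void main(String[] args) {
        Backoff backoff = new Backoff(1000, 3, new Random());
        for (int n = 0; backoff.allowRetry(n); n++) {
            System.out.println("retry " + n + ": sleep " + backoff.sleepMs(n)
                + " ms (max " + maxSleepMs(1000, n) + " ms)");
        }
    }
}
```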

The next step is to create an instance of the ServiceDiscovery class, which allows us to publish service information for discovery into Apache ZooKeeper using the CuratorFramework instance we just created (we would also like to submit RestServiceDetails as additional metadata along with every service registration):

@Bean( initMethod = "start", destroyMethod = "close" )
public ServiceDiscovery< RestServiceDetails > discovery() {
    JsonInstanceSerializer< RestServiceDetails > serializer = 
        new JsonInstanceSerializer< RestServiceDetails >( RestServiceDetails.class );

    return ServiceDiscoveryBuilder.builder( RestServiceDetails.class )
        .client( curator() )
        .basePath( "services" )
        .serializer( serializer )
        .build();        
}

Internally, Apache ZooKeeper stores all its data in a hierarchical namespace, much like a standard file system does. The services path will be the base (root) path for all our services. Every service also needs to figure out which host and port it's running on. We can do that by building a URI specification, which is included in the JaxRsApiApplication class (the {port} and {scheme} placeholders will be resolved by the Curator framework at the moment of service registration):

package com.example.rs;

import javax.inject.Inject;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

import org.springframework.core.env.Environment;

import com.example.config.AppConfig;
import com.netflix.curator.x.discovery.UriSpec;

@ApplicationPath( JaxRsApiApplication.APPLICATION_PATH )
public class JaxRsApiApplication extends Application {
    public static final String APPLICATION_PATH = "api";
 
    @Inject Environment environment;

    public UriSpec getUriSpec( final String servicePath ) {
        return new UriSpec( 
            String.format( "{scheme}://%s:{port}/%s/%s%s",
                environment.getProperty( AppConfig.SERVER_HOST ),
                AppConfig.CONTEXT_PATH,
                APPLICATION_PATH, 
                servicePath
            ) );   
    }
}
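To see what the registered URI ends up looking like, here is a hypothetical resolver that fills the {scheme} and {port} placeholders, the step the text above attributes to Curator at registration time. It is a simplified stand-in written for illustration, not Curator's UriSpec, and it assumes "http" as the scheme for an instance registered without an SSL port.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical placeholder resolver illustrating how a URI template such as
// "{scheme}://localhost:{port}/rest/api/people" becomes a concrete URL.
final class UriTemplate {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\}");

    static String resolve(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = values.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for {" + m.group(1) + "}");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Same shape as the string JaxRsApiApplication.getUriSpec( "/people" ) formats.
        String template = String.format("{scheme}://%s:{port}/%s/%s%s",
            "localhost", "rest", "api", "/people");
        Map<String, String> values = new HashMap<>();
        values.put("scheme", "http");
        values.put("port", "8080");
        System.out.println(resolve(template, values));
    }
}
```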

The last piece of the puzzle is the registration of PeopleRestService with service discovery, and this is where the init method comes into play:

@Inject private JaxRsApiApplication application;
@Inject private ServiceDiscovery< RestServiceDetails > discovery; 
@Inject private Environment environment;

@PostConstruct
public void init() throws Exception {
    final ServiceInstance< RestServiceDetails > instance = 
        ServiceInstance.< RestServiceDetails >builder()
            .name( "people" )
            .payload( new RestServiceDetails( "1.0" ) )
            .port( environment.getProperty( AppConfig.SERVER_PORT, Integer.class ) )
            .uriSpec( application.getUriSpec( PEOPLE_PATH ) )
            .build();
  
    discovery.registerService( instance );
}

Here is what we have done:

  • created a service instance with the name people (the complete name would be /services/people)
  • set the port to the actual port this instance is running on
  • set the URI specification for this specific REST service endpoint
  • additionally, attached a payload (RestServiceDetails) with the service version (though it's not used, it demonstrates the ability to pass more details)
Every new service instance we run will publish itself under the /services/people path in Apache ZooKeeper. To see everything in action, let us build and run a couple of people service instances.

mvn clean package
java -jar jax-rs-2.0-service\target\jax-rs-2.0-service-0.0.1-SNAPSHOT.one-jar.jar 8080
java -jar jax-rs-2.0-service\target\jax-rs-2.0-service-0.0.1-SNAPSHOT.one-jar.jar 8081

From Apache ZooKeeper's point of view, it might look like this (please note that the session UUIDs will differ):

Having two service instances up and running, let's try to consume them. From the service client perspective, the first step is exactly the same: instances of CuratorFramework and ServiceDiscovery should be created (the configuration class ClientConfig declares those beans) in the same way we did above, with no changes required. But instead of registering a service, we will query the available ones:

package com.example.client;

import java.util.Collection;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.example.config.RestServiceDetails;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceInstance;

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        try( final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( ClientConfig.class ) ) { 
            @SuppressWarnings("unchecked")
            final ServiceDiscovery< RestServiceDetails > discovery = 
                context.getBean( ServiceDiscovery.class );
            final Client client = ClientBuilder.newClient();
      
            final Collection< ServiceInstance< RestServiceDetails > > services = 
                discovery.queryForInstances( "people" );
            for( final ServiceInstance< RestServiceDetails > service: services ) {
                final String uri = service.buildUriSpec();
       
                final Response response = client
                    .target( uri )
                    .request( MediaType.APPLICATION_JSON )
                    .get();
       
                System.out.println( uri + ": " + response.readEntity( String.class ) );
                System.out.println( "API version: " + service.getPayload().getVersion() );
       
                response.close();
            }
        }
    }
}

Once the service instances are retrieved, the REST call is made (using the awesome JAX-RS 2.0 client API) and, additionally, the service version is read (as the payload contains an instance of the RestServiceDetails class). Let's build and run our client against the two instances we deployed previously:

mvn clean package
java -jar jax-rs-2.0-client\target\jax-rs-2.0-client-0.0.1-SNAPSHOT.one-jar.jar

The console output should show two calls to two different endpoints:

http://localhost:8081/rest/api/people: [{"email":null,"firstName":"Tom","lastName":"Bombadil"},{"email":null,"firstName":"Jim","lastName":"Tommyknockers"}]
API version: 1.0

http://localhost:8080/rest/api/people: [{"email":null,"firstName":"Tom","lastName":"Bombadil"},{"email":null,"firstName":"Jim","lastName":"Tommyknockers"}]
API version: 1.0
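Our client calls every instance, but the introduction also mentioned simply picking one. As far as I know, Curator's discovery extension ships provider strategies for that (round-robin and random, among others). As a framework-free illustration, a hypothetical round-robin picker over the discovered URIs could look like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selection over discovered service URIs.
final class RoundRobinPicker {
    private final AtomicInteger next = new AtomicInteger();

    String pick(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalStateException("No instances registered");
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    public static void main(String[] args) {
        List<String> uris = Arrays.asList(
            "http://localhost:8080/rest/api/people",
            "http://localhost:8081/rest/api/people");
        RoundRobinPicker picker = new RoundRobinPicker();
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.pick(uris));
        }
    }
}
```

Using an AtomicInteger keeps the picker safe to share between threads without extra locking.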

If we stop one or all instances, they will disappear from the Apache ZooKeeper registry. The same applies if any instance crashes or becomes unresponsive.
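This disappearing act works because, as far as I understand Curator's discovery extension, each registration is stored as an ephemeral node tied to the ZooKeeper session of the process that created it. The toy registry below (hypothetical, in-memory, no ZooKeeper involved) mimics that behaviour: instances are grouped under a path like /services/people, and closing a session removes every instance it registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of ephemeral service registrations.
// Real ZooKeeper deletes ephemeral znodes when the owning session ends.
final class ToyRegistry {
    // path -> (instance id -> owning session id)
    private final Map<String, Map<String, Long>> nodes = new HashMap<>();

    void register(String basePath, String name, String instanceId, long sessionId) {
        nodes.computeIfAbsent(basePath + "/" + name, k -> new TreeMap<>())
             .put(instanceId, sessionId);
    }

    List<String> query(String basePath, String name) {
        Map<String, Long> children = nodes.get(basePath + "/" + name);
        return children == null ? new ArrayList<>() : new ArrayList<>(children.keySet());
    }

    // Session expiry or a clean close removes everything the session owned.
    void closeSession(long sessionId) {
        for (Map<String, Long> children : nodes.values()) {
            children.values().removeIf(owner -> owner == sessionId);
        }
    }

    public static void main(String[] args) {
        ToyRegistry zk = new ToyRegistry();
        zk.register("/services", "people", "instance-8080", 1L);
        zk.register("/services", "people", "instance-8081", 2L);
        System.out.println(zk.query("/services", "people"));
        zk.closeSession(1L); // the 8080 instance stops or crashes
        System.out.println(zk.query("/services", "people"));
    }
}
```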

Excellent! I guess we achieved our goal using such a great and powerful tool as Apache ZooKeeper. Thanks to its developers, as well as to the Curator guys for making it so easy to use Apache ZooKeeper in your applications. We have just scratched the surface of what is possible to accomplish with Apache ZooKeeper, so I strongly encourage everyone to explore its capabilities (distributed locks, caches, counters, queues, ...).

It is worth mentioning another great project built on top of Apache ZooKeeper: LinkedIn's Norbert. For Eclipse developers, an Eclipse plugin is also available.

All sources are available on GitHub.