Showing posts with label scala. Show all posts

Saturday, May 28, 2022

The lost art of learning ... (a new programming language)

I remember the time, 20 years ago, when as a junior software developer I was driven by a constant, unstoppable desire to learn new things: new features, techniques, frameworks, architectures, styles, practices, ... and surely, new programming languages. This curiosity, this hunger to learn, drove me to consume as many books as possible and to hunt around the clock for the scanty online publications available at that time.

Year after year, the accumulated knowledge, backed by invaluable experience, led me to settle on proven and battle-tested (in production) choices, which have worked, still work these days, and I am sure will keep working in the future. For me, the foundation was set in Java (and later Scala as well) and the JVM platform in general. This is where my expertise lies, but don't get me wrong, I still try to read a lot, balancing on the edge between books and an enormous amount of blog posts, articles, talks and podcasts. How many programming languages have I been able to master over the last, let's say, 10 years? If we count the JVM ecosystem only, Scala and Groovy would make the list, but otherwise: none, zero, 0. The pace of learning on this front has slowed for me, significantly ...

Why is that? Did I get burned out? Nope (thank God). Fed up with development? Nope, hopefully never. Opinionated? Yes, certainly. Older? Definitely. Less curious? Not really. To be fair, there are so many things going on in the JVM ecosystem, so many projects being developed by the community, that it has become increasingly difficult to find the time for anything else! And frankly, why bother, when you could do Java for the next 20 years without any fear of it disappearing or becoming irrelevant?

The JVM is an amazing platform, but its fundamental principles heavily impact certain design aspects of any programming language built on top of it. That is both good and bad (everything is a trade-off), but with time, you as a developer get used to it, questioning whether anything else makes sense. Like many others, I ended up in a bubble that had to be burst. How? I decided to learn Rust, and to go way beyond the basics.

But no worries, I do not intend to sell you Rust, but rather to share an alarming discovery: the lost art of learning a new programming language. It all started as usual: crunching through blogs, people's and companies' experiences, and finally getting the book, Programming Rust: Fast, Safe Systems Development (a really good one, but the official The Rust Programming Language is equally awesome). A few months (!) later, I was done reading, feeling ready to deliver cool, production-ready Rust applications, or so I thought ... The truth is, I was far, very far from it ... and these are the things I did wrong.

  • Rust is concise (at least to my taste), modern and powerful, but it is not a simple language. If some language aspects are not clear, do not skip over them but try to understand them. If the book is not doing a sufficiently good job at it, look around until you get them, otherwise you risk missing even more. I did not do that (overly confident that nothing could be harder than learning functional Scala), and at some point I got somewhat lost.

  • In the same vein, Rust has a unique perspective on many things, including concurrency and memory management, and investing the time to understand them is a must (a Java/JVM background is not very helpful here). Essentially, you have to "unlearn" in order to grasp certain new concepts; this is not easy and confuses the brain.

  • Stay very focused and methodical, and try to read at least a few pages **every day**. Yes, we are all super busy, always distracted, have families and have started to travel again, ... yada yada yada. Also, Programming Rust: Fast, Safe Systems Development is not a slim book: ~800 pages! Irregular, long breaks in reading were killing my progress.

  • Practice, practice and practice. If you have some pet projects in mind, great; if not, just come up with anything ad hoc, but try to use the things you have just read about. Learning the "theory" is great, but mastering a programming language is all about using it every day to solve problems. I screwed up there epically by following the "bedtime story" style ...

  • Learn the coding practices and patterns by looking into the language's ecosystem, in this regard a curated list of Rust code and resources is super useful.

  • Subscribe to newsletters to stay up to date and be aware of what is going on in core development and in the community. In the case of Rust, This Week in Rust turned out to be invaluable.

  • Watch talks and presentations; luckily, most conferences do publish all their videos online these days. I stumbled upon the official Rust YouTube channel and have followed it closely since.

Learning a new programming language is a rewarding experience, in particular when it happens to be Rust, the most beloved programming language for the last six years. But not only because of that: Rust is a perfect fit for systems programming, something Java/JVM is inherently not very well suited for (yes, it is better now with JEP 330: Launch Single-File Source-Code Programs, jbang and GraalVM native images, but still far from perfect for the task). On the other hand, it is quite challenging when the programming language you are learning is not part of your day-to-day job.

As of this moment, I am not sure if Rust will ever become my primary programming language, but one thing I do know: I think every passionate software developer should strive to learn at least one new programming language every 1-2 years (we have so many good ones). It is an amazing mental exercise; I used to follow this advice back in the day but lost sight of it along the journey. I really should not have.

Happy learning and peace to everyone!

Tuesday, February 18, 2020

In praise of thoughtful design: how property-based testing helps me to be a better developer

The developer's testing toolbox is one of those things that rarely stays unchanged. For sure, some testing practices have proven to be more valuable than others, but still, we are constantly looking for better, faster and more expressive ways to test our code. Property-based testing, largely unknown to the Java community, is yet another gem crafted by Haskell folks and described in the QuickCheck paper.

The power of this testing technique was quickly realized by the Scala community (where the ScalaCheck library was born) and many others, but the Java ecosystem lacked interest in adopting property-based testing for quite some time. Luckily, since the appearance of jqwik, things have slowly been changing for the better.

For many, it is quite difficult to grasp what property-based testing is and how it could be exploited. The presentation Property-based Testing for Better Code by Jessica Kerr and the comprehensive An introduction to property-based testing and Property-based Testing Patterns series of articles are excellent sources to get you hooked, but in today's post we are going to discover the practical side of property-based testing for the typical Java developer using jqwik.

To start with, what does the name property-based testing actually imply? The first thought of many a Java developer would be that it aims to test all getters and setters (hello, 100% coverage)? Not really, although for some data structures that could be useful. Instead, we should identify the high-level characteristics, if you will, of a component, data structure, or even an individual function, and efficiently test them by formulating hypotheses.

Our first example falls into the "There and back again" category: serialization and deserialization to and from a JSON representation. The class under test is the User POJO; although trivial, please notice that it has one temporal property of type OffsetDateTime.

public class User {
    private String username;
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss[.SSS[SSS]]XXX", shape = Shape.STRING)
    private OffsetDateTime created;
    
    // ...
}

It is surprising how often the manipulation of date/time properties causes issues these days, since everyone tries to use their own representation. As you can spot, our contract uses the ISO-8601 interchange format with an optional milliseconds part. What we would like to make sure of is that any valid instance of User can be serialized into JSON and deserialized back into a Java object without losing any date/time precision. As an exercise, let us try to express that in pseudo code first:

For any user
  Serialize user instance to JSON
  Deserialize user instance back from JSON
  Two user instances must be identical

Looks simple enough, but here comes the surprising part: let us take a look at how this pseudo code projects onto a real test case using the jqwik library. It gets as close to our pseudo code as it possibly could.

@Property
void serdes(@ForAll("users") User user) throws JsonProcessingException {
    final String json = serdes.serialize(user);

    assertThat(serdes.deserialize(json))
        .satisfies(other -> {
            assertThat(user.getUsername()).isEqualTo(other.getUsername());
            assertThat(user.getCreated().isEqual(other.getCreated())).isTrue();
        });
        
    Statistics.collect(user.getCreated().getOffset());
}

The test case reads very easily, almost naturally, but obviously there is some machinery hidden behind jqwik's @Property and @ForAll annotations. Let us start with @ForAll and clear up where all these User instances are coming from. As you may guess, these instances must be generated, preferably in a randomized fashion.

For most of the built-in data types jqwik has a rich set of data providers (Arbitraries), but since we are dealing with an application-specific class, we have to supply our own generation strategy. It should be able to emit User instances with a wide range of usernames and date/time instants across different time zones and offsets. Let us take a sneak peek at the provider implementation first and discuss it in detail right after.

@Provide
Arbitrary<User> users() {
    final Arbitrary<String> usernames = Arbitraries.strings().alpha().ofMaxLength(64);
 
    final Arbitrary<OffsetDateTime> dates = Arbitraries
        .of(List.copyOf(ZoneId.getAvailableZoneIds()))
        .flatMap(zone -> Arbitraries
            .longs()
            .between(1266258398000L, 1897410427000L) // ~ +/- 10 years
            .unique()
            .map(epochMilli -> Instant.ofEpochMilli(epochMilli))
            .map(instant -> OffsetDateTime.from(instant.atZone(ZoneId.of(zone)))));

    return Combinators
        .combine(usernames, dates)
        .as((username, created) -> new User(username).created(created));
}

The source of usernames is easy: just random strings. The source of dates could basically be any date/time between 2010 and 2030, whereas the time zone part (and thus the offset) is randomly picked from all available region-based zone identifiers. For example, below are some samples jqwik came up with.

{"username":"zrAazzaDZ","created":"2020-05-06T01:36:07.496496+03:00"}
{"username":"AZztZaZZWAaNaqagPLzZiz","created":"2023-03-20T00:48:22.737737+08:00"}
{"username":"aazGZZzaoAAEAGZUIzaaDEm","created":"2019-03-12T08:22:12.658658+04:00"}
{"username":"Ezw","created":"2011-10-28T08:07:33.542542Z"}
{"username":"AFaAzaOLAZOjsZqlaZZixZaZzyZzxrda","created":"2022-07-09T14:04:20.849849+02:00"}
{"username":"aaYeZzkhAzAazJ","created":"2016-07-22T22:20:25.162162+06:00"}
{"username":"BzkoNGzBcaWcrDaaazzCZAaaPd","created":"2020-08-12T22:23:56.902902+08:45"}
{"username":"MazNzaTZZAEhXoz","created":"2027-09-26T17:12:34.872872+11:00"}
{"username":"zqZzZYamO","created":"2023-01-10T03:16:41.879879-03:00"}
{"username":"GaaUazzldqGJZsqksRZuaNAqzANLAAlj","created":"2015-03-19T04:16:24.098098Z"}
...

By default, jqwik will run the test against 1000 different sets of parameter values (randomized User instances). The quite helpful Statistics container allows you to collect whatever distribution insights you are curious about. Just in case, why not collect the distribution by zone offsets?

    ...
    -04:00 (94) :  9.40 %
    -03:00 (76) :  7.60 %
    +02:00 (75) :  7.50 %
    -05:00 (74) :  7.40 %
    +01:00 (72) :  7.20 %
    +03:00 (69) :  6.90 %
    Z      (62) :  6.20 %
    -06:00 (54) :  5.40 %
    +11:00 (42) :  4.20 %
    -07:00 (39) :  3.90 %
    +08:00 (37) :  3.70 %
    +07:00 (34) :  3.40 %
    +10:00 (34) :  3.40 %
    +06:00 (26) :  2.60 %
    +12:00 (23) :  2.30 %
    +05:00 (23) :  2.30 %
    -08:00 (20) :  2.00 %
    ...    

Let us consider another example. Imagine that at some point we decided to reimplement the equality for the User class (which in Java means overriding equals and hashCode) based on the username property. With that, for any pair of User instances, the following invariants must hold true:

  • if two User instances have the same username, they are equal and must have the same hash code
  • if two User instances have different usernames, they are not equal (but their hash codes may not necessarily be different)

It is a perfect fit for property-based testing, and jqwik in particular makes such tests trivial to write and maintain.

@Provide
Arbitrary<String> usernames() {
    return Arbitraries.strings().alpha().ofMaxLength(64);
}

@Property
void equals(@ForAll("usernames") String username, @ForAll("usernames") String other) {
    Assume.that(!username.equals(other));
        
    assertThat(new User(username))
        .isEqualTo(new User(username))
        .isNotEqualTo(new User(other))
        .extracting(User::hashCode)
        .isEqualTo(new User(username).hashCode());
}

The assumptions expressed through Assume allow us to put additional constraints on the generated parameters: since we introduce two sources of usernames, it could happen that both of them emit an identical username in the same run, and the test would fail.

The question you may have been holding back until now is: what is the point? It is surely possible to test serialization / deserialization or equals/hashCode without embarking on property-based testing and jqwik, so why even bother? Fair enough, but the answer to this question lies deep in how we approach the design of our software systems.

By and large, property-based testing is heavily influenced by functional programming, not the first thing that comes to mind with respect to Java (at least, not yet), to put it mildly. The randomized generation of test data is not a novel idea per se; however, what property-based testing encourages you to do, at least in my opinion, is to think in more abstract terms: to focus not on individual operations (equals, compare, add, sort, serialize, ...) but on the kinds of properties, characteristics, laws and/or invariants they have to obey. It certainly feels like an alien technique, a paradigm shift if you will, which encourages you to spend more time on designing the right thing. It does not mean that from now on all your tests must be property-based, but I believe this technique certainly deserves a place in the front row of our testing toolboxes.
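To make the mindset concrete without any framework at all, here is what a hand-rolled property check could look like in plain Java: generate random inputs in a loop and assert an invariant (here, that reversing a list twice yields the original list). This is a minimal sketch of the idea, not something jqwik does for you in this exact form:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class ReverseProperty {
    // The invariant under test: reversing a list twice yields the original.
    static <T> List<T> reversedTwice(List<T> input) {
        final List<T> copy = new ArrayList<>(input);
        Collections.reverse(copy);
        Collections.reverse(copy);
        return copy;
    }

    public static void main(String[] args) {
        final ThreadLocalRandom random = ThreadLocalRandom.current();

        // Poor man's property-based testing: 1000 randomized inputs.
        for (int run = 0; run < 1000; ++run) {
            final List<Integer> original = new ArrayList<>();
            for (int i = 0, size = random.nextInt(50); i < size; ++i) {
                original.add(random.nextInt());
            }

            if (!reversedTwice(original).equals(original)) {
                throw new AssertionError("Property violated for: " + original);
            }
        }

        System.out.println("Property held for 1000 random lists");
    }
}
```

What a library like jqwik adds on top of this naive loop is exactly the hard part: reusable generators, shrinking of failing inputs, reproducible seeds and statistics.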

Please find the complete project sources available on GitHub.

Wednesday, April 26, 2017

When following REST(ful) principles might look impractical, GraphQL could come to the rescue

I am certainly late jumping on this trendy train, but today we are going to talk about GraphQL, a very interesting approach to building web services and APIs. In my opinion, it would be fair to restate that the REST(ful) architecture is built on top of quite reasonable principles and constraints (although the debates over that never end in the industry).

So ... what is GraphQL? By and large, it is yet another kind of query language. But what makes it interesting is that it is designed to give clients (e.g., frontends) the ability to express their needs (e.g., to backends) in terms of the data they expect. Frankly speaking, GraphQL goes much further than that, but this is one of its most compelling features.

GraphQL is just a specification, without any particular requirements on the programming language or technology stack, but, not surprisingly, it has gained widespread acceptance in modern web development, both on the client side (Apollo, Relay) and on the server side (the class of APIs often called BFFs these days). In today's post we are going to give GraphQL a ride, discuss where it could be a good fit and why you may consider adopting it. Although there are quite a few options on the table, Sangria, a terrific Scala-based implementation of the GraphQL specification, will be the foundation we are going to build upon.

Essentially, our goal is to develop a simple user management web API. The data model is far from being complete but good enough to serve its purpose. Here is the User class.

case class User(id: Long, email: String, created: Option[LocalDateTime], 
  firstName: Option[String], lastName: Option[String], roles: Option[Seq[Role.Value]], 
    active: Option[Boolean], address: Option[Address])

The Role is a hardcoded enumeration:

object Role extends Enumeration {
  val USER, ADMIN = Value
}

While the Address is another small class in the data model:

case class Address(street: String, city: String, state: String, 
  zip: String, country: String)

At its core, GraphQL is strongly typed. That means the application-specific models should somehow be represented in GraphQL. To speak naturally, we need to define a schema. In Sangria, schema definitions are pretty straightforward and consist of three main categories, borrowed from the GraphQL specification: object types, query types and mutation types. We are going to touch upon all of them, but the object type definitions sound like a logical point to start with.

val UserType = ObjectType(
  "User",
  "User Type",
  interfaces[Unit, User](),
  fields[Unit, User](
    Field("id", LongType, Some("User id"), tags = ProjectionName("id") :: Nil, 
      resolve = _.value.id),
    Field("email", StringType, Some("User email address"), 
      resolve = _.value.email),
    Field("created", OptionType(LocalDateTimeType), Some("User creation date"), 
      resolve = _.value.created),
    Field("address", OptionType(AddressType), Some("User address"), 
      resolve = _.value.address),
    Field("active", OptionType(BooleanType), Some("User is active or not"), 
      resolve = _.value.active),
    Field("firstName", OptionType(StringType), Some("User first name"), 
      resolve = _.value.firstName),
    Field("lastName", OptionType(StringType), Some("User last name"), 
      resolve = _.value.lastName),
    Field("roles", OptionType(ListType(RoleEnumType)), Some("User roles"), 
      resolve = _.value.roles)
  ))

In many respects, it is just a direct mapping from User to UserType. Sangria can ease you out of doing that by providing macro support, so you may get the schema generated at compile time. The AddressType definition is very similar, so let us skip over it and look at how to deal with enumerations, like Role.

val RoleEnumType = EnumType(
  "Role",
  Some("List of roles"),
  List(
    EnumValue("USER", value = Role.USER, description = Some("User")),
    EnumValue("ADMIN", value = Role.ADMIN, description = Some("Administrator"))
  ))

Easy, simple and compact ... In traditional REST(ful) web services, the metadata about the resources is generally not available out of the box. However, several complementary specifications, like JSON Schema, could fill this gap with a bit of work.

Good, so the types are there, but what are these queries and mutations? Query is a special type within the GraphQL specification which basically describes how you would like to fetch the data and what shape it should take. For example, there is often a need to get user details by identifier, which could be expressed by the following GraphQL query:

query {
  user(id: 1) {
    email
    firstName
    lastName
    address {
      country
    }
  }
}

You can literally read it as-is: look up the user with identifier 1 and return only the email, first and last names, and the address with the country only. Awesome: not only are GraphQL queries exceptionally powerful, but they also give the interested parties control over what is returned. A priceless feature if you have to support a diversity of clients without exploding the number of API endpoints. Defining the query types in Sangria is also a no-brainer, for example:

val Query = ObjectType(
  "Query", fields[Repository, Unit](
    Field("user", OptionType(UserType), arguments = ID :: Nil, 
      resolve = (ctx) => ctx.ctx.findById(ctx.arg(ID))),
    Field("users", ListType(UserType), 
      resolve = Projector((ctx, f) => ctx.ctx.findAll(f.map(_.name))))
  ))

The code snippet above in fact describes two queries: the one we have seen before, fetching a user by identifier, and another one, fetching all users. Here is a quick example of the latter:

query {
  users {
    id 
    email 
  }
}

Hopefully you would agree that no explanation is needed; the intent is clear. Queries are arguably the strongest argument in favor of adopting GraphQL; the value proposition is really tremendous. With Sangria you do have access to the fields the clients want back, so the data store could be told to return only these subsets, using projections, selects, or similar concepts. To be closer to reality, our sample application stores data in MongoDB, so we could ask it to return only the fields the client is interested in.

def findAll(fields: Seq[String]): Future[Seq[User]] = collection.flatMap(
   _.find(document())
    .projection(
      fields
       .foldLeft(document())((doc, field) => doc.merge(field -> BSONInteger(1)))
    )
    .cursor[User]()
    .collect(Int.MaxValue, Cursor.FailOnError[Seq[User]]())
  )

If we get back to typical REST(ful) web APIs, the approach most widely used these days to outline the shape of the desired response is to pass a query string parameter, for example /api/users?fields=email,firstName,lastName,.... However, from the implementation perspective, not many frameworks support such features natively, so everyone has to come up with their own way. Regarding the querying capabilities, in case you happen to be a user of the terrific Apache CXF framework, you may benefit from its quite powerful search extension, which we talked about some time ago.
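For illustration, parsing such a fields query parameter on the server side takes only a few lines of plain Java. The class and method names below are made up for this sketch; the hard part in practice is not the parsing but wiring the selection down to the data store, which most frameworks leave to you:

```java
import java.util.ArrayList;
import java.util.List;

public class FieldsParam {
    // Hypothetical helper: turn "email,firstName,lastName" into a field list.
    static List<String> parse(String value) {
        final List<String> fields = new ArrayList<>();

        if (value != null) {
            for (final String field : value.split(",")) {
                final String trimmed = field.trim();
                if (!trimmed.isEmpty()) {
                    fields.add(trimmed);
                }
            }
        }

        // An empty list conventionally means "return everything".
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("email, firstName,lastName"));
    }
}
```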

If queries usually just fetch data, mutations serve the purpose of data modification. Syntactically they are very similar to queries, but their interpretation is different. For example, here is one of the ways we could add a new user to the application.

mutation {
  add(email: "[email protected]", firstName: "John", lastName: "Smith", roles: [ADMIN]) {
    id
    active
    email
    firstName
    lastName
    address {
      street
      country
    }
  }
}

In this mutation a new user, John Smith, with email [email protected] and the ADMIN role assigned, is going to be added to the system. As with queries, the client is always in control of which data shape it needs from the server when the mutation completes. Mutations can be thought of as calls to action and closely resemble method invocations; for example, the activation of a user may be done like that:

mutation {
  activate(id: 1) {
    active
  }
}

In Sangria, mutations are described exactly like queries, for example the ones we have looked at before have the following type definition:

val Mutation = ObjectType(
  "Mutation", fields[Repository, Unit](
    Field("activate", OptionType(UserType), 
      arguments = ID :: Nil,
      resolve = (ctx) => ctx.ctx.activateById(ctx.arg(ID))),
    Field("add", OptionType(UserType), 
      arguments = EmailArg :: FirstNameArg :: LastNameArg :: RolesArg :: Nil,
      resolve = (ctx) => ctx.ctx.addUser(ctx.arg(EmailArg), ctx.arg(FirstNameArg), 
        ctx.arg(LastNameArg), ctx.arg(RolesArg)))
  ))

With that, our GraphQL schema is complete:

val UserSchema = Schema(Query, Some(Mutation))

That's great, however ... what can we do with it? A just-in-time question: please welcome the GraphQL server. As we remember, there is no attachment to a particular technology or stack, but in the universe of web APIs you can think of a GraphQL server as a single endpoint bound to the POST HTTP verb. And, once we start to talk about HTTP and Scala, who could do a better job than the amazing Akka HTTP? Luckily, Sangria has seamless integration with it.

val route: Route = path("users") {
  post {
    entity(as[String]) { document =>
      QueryParser.parse(document) match {
        case Success(queryAst) =>
          complete(Executor.execute(SchemaDefinition.UserSchema, queryAst, repository)
            .map(OK -> _)
            .recover {
              case error: QueryAnalysisError => BadRequest -> error.resolveError
              case error: ErrorWithResolver => InternalServerError -> error.resolveError
            })

        case Failure(error) => complete(BadRequest -> Error(error.getMessage))
      }
    }
  } ~ get {
    complete(SchemaRenderer.renderSchema(SchemaDefinition.UserSchema))
  }
}

You may notice that we also expose our schema under a GET endpoint; what is it here for? Well, if you are familiar with Swagger, which we have talked about a lot here, it is a very similar concept. The schema contains all the necessary pieces, enough for external tools to automatically discover the respective GraphQL queries and mutations, along with the types they reference. GraphiQL, an in-browser IDE for exploring GraphQL, is one of those (think of Swagger UI in the REST(ful) services world).

We are mostly there: our GraphQL server is ready, so let us run it and send off a couple of queries and mutations to get a feel for it:

sbt run

[info] Running com.example.graphql.Boot
INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
INFO  reactivemongo.api.MongoDriver  - No mongo-async-driver configuration found
INFO  reactivemongo.api.MongoDriver  - [Supervisor-1] Creating connection: Connection-2
INFO  r.core.actors.MongoDBSystem  - [Supervisor-1/Connection-2] Starting the MongoDBSystem akka://reactivemongo/user/Connection-2

Very likely our data store (we are using MongoDB run as Docker container) has no users at the moment so it sounds like a good idea to add one right away:

$ curl -vi -X POST http://localhost:48080/users -H "Content-Type: application/json" -d " \
   mutation { \
     add(email: \"[email protected]\", firstName: \"John\", lastName: \"Smith\", roles: [ADMIN]) { \
       id \
       active \
       email \
       firstName \
       lastName \
       address { \
         street \
         country \
       } \
     } \
   }"

HTTP/1.1 200 OK
Server: akka-http/10.0.5
Date: Tue, 25 Apr 2017 01:01:25 GMT
Content-Type: application/json
Content-Length: 123

{
  "data":{
    "add":{
      "email":"[email protected]",
      "lastName":"Smith",
      "firstName":"John",
      "id":1493082085000,
      "address":null,
      "active":false
    }
  }
}

It seems to work perfectly fine. The response details will always be wrapped in a data envelope, no matter what kind of query or mutation you are running, for example:

$ curl -vi -X POST http://localhost:48080/users -H "Content-Type: application/json" -d " \
   query { \
     users { \
       id \
       email \
     } \
   }"

HTTP/1.1 200 OK
Server: akka-http/10.0.5
Date: Tue, 25 Apr 2017 01:09:21 GMT
Content-Type: application/json
Content-Length: 98

{
  "data":{
    "users":[
      {
        "id":1493082085000,
        "email":"[email protected]"
      }
    ]
  }
}

Exactly as we ordered ... Honestly, working with GraphQL feels natural, especially when data querying is involved. And we didn't even talk about fragments, variables, directives, and a lot of other things.
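Just to give a taste of one of them: a fragment lets several queries share a field selection. A hypothetical example against our User type could look like this:

```graphql
query {
  users {
    ...userSummary
  }
}

fragment userSummary on User {
  id
  email
  firstName
  lastName
}
```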

Now comes the question: should we abandon all our practices, JAX-RS, Spring MVC, ... and switch to GraphQL? I honestly believe that this is not the case. GraphQL is a good fit for addressing certain kinds of problems, but by and large, traditional REST(ful) web services, combined with Swagger or any other established API specification framework, are here to stay.

And please be warned: along with the benefits, GraphQL comes at a price. For example, HTTP caching and cache control no longer apply, HATEOAS does not make much sense either, responses are uniform no matter what you are calling, and reliability suffers as everything sits behind a single facade ... With that in mind, GraphQL is indeed a great tool; please use it wisely!

The complete project source is available on GitHub.

Saturday, July 23, 2016

When things may get out of control: circuit breakers in practice. Apache Zest and Akka.

In the previous post we started the discussion about circuit breakers and why this pattern has gained so much importance these days. We learned about Netflix Hystrix, the most advanced circuit breaker implementation for the JVM platform, and its typical integration scenarios. In this post we are going to continue exploring the other options available, starting with the Apache Zest library.

Surprisingly, Apache Zest, while certainly a gem, is not well known or widely used. It is a framework for domain-centric application development which aims to explore the composite-oriented programming paradigm. Its roots go back to 2007, when it was born under another name, Qi4j (it became Apache Zest in 2015). It would take a complete book just to go through the Apache Zest features and concepts, but what we are interested in is the fact that Apache Zest includes a simple circuit breaker implementation.

Let us use the same example of consuming the https://freegeoip.net/ REST(ful) web API and wrap the communication with this external service using CircuitBreaker from Apache Zest:

public class GeoIpService {
    private static final String URL = "http://freegeoip.net/";
    private final ObjectMapper mapper = new ObjectMapper();
    private final CircuitBreaker breaker = new CircuitBreaker(5, 1000 * 120);
 
    public GeoIpDetails getDetails(final String host) {
        try {
            if (breaker.isOn()) {
                GeoIpDetails details = mapper.readValue(get(host), GeoIpDetails.class);
                breaker.success();
                return details;
            } else {
                // Fallback to empty response
                return new GeoIpDetails();
            }
        } catch (final IOException ex) {
            breaker.throwable(ex);
            throw new RuntimeException("Communication with '" + URL + "' failed", ex);
        } catch (final URISyntaxException ex) {
            // Should never happen, but just trip circuit breaker immediately
            breaker.trip(); 
            throw new RuntimeException("Invalid service endpoint: " + URL, ex);
        }
    }

    private String get(final String host) throws IOException, URISyntaxException {
        return Request
            .Get(new URIBuilder(URL).setPath("/json/" + host).build())
            .connectTimeout(1000)
            .socketTimeout(3000)
            .execute()
            .returnContent()
            .asString();
    }
}

Essentially, this is as basic a CircuitBreaker implementation as it could possibly get. We configured it to have a threshold of 5 failures (which in our case means failing requests) and a sleeping window of 2 minutes (120 * 1000 milliseconds). It becomes the responsibility of the application developer to report successes and failures using the success() and throwable(...) methods respectively, with the option to open the circuit breaker immediately using the trip() method call. Please note that CircuitBreaker relies on Java synchronization mechanisms and is thread-safe.
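To make the mechanics concrete, here is a minimal sketch of the same reporting protocol — not the actual Apache Zest class, just an illustration with hypothetical names. Time is passed in explicitly to keep it testable:

```java
// A hypothetical on / off breaker mirroring the reporting protocol above:
// the caller checks isOn(), then reports success() or failure(), and may
// trip() the breaker immediately.
class SimpleBreaker {
    private final int threshold;      // failures before tripping
    private final long sleepMillis;   // sleeping window after a trip
    private int failures = 0;
    private long trippedAt = -1;      // -1 means the breaker is on

    SimpleBreaker(int threshold, long sleepMillis) {
        this.threshold = threshold;
        this.sleepMillis = sleepMillis;
    }

    synchronized boolean isOn(long now) {
        if (trippedAt < 0) {
            return true;
        }
        if (now - trippedAt >= sleepMillis) {
            // Sleeping window elapsed: switch back on, reset the counter
            trippedAt = -1;
            failures = 0;
            return true;
        }
        return false;
    }

    synchronized void success() {
        failures = 0;
    }

    synchronized void failure(long now) {
        if (++failures >= threshold) {
            trip(now);
        }
    }

    synchronized void trip(long now) {
        trippedAt = now;   // open (turn off) the breaker immediately
    }
}
```

With a threshold of 5 and a window of 120,000 milliseconds this behaves like the configuration used above: five reported failures turn the breaker off, and after two minutes it switches back on.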

Interestingly, the CircuitBreaker from Apache Zest uses slightly different conventions: instead of operating on closed / open states, it treats them as on / off ones, which are more familiar to most of us. And to finish up, basic JMX instrumentation is also available out of the box.

It requires only a couple of lines added to the GeoIpService initialization (the constructor, for example) to register and expose managed beans:

public GeoIpService() throws Exception {
    final ObjectName name = new ObjectName("circuit-breakers", 
        "zest-circuit-breaker", "freegeoip.net");

    ManagementFactory
        .getPlatformMBeanServer()
        .registerMBean(new CircuitBreakerJMX(breaker, name), name);
}

Please do not hesitate to glance through the official Apache Zest Circuit Breaker documentation, there are quite a few use cases you may find useful for your projects. The complete example is available on Github.

In case you are developing on the JVM using the Scala programming language, you are certainly a lucky one as there is a native circuit breaker implementation available as part of the Akka toolkit. For example, let us redesign our Geo IP service consumer as a typical Akka actor which is going to make an HTTP call over to https://freegeoip.net/:

case class GeoIp(host: String)
case class GeoIpDetails(ip: String = "", countryCode: String = "", 
  countryName: String = "", latitude: Double = 0, longitude: Double = 0)

class GeoIpActor extends Actor with ActorLogging {
  import spray.json._
  import spray.json.DefaultJsonProtocol._
  
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  
  import context.dispatcher
  import context.system
  
  val breaker = new CircuitBreaker(
    context.system.scheduler,
    maxFailures = 5,
    callTimeout = 15 seconds,
    resetTimeout = 2 minutes)
  
  def receive = {
    case GeoIp(host) => breaker
      .withCircuitBreaker(Http()
        .singleRequest(HttpRequest(uri = s"http://freegeoip.net/json/$host"))
        .flatMap {
          case HttpResponse(OK, _, entity, _) => Unmarshal(entity).to[GeoIpDetails]
          case _ => Future.successful(GeoIpDetails())
        }
      ) pipeTo sender()
  }
}

At this point, the pattern undoubtedly looks familiar to all of us. The only new option which Akka's CircuitBreaker brings to the table is the overall call timeout: the execution is considered failed when it does not complete within this time period (certainly a very handy addition to the circuit breaker capabilities). The withCircuitBreaker function takes care of managing the circuit breaker state around the wrapped block of code.
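The call timeout idea itself is not Akka-specific; a hedged sketch of the same behavior on plain Java futures (assuming Java 9+ for CompletableFuture.orTimeout; class and method names are hypothetical) could look like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallTimeout {
    // Wraps an asynchronous call so that it is treated as failed (and could
    // be counted as a circuit breaker failure) when it does not complete
    // within the given time period.
    static String await(CompletableFuture<String> call, long millis) {
        try {
            return call.orTimeout(millis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException ex) {
            if (ex.getCause() instanceof TimeoutException) {
                return "<call timed out>";   // would feed failure accounting
            }
            throw ex;
        }
    }
}
```

A breaker would count the timeout branch as a failure just like an exception, which is exactly what Akka's callTimeout does under the hood of withCircuitBreaker.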

The interactions with the GeoIpActor are no different from any other Akka actor:

implicit val system = ActorSystem("circuit-breakers")
implicit val timeout: Timeout = 5 seconds

val geoIp = system.actorOf(Props[GeoIpActor], "geo-ip-actor")
val result = (geoIp ? GeoIp("8.8.8.8")).mapTo[GeoIpDetails]
  
result andThen { 
  case Success(details) => log.info("GEO IP: {}", details)
  case Failure(ex) => log.error("Communication error", ex)
} 

By looking a bit deeper into the CircuitBreaker documentation we can get some insights into its internals. There are actually three states the CircuitBreaker could be in: closed, open and half-open. The half-open state exists to perform a single trial attempt and find out whether the invocation is back to normal operation or not.

The code snippet looks perfect, but one thing to keep in mind is how the Actor Model and the absence of shared mutable state affect circuit breaker state synchronization. To facilitate that, Akka's CircuitBreaker has a rich set of callback notifications (like onOpen, onHalfOpen, onClose) so state changes could be broadcast between actors. The complete project sources are available on Github.

For the curious readers, just a few closing notes about the adoption of the circuit breaker implementations. Netflix Hystrix is the number one choice at the moment, particularly (but not only) because of superior support from the Spring community. Obviously, Akka's CircuitBreaker is a natural choice for Scala developers who build their applications on top of the excellent Akka toolkit. Concerning the Apache Zest Circuit Breaker, it could be used as-is (if you want to fully control the behavior) or easily integrated as a useful extension into existing general-purpose clients. For example, Apache CXF allows configuring JAX-RS / JAX-WS clients with the Failover feature, including a circuit breaker-based implementation: CircuitBreakerFailoverFeature.

Hope this series of posts extended your awareness of the circuit breaker pattern and the state of its available implementations on the JVM platform a little bit. The repository with complete project samples is available on Github.

Stay resilient!

Saturday, December 26, 2015

Divided We Win: an event sourcing / CQRS prospective on write and read models separation. Queries.

Quite a while ago we started to explore the command query responsibility segregation (CQRS) architecture as an alternative way to develop distributed systems. Last time we covered only commands and events, but not queries. The goal of this blog post is to fill the gap and discuss the ways to handle queries following the CQRS architecture.

We will start where we left off last time, with the sample application which was able to handle commands and persist events in the journal. In order to support the read path, or queries, we are going to introduce a data store. For the sake of keeping things simple, let it be an in-memory H2 database. The data access layer is going to be handled by the awesome Slick library.

To begin with, we have to come up with a simple data model for the User class, managed by the UserAggregate persistent actor. In this regard, the Users class is a typical mapping of a relational table:

class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, true)
  def * = (id, email) <> (User.tupled, User.unapply)
}

It is important to notice at this point that we enforce a uniqueness constraint on the User's email. We will come back to this subtle detail later, during the integration with the UserAggregate persistent actor. The next thing we need is a service to manage data store access, namely persisting and querying Users. As we are in the Akka universe, obviously it is going to be an actor as well. Here it is:
case class CreateSchema()
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case class FindAllUsers()

trait Persistence {
  val users = TableQuery[Users]  
  val db = Database.forConfig("db")
}

class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global
   
  def receive = {
    case CreateSchema => db.run(DBIO.seq(users.schema.create))
      
    case UpdateUser(id, email) => {
      val query = for { user <- users if user.id === id } yield user.email
      db.run(users.insertOrUpdate(User(id, email)))
    }
    
    case FindUserByEmail(email) => {
      val replyTo = sender
      db.run(users.filter( _.email === email.toLowerCase).result.headOption) 
        .onComplete { replyTo ! _ }
    }
    
    case FindAllUsers => {
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
    }
  }
}

Please notice that PersistenceService is a regular untyped Akka actor, not a persistent one. To keep things focused, we are going to support only four kinds of messages:
  • CreateSchema to initialize database schema
  • UpdateUser to update user's email address
  • FindUserByEmail to query user by its email address
  • FindAllUsers to query all users in the data store

Good, the data store services are ready, but nothing really fills them with data. Moving on to the next step, we will refactor UserAggregate, more precisely the way it handles the UserEmailUpdate command. In its current implementation, the user's email update happens unconditionally. But remember, we imposed a uniqueness constraint on emails, so we are going to change the command logic to account for that: before actually performing the update we will run a query against the read model (data store) to make sure no user with such an email is already registered.

val receiveCommand: Receive = {
  case UserEmailUpdate(email) => 
    try {
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id => Error(id, s"Email '$email' already registered")
        case _ => persist(UserEmailUpdated(id, email)) { event =>
          updateState(event)
          persistence ! UpdateUser(id, email)
        }
        Acknowledged(id)
      }
        
      sender ! result
  } catch {
    case ex: Exception if NonFatal(ex) => sender ! Error(id, ex.getMessage) 
  }
}

Pretty simple, but not really idiomatic: the Await.result does not look like it belongs in this code. My first attempt was to use a future / map / recover / pipeTo pipeline to keep the flow completely asynchronous. However, the side effect I observed is that the persist(UserEmailUpdated(id, email)) { event => ... } block is supposed to be executed in another thread most of the time (if the result is not ready yet), but it wasn't, very likely because of the thread context switch. So Await.result was here to the rescue.

Now every time user's email update happens, along with persisting the event we are going to record this fact in the data store as well. Nice, we are getting one step closer.

The last thing we have to consider is how to populate the data store from the event journal. Another experimental module from the Akka Persistence portfolio is of great help here: Akka Persistence Query. Among other features, it provides the ability to query the event journal by persistence identifiers, and this is what we are going to do in order to populate the data store from the journal. Not a surprise, there is a UserJournal actor responsible for that.

case class InitSchema()

class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      val source = journal.currentPersistenceIds()
      
      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId => 
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event => 
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
              }
            }
        }
    }
  }
}

Basically, the code is simple enough, but let us reiterate a bit on what it does. First, we ask the journal for all persistence identifiers it has using the currentPersistenceIds method. Second, for every persistence identifier we query all the events from the journal. Because there is only one kind of event in our case, UserEmailUpdated, we just directly transform it into the data store service's UpdateUser message.
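The replay idea boils down to folding the ordered event stream into the current state of the read model. A minimal sketch of that fold, with a plain in-memory map standing in for the H2 data store and all names hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JournalReplay {
    // A stand-in for the UserEmailUpdated event from the post.
    record UserEmailUpdated(String id, String email) {}

    // Rebuilds the read model (id -> email) by applying every journaled
    // event in order; for this event type the last update simply wins.
    static Map<String, String> replay(List<UserEmailUpdated> journal) {
        Map<String, String> users = new LinkedHashMap<>();
        for (UserEmailUpdated event : journal) {
            users.put(event.id(), event.email());
        }
        return users;
    }
}
```

Replaying the full journal on startup is exactly what makes the read data store disposable: it can always be rebuilt from the events.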

Awesome, we are mostly done! The simplest thing at the end is to add another endpoint to UserRoute which returns the list of existing users by querying the read model.

pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map { 
        case Success(users) => 
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) => 
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}

We are ready to give our revamped CQRS sample application a test drive! Once it is up and running, let us ensure that our journal and data store are empty.
$ curl -X GET http://localhost:38080/api/v1/users
[]

Makes sense, as we haven't created any users yet. Let us do that by updating two users with different email addresses and querying the read model again.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email updated: [email protected]

$ curl -X PUT http://localhost:38080/api/v1/users/124 -d [email protected]
Email updated: [email protected]

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"[email protected]"},
  {"id":"124","email":"[email protected]"}
]

As expected, this time the result is different and we can see two users being returned. The moment of truth: let us try to update the email of user 123 to the one already registered by user 124.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email '[email protected]' already registered

Terrific, this is what we wanted! The read (or query) model is very helpful and works just fine. Please notice that when we restart the application, the read data store should be repopulated from the journal and return the previously created users:
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"[email protected]"},
  {"id":"124","email":"[email protected]"}
]

With this post coming to an end, we are wrapping up the introduction to the CQRS architecture. Although one may say many things were left out of scope, I hope the examples presented along our journey were useful to illustrate the idea, and that CQRS could be an interesting option to consider for your next projects.

As always, the complete source code is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Saturday, February 28, 2015

A fresh look on accessing database on JVM platform: Slick from Typesafe

In today's post we are going to open our minds, step away from the traditional Java EE / Java SE JPA-based stack (which I think is great) and take a refreshing look at how to access a database in your Java applications using the new kid on the block: Slick 2.1 from Typesafe. So if JPA is so great, why bother? Well, sometimes you need to do very simple things and there is no need to bring in a complete, well-modeled persistence layer for that. This is where Slick shines.

In essence, Slick is a database access library, not an ORM. Though it is written in Scala, the examples we are going to look at do not require any particular knowledge of this excellent language (although it is Scala that makes Slick possible). Our relational database schema will have only two tables, customers and addresses, linked by a one-to-many relationship. For simplicity, H2 has been picked as an in-memory database engine.

The first question which comes up is defining the database tables (schema) and, naturally, database-specific DDL is the standard way of doing that. Can we do something about it and try another approach? If you are using Slick 2.1, the answer is yes, absolutely. Let us just describe our tables as Scala classes:

// The 'customers' relation table definition
class Customers( tag: Tag ) extends Table[ Customer ]( tag, "customers" ) {
  def id = column[ Int ]( "id", O.PrimaryKey, O.AutoInc )

  def email = column[ String ]( "email", O.Length( 512, true ), O.NotNull )
  def firstName = column[ String ]( "first_name", O.Length( 256, true ), O.Nullable )
  def lastName = column[ String ]( "last_name", O.Length( 256, true ), O.Nullable )

  // Unique index for customer's email
  def emailIndex = index( "idx_email", email, unique = true )
}

Very easy and straightforward, closely resembling the typical CREATE TABLE construct. The addresses table is going to be defined the same way and will reference the customers table by a foreign key.

// The 'addresses' relation table definition
class Addresses( tag: Tag ) extends Table[ Address ]( tag, "addresses" ) {
  def id = column[ Int ]( "id", O.PrimaryKey, O.AutoInc )

  def street = column[ String ]( "street", O.Length( 100, true ), O.NotNull )
  def city = column[ String ]( "city", O.Length( 50, true ), O.NotNull )
  def country = column[ String ]( "country", O.Length( 50, true ), O.NotNull )

  // Foreign key to 'customers' table
  def customerId = column[Int]( "customer_id", O.NotNull )
  def customer = foreignKey( "customer_fk", customerId, Customers )( _.id )
}

Great, leaving off some details, that is it: we have defined two database tables in pure Scala. But details are important and we are going to look closely at the following two declarations: Table[ Customer ] and Table[ Address ]. Essentially, each table could be represented as a tuple with as many elements as columns it defines. For example, customers is a tuple of (Int, String, String, String), while the addresses table is a tuple of (Int, String, String, String, Int). Tuples in Scala are great but they are not very convenient to work with. Luckily, Slick allows using case classes instead of tuples by providing the so-called Lifted Embedding technique. Here are our Customer and Address case classes:

case class Customer( id: Option[Int] = None, email: String, 
  firstName: Option[ String ] = None, lastName: Option[ String ] = None)

case class Address( id: Option[Int] = None,  street: String, city: String, 
  country: String, customer: Customer )

The last but not least question is how Slick is going to convert from tuples to case classes and vice versa. It would be awesome to have such conversion out of the box, but at this stage Slick needs a bit of help. Using Slick terminology, we are going to shape the * table projection (which corresponds to the SELECT * FROM ... SQL construct). Let us see how it looks for customers:

// Converts from Customer domain instance to table model and vice-versa
def * = ( id.?, email, firstName.?, lastName.? ).shaped <> ( 
  Customer.tupled, Customer.unapply )

For the addresses table, shaping looks a little more verbose due to the fact that Slick does not have a way to refer to a Customer case class instance by foreign key. Still, it is pretty straightforward: we just construct a temporary Customer from its identifier.

// Converts from Address domain instance to table model and vice-versa
def * = ( id.?, street, city, country, customerId ).shaped <> ( 
  tuple => {
    Address.apply(
      id = tuple._1,
      street = tuple._2,
      city = tuple._3,
      country = tuple._4,
      customer = Customer( Some( tuple._5 ), "" )
    )
  }, {
    (address: Address) =>
      Some { (
        address.id,
        address.street,
        address.city,
        address.country,
        address.customer.id getOrElse 0 
      )
    }
  }
)
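Stripped of the Slick API, the shaping above is just a pair of functions between a flat row and a domain object, with a temporary Customer rebuilt from the foreign key value alone. A hedged Java sketch of that idea (hypothetical names, plain lists standing in for Slick tuples):

```java
import java.util.List;

class AddressShaping {
    // Plain stand-ins for the Scala case classes used in the post.
    record Customer(Integer id, String email) {}
    record Address(Integer id, String street, String city,
                   String country, Customer customer) {}

    // Row -> domain object: the customer is reconstructed from the
    // foreign key column only, exactly as in the projection above.
    static Address fromRow(List<Object> row) {
        return new Address((Integer) row.get(0), (String) row.get(1),
                (String) row.get(2), (String) row.get(3),
                new Customer((Integer) row.get(4), ""));
    }

    // Domain object -> row: flatten back, keeping only the foreign key.
    static List<Object> toRow(Address a) {
        Integer fk = a.customer().id() == null ? 0 : a.customer().id();
        return List.of(a.id(), a.street(), a.city(), a.country(), fk);
    }
}
```

Slick's <> operator wires exactly such a pair of functions into the query machinery, so you read and write case classes while it deals in tuples.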

Now, when all the details have been explained, how can we materialize our Scala table definitions into real database tables? Thanks to Slick, it is as easy as that:

implicit lazy val DB = Database.forURL( "jdbc:h2:mem:test", driver = "org.h2.Driver" )
  
DB withSession { implicit session =>
  ( Customers.ddl ++ Addresses.ddl ).create
}

Slick has many ways to query and update data in the database. The most beautiful and powerful one is just using pure functional constructs of the Scala language. The easiest way of doing that is by defining a companion object and implementing typical CRUD operations in it. For example, here is the method which inserts a new customer record into the customers table:

object Customers extends TableQuery[ Customers ]( new Customers( _ ) ) {
  def create( customer: Customer )( implicit db: Database ): Customer = 
    db.withSession { implicit session =>
      val id = this.autoIncrement.insert( customer )
      customer.copy( id = Some( id ) )
    } 
}

And it could be used like this:

Customers.create( Customer( None, "[email protected]",  Some( "Tom" ), Some( "Tommyknocker" ) ) )
Customers.create( Customer( None, "[email protected]",  Some( "Bob" ), Some( "Bobbyknocker" ) ) )

Similarly, the family of find functions could be implemented using regular Scala for comprehensions:

def findByEmail( email: String )( implicit db: Database ) : Option[ Customer ] = 
  db.withSession { implicit session =>
    ( for { customer <- this if ( customer.email === email.toLowerCase ) } 
      yield customer ) firstOption
  }
   
def findAll( implicit db: Database ): Seq[ Customer ] = 
  db.withSession { implicit session =>      
    ( for { customer <- this } yield customer ) list
  }

And here are usage examples:

val customers = Customers.findAll
val customer = Customers.findByEmail( "[email protected]" )

Updates and deletes are a bit different, though very simple as well; let us take a look at them:

def update( customer: Customer )( implicit db: Database ): Boolean = 
  db.withSession { implicit session =>
    val query = for { c <- this if ( c.id === customer.id ) } 
      yield (c.email, c.firstName.?, c.lastName.?)
    query.update(customer.email, customer.firstName, customer.lastName) > 0
  }
  
def remove( customer: Customer )( implicit db: Database ) : Boolean = 
  db.withSession { implicit session =>
    ( for { c <- this if ( c.id === customer.id ) } yield c ).delete > 0
  }

Let us see those two methods in action:

Customers.findByEmail( "[email protected]" ) map { customer =>
  Customers.update( customer.copy( firstName = Some( "Tommy" ) ) )
  Customers.remove( customer )
}

Looks very neat. I am personally still learning Slick, however I am pretty excited about it. It helps me get things done much faster while enjoying the beauty of the Scala language and functional programming. No doubt, the upcoming version 3.0 is going to bring even more interesting features; I am looking forward to it.

This post is just an introduction into the world of Slick; a lot of implementation details and use cases have been left aside to keep it short and simple. However, Slick's documentation is pretty good, so please do not hesitate to consult it.

The complete project is available on GitHub.

Sunday, February 23, 2014

Knowing how all your components work together: distributed tracing with Zipkin

In today's post we will try to cover a very interesting and important topic: distributed system tracing. What it practically means is that we will try to trace the request from the point it was issued by the client to the point the response to this request was received. At first, it looks quite straightforward, but in reality it may involve many calls to several other systems, databases, NoSQL stores, caches, you name it ...

In 2010 Google published a paper about Dapper, a large-scale distributed systems tracing infrastructure (a very interesting read, by the way). Later on, Twitter built its own implementation based on the Dapper paper, called Zipkin, and that is the one we are going to look at.

We will build a simple JAX-RS 2.0 server using the great Apache CXF library. For the client side, we will use the JAX-RS 2.0 client API and, by utilizing Zipkin, we will trace all the interactions between the client and the server (as well as everything happening on the server side). To make the example a bit more illustrative, we will pretend that the server uses some kind of database to retrieve the data. Our code will be a mix of pure Java and a bit of Scala (the choice of Scala will become clear soon).

One additional dependency required for Zipkin to work is Apache Zookeeper. It is needed for coordination and should be started in advance. Luckily, it is very easy to do.

Now back to Zipkin. Zipkin is written in Scala. It is still in active development and the best way to start off with it is just by cloning its GitHub repository and building it from sources:

git clone https://github.com/twitter/zipkin.git

From an architectural perspective, Zipkin consists of three main components:

  • collector: collects traces across the system
  • query: queries collected traces
  • web: provides web-based UI to show the traces

To run them, Zipkin guys provide useful scripts in the bin folder with the only requirement that JDK 1.7 should be installed:

  • bin/collector
  • bin/query
  • bin/web
Let's execute these scripts and ensure that every component has started successfully, with no stack traces on the console (for curious readers, I was not able to make Zipkin work on Windows, so I assume we are running it on a Linux box). By default, the Zipkin web UI is available on port 8080. The storage for traces is an embedded SQLite engine. Though it works, better storages (like the awesome Redis) are available.

The preparation is over, let's write some code. We will start with the JAX-RS 2.0 client part as it is very straightforward (ClientStarter.java):

package com.example.client;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.example.zipkin.Zipkin;
import com.example.zipkin.client.ZipkinRequestFilter;
import com.example.zipkin.client.ZipkinResponseFilter;

public class ClientStarter {
  public static void main( final String[] args ) throws Exception { 
    final Client client = ClientBuilder
      .newClient()
      .register( new ZipkinRequestFilter( "People", Zipkin.tracer() ), 1 )
      .register( new ZipkinResponseFilter( "People", Zipkin.tracer() ), 1 );        
                        
    final Response response = client
      .target( "http://localhost:8080/rest/api/people" )
      .request( MediaType.APPLICATION_JSON )
      .get();

    if( response.getStatus() == 200 ) {
      System.out.println( response.readEntity( String.class ) );
    }
        
    response.close();
    client.close();
        
    // Small delay to allow tracer to send the trace over the wire
    Thread.sleep( 1000 );
  }
}

Except for a couple of imports and classes with Zipkin in their names, everything should look simple. So what are those ZipkinRequestFilter and ZipkinResponseFilter for? Zipkin is awesome, but it is not a magical tool. In order to trace any request in a distributed system, some context should be passed along with it. In the REST/HTTP world, it is usually the request/response headers. Let's take a look at ZipkinRequestFilter first (ZipkinRequestFilter.scala):

package com.example.zipkin.client

import javax.ws.rs.client.ClientRequestFilter
import javax.ws.rs.ext.Provider
import javax.ws.rs.client.ClientRequestContext
import com.twitter.finagle.http.HttpTracing
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.Annotation
import com.twitter.finagle.tracing.TraceId
import com.twitter.finagle.tracing.Tracer

@Provider
class ZipkinRequestFilter( val name: String, val tracer: Tracer ) extends ClientRequestFilter {
  def filter( requestContext: ClientRequestContext ): Unit = {      
    Trace.pushTracerAndSetNextId( tracer, true )
 
    requestContext.getHeaders().add( HttpTracing.Header.TraceId, Trace.id.traceId.toString )
    requestContext.getHeaders().add( HttpTracing.Header.SpanId, Trace.id.spanId.toString )
         
    Trace.id._parentId foreach { id => 
      requestContext.getHeaders().add( HttpTracing.Header.ParentSpanId, id.toString ) 
    }    

    Trace.id.sampled foreach { sampled => 
      requestContext.getHeaders().add( HttpTracing.Header.Sampled, sampled.toString ) 
    }

    requestContext.getHeaders().add( HttpTracing.Header.Flags, Trace.id.flags.toLong.toString )
             
    if( Trace.isActivelyTracing ) {
      Trace.recordRpcname( name,  requestContext.getMethod() )
      Trace.recordBinary( "http.uri", requestContext.getUri().toString()  )
      Trace.record( Annotation.ClientSend() )    
    }
  }
}

A bit of Zipkin internals will make this code super clear. The central part of the Zipkin API is the Trace class. Every time we would like to initiate tracing, we need a Trace Id and a tracer to actually record it. The following single line generates a new Trace Id and registers the tracer (internally this data is held in thread-local state).

Trace.pushTracerAndSetNextId( tracer, true )

Traces are hierarchical by nature, and so are Trace Ids: every Trace Id could be a root or part of another trace. In our example, we know for sure that we are the first and as such the root of the trace. Later on, the Trace Id is wrapped into HTTP headers and passed along with the request (we will see on the server side how it is used). The last three lines associate useful information with the trace: the name of our API (People), the HTTP method, the URI and, most importantly, the fact that it is the client sending the request to the server.

Trace.recordRpcname( name,  requestContext.getMethod() )
Trace.recordBinary( "http.uri", requestContext.getUri().toString()  )
Trace.record( Annotation.ClientSend() ) 
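Stripped of the Finagle API, the propagation itself is nothing more than writing the identifiers into well-known headers (Zipkin's B3 convention) and parsing them back on the other side. A hedged sketch, assuming the standard X-B3-* header names and hypothetical class and method names:

```java
import java.util.HashMap;
import java.util.Map;

class TraceContext {
    // Client side: flatten the trace context into HTTP headers so the
    // server can attach its spans to the same trace.
    static Map<String, String> inject(long traceId, long spanId, Long parentSpanId) {
        Map<String, String> headers = new HashMap<>();
        headers.put("X-B3-TraceId", Long.toHexString(traceId));
        headers.put("X-B3-SpanId", Long.toHexString(spanId));
        if (parentSpanId != null) {
            headers.put("X-B3-ParentSpanId", Long.toHexString(parentSpanId));
        }
        return headers;
    }

    // Server side: restore the trace id from the incoming headers.
    static long extractTraceId(Map<String, String> headers) {
        return Long.parseUnsignedLong(headers.get("X-B3-TraceId"), 16);
    }
}
```

Because the context rides on plain headers, every hop in the call chain — whatever language or framework it uses — can join the same trace.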

The ZipkinResponseFilter does the reverse of ZipkinRequestFilter and extracts the Trace Id from the request headers (ZipkinResponseFilter.scala):

package com.example.zipkin.client

import javax.ws.rs.client.ClientResponseFilter
import javax.ws.rs.client.ClientRequestContext
import javax.ws.rs.client.ClientResponseContext
import javax.ws.rs.ext.Provider
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.Annotation
import com.twitter.finagle.tracing.SpanId
import com.twitter.finagle.http.HttpTracing
import com.twitter.finagle.tracing.TraceId
import com.twitter.finagle.tracing.Flags
import com.twitter.finagle.tracing.Tracer

@Provider
class ZipkinResponseFilter( val name: String, val tracer: Tracer ) extends ClientResponseFilter {  
  def filter( requestContext: ClientRequestContext, responseContext: ClientResponseContext ): Unit = {
    val spanId = SpanId.fromString( requestContext.getHeaders().getFirst( HttpTracing.Header.SpanId ).toString() )

    spanId foreach { sid =>
      val traceId = SpanId.fromString( requestContext.getHeaders().getFirst( HttpTracing.Header.TraceId ).toString() )
        
      val parentSpanId = requestContext.getHeaders().getFirst( HttpTracing.Header.ParentSpanId ) match {
        case s: String => SpanId.fromString( s.toString() )
        case _ => None
      }

      val sampled = requestContext.getHeaders().getFirst( HttpTracing.Header.Sampled ) match { 
        case s: String =>  s.toString.toBoolean
        case _ => true
      }
        
      val flags = Flags( requestContext.getHeaders().getFirst( HttpTracing.Header.Flags ).toString.toLong )        
      Trace.setId( TraceId( traceId, parentSpanId, sid, Option( sampled ), flags ) )
    }
      
    if( Trace.isActivelyTracing ) {
      Trace.record( Annotation.ClientRecv() )
    }
  }
}

Strictly speaking, in our example it's not necessary to extract the Trace Id from the request because both filters should be executed by the same thread. But the last line is very important: it marks the end of our trace by recording that the client has received the response.

Trace.record( Annotation.ClientRecv() )

What's left is actually the tracer itself (Zipkin.scala):

package com.example.zipkin

import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.finagle.zipkin.thrift.ZipkinTracer
import com.twitter.finagle.tracing.Trace
import javax.ws.rs.ext.Provider

object Zipkin {
  lazy val tracer = ZipkinTracer.mk( host = "localhost", port = 9410, DefaultStatsReceiver, 1 )
}

If at this point you are confused about what all those traces and spans mean, please look through this documentation page; it will give you a basic understanding of these concepts.
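In a nutshell, and as a toy illustration only (this models the concepts, not the Finagle/Zipkin API): a trace is a tree of spans, where every span shares the root's trace id and references its parent span.

```java
import java.util.Optional;

// Toy model of traces and spans -- illustrative only, not the Zipkin API.
public class TraceModel {
    static final class Span {
        final long traceId;            // shared by every span in the same trace
        final long spanId;             // unique id of this unit of work
        final Optional<Long> parentId; // empty for the root span
        final String name;

        Span(long traceId, long spanId, Optional<Long> parentId, String name) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
            this.name = name;
        }
    }

    public static void main(String[] args) {
        // The client call is the root; the simulated DB call is its child.
        Span root = new Span(1L, 1L, Optional.empty(), "People (client)");
        Span db = new Span(1L, 2L, Optional.of(root.spanId), "DB (server)");

        System.out.println("same trace: " + (root.traceId == db.traceId));
        System.out.println("db is child of root: "
            + db.parentId.map(p -> p == root.spanId).orElse(false));
    }
}
```

The HTTP headers we pass around (TraceId, SpanId, ParentSpanId) carry exactly this parent/child relationship across process boundaries.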

At this point, there is nothing left on the client side and we are ready to move to the server side. Our JAX-RS 2.0 server will expose a single endpoint (PeopleRestService.java):

package com.example.server.rs;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import com.example.model.Person;
import com.example.zipkin.Zipkin;

@Path( "/people" ) 
public class PeopleRestService {
  @Produces( { "application/json" } )
  @GET
  public Collection< Person > getPeople() {
    return Zipkin.invoke( "DB", "FIND ALL", new Callable< Collection< Person > >() {
      @Override
      public Collection< Person > call() throws Exception {
        return Arrays.asList( new Person( "Tom", "Bombdil" ) );
      }   
    } );   
  }
}

As mentioned before, we will simulate access to a database and generate a child trace by using the Zipkin.invoke wrapper (which looks very simple, Zipkin.scala):

package com.example.zipkin

import java.util.concurrent.Callable
import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.zipkin.thrift.ZipkinTracer
import com.twitter.finagle.tracing.Annotation

object Zipkin {
  lazy val tracer = ZipkinTracer.mk( host = "localhost", port = 9410, DefaultStatsReceiver, 1 )
    
  def invoke[ R ]( service: String, method: String, callable: Callable[ R ] ): R = Trace.unwind {
    Trace.pushTracerAndSetNextId( tracer, false )      
      
    Trace.recordRpcname( service, method );
    Trace.record( new Annotation.ClientSend() );
          
    try {
      callable.call()
    } finally {
      Trace.record( new Annotation.ClientRecv() );
    }
  }   
}

As we can see, in this case the server itself becomes a client for some other service (database).
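The wrap-and-record pattern behind Zipkin.invoke can be sketched without Finagle at all. The following hypothetical helper (annotation "recording" is stubbed out with a plain list) shows the essential shape: mark the send, run the work, and always mark the receive, even when the callable throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Framework-free sketch of the Zipkin.invoke pattern: record a "send"
// annotation, run the work, and always record a "receive" annotation
// in the finally block, even when the callable throws.
public class TracedCall {
    static final List<String> annotations = new ArrayList<>();

    static <R> R invoke(String service, String method, Callable<R> callable) throws Exception {
        annotations.add("ClientSend " + service + "/" + method);
        try {
            return callable.call();
        } finally {
            annotations.add("ClientRecv " + service + "/" + method);
        }
    }

    public static void main(String[] args) throws Exception {
        // The "database" work is simulated, just as in the REST service above.
        String result = invoke("DB", "FIND ALL", () -> "Tom Bombdil");
        System.out.println(result);
        System.out.println(annotations);
    }
}
```

The try/finally is the important design choice: a failed call still closes its span, so failures show up in the trace timeline instead of leaving it dangling.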

The last and most important part of the server is to intercept all HTTP requests and extract the Trace Id from them, so that it becomes possible to associate more data with the trace (annotate the trace). In Apache CXF it's very easy to do by providing your own invoker (ZipkinTracingInvoker.scala):

package com.example.zipkin.server

import org.apache.cxf.jaxrs.JAXRSInvoker
import com.twitter.finagle.tracing.TraceId
import org.apache.cxf.message.Exchange
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.Annotation
import org.apache.cxf.jaxrs.model.OperationResourceInfo
import org.apache.cxf.jaxrs.ext.MessageContextImpl
import com.twitter.finagle.tracing.SpanId
import com.twitter.finagle.http.HttpTracing
import com.twitter.finagle.tracing.Flags
import scala.collection.JavaConversions._
import com.twitter.finagle.tracing.Tracer
import javax.inject.Inject

class ZipkinTracingInvoker extends JAXRSInvoker {
  @Inject val tracer: Tracer = null
  
  def trace[ R ]( exchange: Exchange )( block: => R ): R = {
    val context = new MessageContextImpl( exchange.getInMessage() )
    Trace.pushTracer( tracer )
        
    val id = Option( exchange.get( classOf[ OperationResourceInfo ] ) ) map { ori =>
      context.getHttpHeaders().getRequestHeader( HttpTracing.Header.SpanId ).toList match {
        case x :: xs => SpanId.fromString( x ) map { sid => 
          val traceId = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.TraceId ).toList match {
            case x :: xs => SpanId.fromString( x )
            case _ => None
          }
          
          val parentSpanId = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.ParentSpanId ).toList match {
            case x :: xs => SpanId.fromString( x )
            case _ => None
          }
  
          val sampled = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.Sampled ).toList match { 
            case x :: xs =>  x.toBoolean
            case _ => true
          }
                    
          val flags = context.getHttpHeaders().getRequestHeader( HttpTracing.Header.Flags ).toList match {
            case x :: xs =>  Flags( x.toLong )
            case _ => Flags()
          }
         
          val id = TraceId( traceId, parentSpanId, sid, Option( sampled ), flags )                     
          Trace.setId( id )
        
          if( Trace.isActivelyTracing ) {
            Trace.recordRpcname( context.getHttpServletRequest().getProtocol(), ori.getHttpMethod() )
            Trace.record( Annotation.ServerRecv() )
          }
        
          id
        }           
          
        case _ => None
      }
    }
    
    val result = block
    
    if( Trace.isActivelyTracing ) {
      id map { id => Trace.record( new Annotation.ServerSend() ) }
    }
    
    result
  }
  
  override def invoke( exchange: Exchange, parametersList: AnyRef ): AnyRef = {
    trace( exchange )( super.invoke( exchange, parametersList ) )     
  }
}

Basically, the only thing this code does is extract the Trace Id from the request and associate it with the current thread. Also, please notice that we associate additional data with the trace, marking the server's participation.

  Trace.recordRpcname( context.getHttpServletRequest().getProtocol(), ori.getHttpMethod() )
  Trace.record( Annotation.ServerRecv() )

To see the tracing live, let's start our server (please notice that sbt should be installed), assuming all Zipkin components and Apache Zookeeper are already up and running:

sbt 'project server' 'run-main com.example.server.ServerStarter'

then the client:

sbt 'project client' 'run-main com.example.client.ClientStarter'

and finally open the Zipkin web UI at http://localhost:8080. We should see something like this (depending on how many times you have run the client):

Alternatively, we can build and run fat JARs using sbt-assembly plugin:

sbt assembly
java -jar server/target/zipkin-jaxrs-2.0-server-assembly-0.0.1-SNAPSHOT.jar 
java -jar client/target/zipkin-jaxrs-2.0-client-assembly-0.0.1-SNAPSHOT.jar 

If we click on any particular trace, more detailed information will be shown, resembling the client <-> server <-> database chain.

Even more details are shown when we click on a particular element in the tree.

Lastly, as a bonus, there is the components / services dependency graph.

As we can see, all the data associated with the trace is here and follows the hierarchical structure. The root and child traces are detected and shown, as well as timelines for the client send/receive and server receive/send chains. Our example is quite naive and simple, but even so it demonstrates how powerful and useful distributed system tracing is. Thanks to the Zipkin team.

The complete source code is available on GitHub.

Monday, January 13, 2014

Your build tool is your good friend: what sbt can do for Java developer

I think that picking the right build tool is a very important choice for developers. For years I stuck with Apache Maven and, honestly, it does the job well enough; even nowadays it's a good tool to use. But I always felt it could be done much better ... and then Gradle came along ...

Despite the many hours I spent getting accustomed to the Gradle way of doing things, I finally gave up and switched back to Apache Maven. The reason: I didn't feel comfortable with it, mostly because of the Groovy DSL. Anyway, I think Gradle is a great, powerful and extensible build tool which is able to perform any task your build process needs.

But engaging myself more and more with Scala, I quickly discovered sbt. Though sbt is an acronym for "simple build tool", my first impression was quite the contrary: I found it complicated and hard to understand. For some reason I liked it nonetheless, and after spending more time reading the documentation (which is getting better and better) and many experiments, I would finally say the choice is made. In this post I would like to show a couple of great things sbt can do to make a Java developer's life easier (some knowledge of Scala would be very handy, but it's not required).

Before moving on to a real example, a couple of facts about sbt. It uses Scala as the language for the build scenario and requires a launcher which can be downloaded from here (the version we'll be using is 0.13.1). There are several ways to describe a build in sbt; the one this post demonstrates uses Build.scala with a single project.

Our example is a simple Spring console application with a couple of JUnit test cases: just enough to see how a build with external dependencies is structured and how tests are run. The application contains only two classes:

package com.example;

import org.springframework.stereotype.Service;

@Service
public class SimpleService {
    public String getResult() {
        return "Result";
    }
}
and
package com.example;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;

public class Starter {
    @Configuration
    @ComponentScan( basePackageClasses = SimpleService.class )
    public static class AppConfig {  
    }
 
    public static void main( String[] args ) {
        try( GenericApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class ) ) {
            final SimpleService service = context.getBean( SimpleService.class );
            System.out.println( service.getResult() );
        }
    }
}

Now, let's see how the sbt build looks. By convention, Build.scala should be located in the project subfolder. Additionally, there should be a build.properties file with the desired sbt version and a plugins.sbt with external plugins (we will use the sbteclipse plugin to generate Eclipse project files). We will start with build.properties, which contains only one line:

sbt.version=0.13.1

and continue with plugins.sbt, which in our case is also just one line:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0")

Lastly, let's move to the heart of our build: Build.scala. There are two parts to it. The first is common settings for all projects in the build (useful for multi-project builds, but we have only one now); here is the snippet of this part:

import sbt._
import Keys._
import com.typesafe.sbteclipse.core.EclipsePlugin._

object ProjectBuild extends Build {
  override val settings = super.settings ++ Seq(
    organization := "com.example",    
    name := "sbt-java",    
    version := "0.0.1-SNAPSHOT",    
    
    scalaVersion := "2.10.3",
    scalacOptions ++= Seq( "-encoding", "UTF-8", "-target:jvm-1.7" ),    
    javacOptions ++= Seq( "-encoding", "UTF-8", "-source", "1.7", "-target", "1.7" ),        
    outputStrategy := Some( StdoutOutput ),
    compileOrder := CompileOrder.JavaThenScala,
    
    resolvers ++= Seq( 
      Resolver.mavenLocal, 
      Resolver.sonatypeRepo( "releases" ), 
      Resolver.typesafeRepo( "releases" )
    ),        

    crossPaths := false,            
    fork in run := true,
    connectInput in run := true,
    
    EclipseKeys.executionEnvironment := Some(EclipseExecutionEnvironment.JavaSE17)
  )
}

The build above looks quite clean and understandable: resolvers is a direct analogy of Apache Maven repositories, while EclipseKeys.executionEnvironment customizes the execution environment (Java SE 7) for the generated Eclipse project. All these keys are very well documented.

The second part is much smaller and defines our main project in terms of dependencies and the main class:

lazy val main = Project( 
  id = "sbt-java",  
  base = file("."), 
  settings = Project.defaultSettings ++ Seq(              
    mainClass := Some( "com.example.Starter" ),
     
    initialCommands in console += """
      import com.example._
      import com.example.Starter._
      import org.springframework.context.annotation._
    """,
       
    libraryDependencies ++= Seq(
      "org.springframework" % "spring-context" % "4.0.0.RELEASE",
      "org.springframework" % "spring-beans" % "4.0.0.RELEASE",
      "org.springframework" % "spring-test" % "4.0.0.RELEASE" % "test",
      "com.novocode" % "junit-interface" % "0.10" % "test",
      "junit" % "junit" % "4.11" % "test"
    )
  ) 
)

The initialCommands setting requires a bit of explanation: sbt is able to run a Scala console (REPL) and this setting allows us to add default import statements so we can use our classes immediately. The dependency on junit-interface allows sbt to run JUnit test cases, and that's the first thing we'll do: add some tests. Before creating the actual tests, we will start sbt and ask it to run the test cases on every code change, just like this:

sbt ~test

While sbt is running, we will add a test case:

package com.example;

import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThat;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.GenericApplicationContext;

import com.example.Starter.AppConfig;

public class SimpleServiceTestCase {
    private GenericApplicationContext context;
    private SimpleService service;
 
    @Before
    public void setUp() {
        context = new AnnotationConfigApplicationContext( AppConfig.class );
        service = context.getBean( SimpleService.class );
    }
 
    @After
    public void tearDown() {
        context.close();
    }

    @Test
    public void testSampleTest() { 
        assertThat( service.getResult(), equalTo( "Result" ) );
    } 
}

In the console we should see that sbt picked up the change automatically and ran all test cases. Sadly, because of this issue, which is already fixed and should be available in the next release of junit-interface, we cannot use the @RunWith and @ContextConfiguration annotations to run Spring test cases yet.
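For reference, once that fix is released, the Spring-runner version of the same test would look roughly like this. This is a sketch built on the standard Spring TestContext annotations, and (as noted above) it will not run under sbt's junit-interface until the fix ships:

```java
package com.example;

import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThat;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.example.Starter.AppConfig;

// The runner manages the application context for us: no manual
// setUp() / tearDown() of the context is needed anymore.
@RunWith( SpringJUnit4ClassRunner.class )
@ContextConfiguration( classes = AppConfig.class )
public class SimpleServiceSpringTestCase {
    @Autowired
    private SimpleService service;

    @Test
    public void testSampleTest() {
        assertThat( service.getResult(), equalTo( "Result" ) );
    }
}
```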

For TDD practitioners it's an awesome feature to have. The next terrific feature we are going to look at is the Scala console (REPL), which gives us the ability to play with the application without actually running it. It can be invoked by typing:

sbt console

and observing something like this in the terminal (as we can see, the imports from initialCommands are automatically included):

At this moment the playground is established and we can do a lot of very interesting things, for example: create the context, get beans and call any methods on them:

sbt takes care of the classpath, so all your classes and external dependencies are available for use. I find this way of discovering things much faster than using the debugger or other techniques.

At the moment, there is no good support for sbt in Eclipse, but it's very easy to generate Eclipse project files by using the sbteclipse plugin we touched on before:

sbt eclipse

Awesome! Not to mention the other great plugins which are kindly listed here, and the ability to import Apache Maven POM files using externalPom(), which really simplifies migration. As a conclusion from my side: if you are looking for a better, modern, extensible build tool for your project, please take a look at sbt. It's a great piece of software built on top of an awesome, concise language.

Complete project is available on GitHub.