Showing posts with label event stream. Show all posts
Showing posts with label event stream. Show all posts

Tuesday, July 28, 2020

It is never enough of them: enriching Apache Avro generated classes with custom Java annotations

Apache Avro, along with Apache Thrift and Protocol Buffers, is often being used as a platform-neutral extensible mechanism for serializing structured data. In the context of event-driven systems, the Apache Avro's schemas play the role of the language-agnostic contracts, shared between loosely-coupled components of the system, not necessarily written using the same programming language.

Probably, the most widely adopted reference architecture for such systems circles around Apache Kafka backed by Schema Registry and Apache Avro, although many other excellent options are available. Nevertheless, why Apache Avro?

The official documentation page summarizes pretty well the key advantages Apache Avro has over Apache Thrift and Protocol Buffers. But we are going to add another one to the list: biased (in a good sense) support of the Java and JVM platform in general.

Let us imagine that one of the components (or, it has to be said, microservice) takes care of the payment processing. Not every payment may succeed and to propagate such failures, the component broadcasts PaymentRejectedEvent whenever such unfortunate event happens. Here is its Apache Avro schema, persisted in the PaymentRejectedEvent.avsc file.

{
    "type": "record",
    "name": "PaymentRejectedEvent",
    "namespace": "com.example.event",
    "fields": [
        {
            "name": "id",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "reason",
            "type": {
                "type": "enum",
                "name": "PaymentStatus",
                "namespace": "com.example.event",
                "symbols": [
                    "EXPIRED_CARD",
                    "INSUFFICIENT_FUNDS",
                    "DECLINED"
                ]
            }
        },
        {
            "name": "date",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            }
        }
    ]
}

The event is notoriously kept simple, you can safely assume that in more or less realistic system it has to have considerably more details available. To turn this event into Java class at build time, we could use Apache Avro Maven plugin, it is as easy as it could get.

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.0</version>
    <configuration>
        <stringType>String</stringType>
    </configuration>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated-sources/avro/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

Once the build finishes, you will get PaymentRejectedEvent Java class generated. But a few annoyances are going to emerge right away:

@org.apache.avro.specific.AvroGenerated
public class PaymentRejectedEvent extends ... {
   private java.lang.String id;
   private com.example.event.PaymentStatus reason;
   private long date;
}

The Java's types for id and date fields are not really what we would expect. Luckily, this is easy to fix by specifying customConversions plugin property, for example.

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.0</version>
    <configuration>
        <stringType>String</stringType>
        <customConversions>
            org.apache.avro.Conversions$UUIDConversion,org.apache.avro.data.TimeConversions$LocalTimestampMillisConversion
        </customConversions>
    </configuration>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated-sources/avro/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

If we build the project this time, the plugin would generate the right types.

@org.apache.avro.specific.AvroGenerated
public class PaymentRejectedEvent extends ... {
   private java.util.UUID id;
   private com.example.event.PaymentStatus reason;
   private java.time.LocalDateTime date;
}

It looks much better! But what about next challenge. In Java, annotations are commonly used to associate some additional metadata pieces with a particular language element. What if we have to add a custom, application-specific annotation to all generated event classes? It does not really matter which one, let it be @javax.annotation.Generated, for example. It turns out, with Apache Avro it is not an issue, it has dedicated javaAnnotation property we could benefit from.

{
    "type": "record",
    "name": "PaymentRejectedEvent",
    "namespace": "com.example.event",
    "javaAnnotation": "javax.annotation.Generated(\"avro\")",
    "fields": [
        {
            "name": "id",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "reason",
            "type": {
                "type": "enum",
                "name": "PaymentStatus",
                "namespace": "com.example.event",
                "symbols": [
                    "EXPIRED_CARD",
                    "INSUFFICIENT_FUNDS",
                    "DECLINED"
                ]
            }
        },
        {
            "name": "date",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            }
        }
    ]
}

When we rebuild the project one more time (hopefully the last one), the generated PaymentRejectedEvent Java class is going to be decorated with the additional custom annotation.

@javax.annotation.Generated("avro")
@org.apache.avro.specific.AvroGenerated
public class PaymentRejectedEvent extends ... {
   private java.util.UUID id;
   private com.example.event.PaymentStatus reason;
   private java.time.LocalDateTime date;
}

Obviously, this property has no effect if the schema is used to produce respective constructs in other programming languages but it still feels good to see that Java has privileged support in Apache Avro, thanks for that! As a side note, it is good to see that after some quite long inactivity time the project is expiriencing the second breath, with regular releases and new features delivered constantly.

The complete source code is available on Github.

Saturday, December 26, 2015

Divided We Win: an event sourcing / CQRS prospective on write and read models separation. Queries.

Quite a while ago we have started to explore command query responsibility segregation (CQRS) architecture as an alternative way to develop distributed systems. Last time we have covered only commands and events but not queries. The goal of this blog post is to fill the gap and discuss the ways to handle queries following CQRS architecture.

We will start where we left off last time, with the sample application which was able to handle commands and persist events in the journal. In order to support read path, or queries, we are going to introduce data store. For the sake of keeping things simple, let it be in-memory H2 database. The data access layer is going to be handled by awesome Slick library.

To begin with, we have to come up with a simple data model for the User class, managed by UserAggregate persistent actor. In this regards, the Users class is a typical mapping of the relational table:

class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, true)
  def * = (id, email) <> (User.tupled, User.unapply)
}
It is important to notice at this point that we enforce uniqueness constraint on User's email. We will come back to this subtle detail later, during the integration with UserAggregate persistent actor. Next thing we need is a service to manage data store access, namely persisting and querying Users. As we are in the Akka universe, obviously it is going to be an actor as well. Here it is:
case class CreateSchema()
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case class FindAllUsers()

trait Persistence {
  val users = TableQuery[Users]  
  val db = Database.forConfig("db")
}

class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global
   
  def receive = {
    case CreateSchema => db.run(DBIO.seq(users.schema.create))
      
    case UpdateUser(id, email) => {
      val query = for { user <- users if user.id === id } yield user.email
      db.run(users.insertOrUpdate(User(id, email)))
    }
    
    case FindUserByEmail(email) => {
      val replyTo = sender
      db.run(users.filter( _.email === email.toLowerCase).result.headOption) 
        .onComplete { replyTo ! _ }
    }
    
    case FindAllUsers => {
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
    }
  }
}
Please notice that PersistenceService is regular untyped Akka actor, not a persistent one. To keep things focused, we are going to support only four kind of messages:
  • CreateSchema to initialize database schema
  • UpdateUser to update user's email address
  • FindUserByEmail to query user by its email address
  • FindAllUsers to query all users in the data store

Good, the data store services are ready but nothing really fills them with data. Moving on to the next step, we will refactor UserAggregate, more precisely the way it handles UserEmailUpdate command. At its current implementation, user's email update happens unconditionally. But remember, we imposed uniqueness constraints on emails so we are going to change command logic to account for that: before actually performing the update we will run the query against read model (data store) to make sure no user which such email is already registered.

val receiveCommand: Receive = {
  case UserEmailUpdate(email) => 
    try {
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id => Error(id, s"Email '$email' already registered")
        case _ => persist(UserEmailUpdated(id, email)) { event =>
          updateState(event)
          persistence ! UpdateUser(id, email)
        }
        Acknowledged(id)
      }
        
      sender ! result
  } catch {
    case ex: Exception if NonFatal(ex) => sender ! Error(id, ex.getMessage) 
  }
}
Pretty simple, but not really idiomatic: the Await.result does not look as it belongs to this code. My first attempt was to use future / map / recover / pipeTo pipeline to keep the flow completely asynchronous. However, the side effect I have observed is that in this case the persist(UserEmailUpdated(id, email)) { event => ... } block is supposed to be executed in another thread most of the time (if result is not ready) but it didn't, very likely because of thread context switch. So the Await.result was here to the rescue.

Now every time user's email update happens, along with persisting the event we are going to record this fact in the data store as well. Nice, we are getting one step closer.

The last thing we have to consider is how to populate the data store from the event journal? Another experimental module from Akka Persistence portfolio is of great help here, Akka Persistence Query. Among other features, it provides the ability to query the event journal by persistence identifiers and this is what we are going to do in order to populate data store from the journal. Not a surprise, there would be UserJournal actor responsible for that.

case class InitSchema()

class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      val source = journal.currentPersistenceIds()
      
      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId => 
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event => 
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
              }
            }
        }
    }
  }
}
Basically, the code is simple enough but let us reiterate a bit on what it does. First thing, we ask the journal about all persistence identifiers it has using currentPersistenceIds method. Secondly, for every persistence identifier we query all the events from the journal. Because there is only one event in our case, UserEmailUpdated, we just directly transform it into data store services UpdateUser message.

Awesome, we are mostly done! The simplest thing at the end is to add another endpoint to UserRoute which returns the list of the existing users by querying the read model.

pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map { 
        case Success(users) => 
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) => 
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}
We are ready to give our revamped CQRS sample application a test drive! Once it is up and running, let ensure that our journal and data store are empty.
$ curl -X GET http://localhost:38080/api/v1/users
[]
Make sense, as we haven't created any users yet. Let us do that by updating two users with different email addresses and querying the read model again.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email updated: [email protected]

$ curl -X PUT http://localhost:38080/api/v1/users/124 -d [email protected]
Email updated: [email protected]

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"[email protected]"},
  {"id":"124","email":"[email protected]"}
]
As expected, this time the result is different and we can see two users are being returned. The moment of truth, let us try to update the email of user with 124 with the one of user with 123.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d [email protected]
Email '[email protected]' already registered
Teriffic, this is what we wanted! The read (or query) model is very helpful and works just fine. Please notice when we restart the application, the read data store should be repopulated from the journal and return the previously created users:
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"[email protected]"},
  {"id":"124","email":"[email protected]"}
]

With this post coming to the end, we are wrapping up the introduction into CQRS architecture. Although one may say many things are left out of scope, I hope the examples presented along our journey were useful to illustrate the idea and CQRS could be an interesting option to consider for your next projects.

As always, the complete source code is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.