
Wednesday, September 30, 2020

For gourmets and practitioners: pick your flavour of the reactive stack with JAX-RS and Apache CXF

When the JAX-RS 2.1 specification was released back in 2017, one of its true novelties was the introduction of reactive API extensions. The industry had acknowledged the importance of modern programming paradigms, and the specification essentially mandated first-class support for asynchronous and reactive programming in the Client API.

But what about the server side? It was not left out: the JAX-RS 2.1 asynchronous processing model has been enriched with support for Java 8's CompletionStage, certainly a step in the right direction. Any existing REST web API built on top of a JAX-RS 2.1 implementation (Apache CXF, for example) can benefit from these enhancements right away.

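import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.springframework.stereotype.Service;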

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public CompletionStage<List<Person>> getPeople() {
        return CompletableFuture
            .supplyAsync(() -> Arrays.asList(new Person("[email protected]", "Tom", "Knocker")));
    }
}
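
Outside of a JAX-RS container, the same style of CompletionStage composition can be tried with nothing but the JDK. The sketch below is only an illustration: loadPeople() is a hypothetical stand-in for a slow lookup, and plain strings replace the Person class. It shows that a stage delivers exactly one value (the whole list) or one failure.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

final class CompletionStageSketch {
    // Hypothetical stand-in for a slow lookup; not part of the original service.
    static List<String> loadPeople() {
        return List.of("Tom Knocker");
    }

    // The stage completes once, with the whole list or with a single failure.
    static CompletionStage<List<String>> getPeople() {
        return CompletableFuture
            .supplyAsync(CompletionStageSketch::loadPeople)
            .thenApply(people -> people.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList()))
            // On failure, fall back to an empty list instead of propagating the error.
            .exceptionally(failure -> List.of());
    }

    public static void main(String[] args) {
        // A container would register a callback on the stage; join() is for the demo only.
        System.out.println(getPeople().toCompletableFuture().join());
    }
}
```

Blocking with join() is done here purely to print the result; a JAX-RS runtime resumes the response when the stage completes.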

Undoubtedly, CompletionStage and CompletableFuture are powerful tools, but they are not without their own quirks and limitations. The Reactive Streams specification and a number of its implementations offer a considerably better glimpse of what asynchronous and reactive programming should look like on the JVM. With that, the logical question pops up: could your JAX-RS web services and APIs take advantage of the modern reactive libraries? And if the answer is positive, what does it take?
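
To get a feeling for the difference without pulling in any library, the JDK (9 and above) ships the Reactive Streams interfaces as java.util.concurrent.Flow. The following is a minimal sketch, not how Apache CXF or any of the reactive libraries is implemented: a subscriber (the hypothetical CollectingSubscriber) receives many items instead of one and asks for them one at a time, which is the backpressure part a CompletionStage cannot express.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class FlowSketch {
    // Collects items one by one, signalling demand explicitly after each item.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new CopyOnWriteArrayList<>();
        private final CompletableFuture<List<T>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ready for the next one
        }

        @Override
        public void onError(Throwable failure) {
            done.completeExceptionally(failure);
        }

        @Override
        public void onComplete() {
            done.complete(List.copyOf(items));
        }

        CompletableFuture<List<T>> result() {
            return done;
        }
    }

    static List<String> publishAndCollect(List<String> names) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            names.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        return subscriber.result().join();
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("Tom", "John")));
    }
}
```

Libraries such as Project Reactor and RxJava build their rich operator sets on top of this same publisher/subscriber contract.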

If your bets are on Apache CXF, you are certainly well positioned. The latest Apache CXF 3.2.14 / 3.3.7 / 3.4.0 release trains bring comprehensive support for RxJava3, RxJava2 and Project Reactor. In this post we are going to see how easy it is to plug in your favorite reactive library and place it at the forefront of your REST web APIs and services.

Since most applications and services on the JVM are built on top of the excellent Spring Framework and Spring Boot, we will develop the reference implementations using those as a foundation. The Spring Boot starter that comes with the Apache CXF distribution takes care of most of the boring wiring you would otherwise have to do yourself.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-spring-boot-starter-jaxrs</artifactId>
	<version>3.4.0</version>
</dependency>

Project Reactor is the number one choice as the reactive foundation for Spring-based applications and services, so let us start with that.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-rt-rs-extension-reactor</artifactId>
	<version>3.4.0</version>
</dependency>

Great, believe it or not, we are mostly done here. To teach Apache CXF to understand Project Reactor types like Mono and/or Flux, we need to tune the configuration just a bit using a ReactorCustomizer instance.

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    @Bean
    public Server rsServer(Bus bus, PeopleRestService service) {
        final JAXRSServerFactoryBean bean = new JAXRSServerFactoryBean();
        bean.getProperties(true).put("useStreamingSubscriber", true);
        bean.setBus(bus);
        bean.setAddress("/");
        bean.setServiceBean(service);
        bean.setProvider(new JacksonJsonProvider());
        new ReactorCustomizer().customize(bean);
        return bean.create();
    }
}

With this customization in place, our JAX-RS web services and APIs can freely use Project Reactor primitives in a streaming fashion, for example.

import reactor.core.publisher.Flux;

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Flux<Person> getPeople() {
        return Flux.just(new Person("[email protected]", "Tom", "Knocker"));
    }
}

As you probably noticed, the implementation purposely does not do anything complicated. However, once the reactive types are put to work, you can unleash the full power of the library of your choice (and Project Reactor is really good at that).

Now that you understand the principle, it is the turn of RxJava3, the latest generation of the pioneering reactive library for the JVM platform.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-rt-rs-extension-rx3</artifactId>
	<version>3.4.0</version>
</dependency>

The configuration tuning is mostly identical to the one we have seen with Project Reactor; the customizer instance, ReactiveIOCustomizer, is all that changes.

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    @Bean
    public Server rsServer(Bus bus, PeopleRestService service) {
        final JAXRSServerFactoryBean bean = new JAXRSServerFactoryBean();
        bean.getProperties(true).put("useStreamingSubscriber", true);
        bean.setBus(bus);
        bean.setAddress("/");
        bean.setServiceBean(service);
        bean.setProvider(new JacksonJsonProvider());
        new ReactiveIOCustomizer().customize(bean);
        return bean.create();
    }
}

The list of supported types includes Flowable, Single and Observable. The equivalent implementation in terms of RxJava3 primitives may look like this.


import io.reactivex.rxjava3.core.Flowable;

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Flowable<Person> getPeople() {
        return Flowable.just(new Person("[email protected]", "Tom", "Knocker"));
    }
}

Pretty simple, isn't it? If you are stuck with an older generation, RxJava2, there is nothing to worry about: Apache CXF has you covered.

<dependency>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-rt-rs-extension-rx2</artifactId>
	<version>3.4.0</version>
</dependency>

The same configuration trick of applying the customizer (which, to be fair, may look annoying at this point) is all that is required.

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
    @Bean
    public Server rsServer(Bus bus, PeopleRestService service) {
        final JAXRSServerFactoryBean bean = new JAXRSServerFactoryBean();
        bean.getProperties(true).put("useStreamingSubscriber", true);
        bean.setBus(bus);
        bean.setAddress("/");
        bean.setServiceBean(service);
        bean.setProvider(new JacksonJsonProvider());
        new ReactiveIOCustomizer().customize(bean);
        return bean.create();
    }
}

And we are good to go, ready to use the familiar reactive types Observable, Flowable and Single.

import io.reactivex.Flowable;
import io.reactivex.Observable;

@Service
@Path("/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Observable<Person> getPeople() {
        return Flowable
            .just(new Person("[email protected]", "Tom", "Knocker"))
            .toObservable();
    }
}

Last but not least, if you happen to be using the first generation of RxJava, it is also available with Apache CXF, but it is certainly not recommended for production (it reached end of life a couple of years ago).

The reactive programming paradigm is steadily gaining more and more traction. It is great to see that the ecosystem embraces it, and frameworks like Apache CXF are no exception. If you are looking for a robust foundation to build reactive and/or asynchronous REST web APIs on the JVM, Apache CXF is worth considering. Please give it a try!

The complete source code is available on GitHub.

Sunday, August 26, 2018

Embracing modular Java platform: Apache CXF on Java 10

It has been almost a year since the Java 9 release finally delivered Project Jigsaw to the masses. It was a long, long journey, but it is here, so what has changed? This is a very good question, and the answer is neither obvious nor straightforward.

By and large, Project Jigsaw is a disruptive change, and there are many reasons why. Although almost all of our existing applications are going to run on Java 10 (to be replaced by JDK 11 very soon) with minimal or no changes, Project Jigsaw has deep and profound implications for Java developers: embrace modular applications the Java platform way.

With the myriad of awesome frameworks and libraries out there, it will surely take time, a lot of time, to convert them to Java modules (many will never make it). This path is thorny, but certain things are already possible today. In this rather short post we are going to learn how to use the terrific Apache CXF project to build JAX-RS 2.1 Web APIs in a truly modular fashion using the latest JDK 10.

Since the 3.2.5 release, all Apache CXF artifacts have their manifests enriched with an Automatic-Module-Name directive. It does not make them full-fledged modules, but it is a first step in the right direction. So let us get started ...
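
For the curious, the directive is just a main attribute in the JAR's META-INF/MANIFEST.MF. Here is a small sketch of how one could read it using only JDK classes. The manifest text is made up for illustration (the module name is the one used later in this post), and automaticModuleName is a hypothetical helper, not something Apache CXF provides.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.jar.Manifest;

final class AutomaticModuleNameSketch {
    // Returns the Automatic-Module-Name main attribute, if the manifest declares one.
    static Optional<String> automaticModuleName(String manifestText) {
        try {
            Manifest manifest = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
            return Optional.ofNullable(
                manifest.getMainAttributes().getValue("Automatic-Module-Name"));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        // Illustrative manifest content, not copied from a real artifact.
        String manifest = "Manifest-Version: 1.0\n"
            + "Automatic-Module-Name: org.apache.cxf.frontend.jaxrs\n";
        System.out.println(automaticModuleName(manifest).orElse("<none>"));
    }
}
```

For a real artifact, the manifest can be obtained with java.util.jar.JarFile#getManifest() instead of being built from a string.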

If you use Apache Maven as your build tool of choice, not much has changed here: the dependencies are declared the same way as before.

<dependencies>
    <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-frontend-jaxrs</artifactId>
        <version>3.2.5</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.jaxrs</groupId>
        <artifactId>jackson-jaxrs-json-provider</artifactId>
        <version>2.9.6</version>
    </dependency>

    <dependency>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-server</artifactId>
        <version>9.4.11.v20180605</version>
    </dependency>

    <dependency>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-webapp</artifactId>
        <version>9.4.11.v20180605</version>
    </dependency>
</dependencies>

Uber-jar (or fat-jar) packaging is not really applicable to modular Java applications, so we have to collect the modules ourselves, for example in the target/modules folder.

<plugin>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
        <outputDirectory>${project.build.directory}/modules</outputDirectory>
    </configuration>
</plugin>

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>copy-dependencies</goal>
            </goals>
            <configuration>
                <outputDirectory>${project.build.directory}/modules</outputDirectory>
                <includeScope>runtime</includeScope>
            </configuration>
        </execution>
    </executions>
</plugin>

All good. The next step is to create module-info.java and declare there the name of our module (com.example.cxf in this case) and, among other things, all the modules it requires in order to function.

module com.example.cxf {
    exports com.example.rest;
    
    requires org.apache.cxf.frontend.jaxrs;
    requires org.apache.cxf.transport.http;
    requires com.fasterxml.jackson.jaxrs.json;
    
    requires transitive java.ws.rs;
    
    requires javax.servlet.api;
    requires jetty.server;
    requires jetty.servlet;
    requires jetty.util;
    
    requires java.xml.bind;
}

As you may spot right away, org.apache.cxf.frontend.jaxrs and org.apache.cxf.transport.http come from the Apache CXF distribution (the complete list is available in the documentation), whereas java.ws.rs is the JAX-RS 2.1 API module. After that we can proceed with implementing our JAX-RS resources the same way we did before.

@Path("/api/people")
public class PeopleRestService {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Collection<Person> getAll() {
        return List.of(new Person("John", "Smith", "[email protected]"));
    }
}

This looks easy. How about adding some spicy sauce, like server-sent events (SSE) and RxJava, for example? Let us see how exceptionally easy it is, starting with the dependencies.

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-sse</artifactId>
    <version>3.2.5</version>
</dependency>

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.14</version>
</dependency>

Also, we should not forget to update our module-info.java by adding requires directives for these new modules.

module com.example.cxf {
    ...
    requires org.apache.cxf.rs.sse;
    requires io.reactivex.rxjava2;
    requires transitive org.reactivestreams;
    ...

}

To keep things simple, our SSE endpoint will just broadcast every new person added through the API. Here is the implementation snippet that does it.

private SseBroadcaster broadcaster;
private OutboundSseEvent.Builder builder;
private PublishSubject<Person> publisher;
    
public PeopleRestService() {
    publisher = PublishSubject.create();
}

@Context 
public void setSse(Sse sse) {
    this.broadcaster = sse.newBroadcaster();
    this.builder = sse.newEventBuilder();
        
    publisher
        .subscribeOn(Schedulers.single())
        .map(person -> createEvent(builder, person))
        .subscribe(broadcaster::broadcast);
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Response add(@Context UriInfo uriInfo, Person payload) {
    publisher.onNext(payload);
        
    return Response
        .created(
            uriInfo
                .getRequestUriBuilder()
                .path(payload.getEmail())
                .build())
        .entity(payload)
        .build();
}
    
@GET
@Path("/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void people(@Context SseEventSink sink) {
    broadcaster.register(sink);
}

Now when we build it:

      mvn clean package

And run it using the module path:

      java --add-modules java.xml.bind \
           --module-path target/modules \
           --module com.example.cxf/com.example.Starter

We should be able to give our JAX-RS API a test drive. The simplest way to make sure things work as expected is to open the SSE endpoint http://localhost:8686/api/people/sse in Google Chrome and add some random people through POST requests, using our old buddy curl from the command line:

      curl -X POST http://localhost:8686/api/people \
           -d '{"email": "[email protected]", "firstName": "John", "lastName": "Smith"}' \
           -H "Content-Type: application/json"
      curl -X POST http://localhost:8686/api/people \
           -d '{"email": "[email protected]", "firstName": "Tom", "lastName": "Tommyknocker"}' \
           -H "Content-Type: application/json"

In Google Chrome we should be able to see the raw SSE events pushed by the server (they do not look pretty, but they are good enough to illustrate the flow).
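
If you have never looked at the raw text/event-stream format, it is simply a sequence of "field: value" lines with a blank line terminating each event. The sketch below renders one such event by hand. It is only an approximation of what the browser shows: the event name "person" and the JSON payload are assumptions made for illustration, and the exact fields Apache CXF emits depend on how the event was built.

```java
final class SseWireFormatSketch {
    // Renders one event in the text/event-stream format: named fields, then a blank line.
    static String format(String eventName, String data) {
        StringBuilder out = new StringBuilder();
        out.append("event: ").append(eventName).append('\n');
        // Each line of the payload gets its own "data:" field.
        for (String line : data.split("\n", -1)) {
            out.append("data: ").append(line).append('\n');
        }
        // The empty line tells the client that the event is complete.
        return out.append('\n').toString();
    }

    public static void main(String[] args) {
        // The event name and the payload are made up for illustration.
        System.out.print(format("person", "{\"firstName\":\"John\",\"lastName\":\"Smith\"}"));
    }
}
```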

So, what about application packaging? Docker and containers are certainly a viable option, but with Java 9 and above we have another player: jlink. It assembles and optimizes a set of modules and their dependencies into a custom, self-sufficient runtime image. Let us try it out.

      jlink --add-modules java.xml.bind,java.management \
            --module-path target/modules \
            --verbose \
            --strip-debug \
            --compress 2 \
            --no-header-files \
            --no-man-pages \
            --output target/cxf-java-10-app

Here we hit the first wall. Unfortunately, since almost all the dependencies of our application are automatic modules, which jlink cannot handle, we still have to specify the module path explicitly when running from the runtime image:

      target/cxf-java-10-app/bin/java  \
           --add-modules java.xml.bind \
           --module-path target/modules \
           --module com.example.cxf/com.example.Starter
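
To see for yourself which of the resolved modules are automatic ones, a tiny check along these lines could be dropped into the application. It is a sketch that relies only on the JDK's ModuleLayer API; the class and method names are made up for this post.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ModuleReportSketch {
    // Lists the names of the modules in the boot layer, optionally only the automatic ones.
    static List<String> bootModules(boolean automaticOnly) {
        return ModuleLayer.boot().modules().stream()
            .filter(module -> !automaticOnly || module.getDescriptor().isAutomatic())
            .map(Module::getName)
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("All modules:       " + bootModules(false));
        System.out.println("Automatic modules: " + bootModules(true));
    }
}
```

When started from the class path, the second list is empty, since only JDK modules end up in the boot layer. Started with --module-path target/modules as above, it should list the dependencies that were resolved as automatic modules.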

At the end of the day it turned out to be not that scary. We are surely at a very early stage of JPMS adoption, and this is just the beginning. When every library and every framework we use adds module-info.java to its artifacts (JARs), making them true modules despite all the quirks, then we can declare victory. But small wins are already happening, so make one yours!

The complete source of the project is available on GitHub.