Enterprise Java

Project Reactor DataBuffer to Mono Example

In reactive Spring applications, especially those built with Spring WebFlux, data is often handled as streams rather than simple objects. When dealing with request or response bodies at a low level, you commonly encounter DataBuffer instead of a traditional String or POJO. A frequent requirement is to convert a stream of DataBuffer objects into a Mono<String> or Mono<byte[]> for logging, validation, or custom processing. Let us delve into understanding how Java Reactor converts DataBuffer to Mono, focusing on the process of transforming reactive streams of binary data buffers into Mono objects for easier handling in reactive applications.

1. Understanding Reactor and Data Buffers

Project Reactor is the reactive foundation used by Spring WebFlux and provides a non-blocking programming model built around the Reactive Streams specification. At its core, Reactor introduces two main types: Mono, which represents a stream of zero or one element, and Flux, which represents a stream of zero to many elements. These types allow applications to handle asynchronous data flows efficiently while supporting backpressure, which helps prevent consumers from being overwhelmed by fast producers.

In Spring WebFlux, HTTP request and response bodies are not treated as simple objects or strings. Instead, they are exposed as streams of data in the form of Flux<DataBuffer>. A DataBuffer is Spring’s abstraction over raw byte buffers and is designed for high-performance, non-blocking I/O. It typically wraps a Netty ByteBuf or a similar low-level buffer implementation, allowing data to be processed incrementally as it arrives over the network.

Because network data may arrive in multiple chunks, a single request body is often split across several DataBuffer instances. This is why direct access to the complete payload is not immediately available. To work with the full content, these buffers usually need to be combined, read, and then released. Failing to release a DataBuffer after reading it can lead to memory leaks, especially in high-throughput reactive applications.

Understanding how Reactor manages streams and how DataBuffer represents binary data is essential when building custom WebFlux components, such as filters, custom codecs, or low-level request processing logic. A clear grasp of these concepts ensures that reactive pipelines remain efficient, safe, and truly non-blocking.

2. Practical Demonstration

2.1 Maven Dependencies (pom.xml)

The following dependency configuration sets up a minimal Spring Boot WebFlux project with all required reactive components.

<!-- pom.xml -->
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
</dependencies>

This dependency enables Spring WebFlux and automatically brings in Project Reactor for reactive programming, Netty as the non-blocking web server, and the DataBuffer abstractions required for handling streaming request and response bodies.

2.2 Java Code

The following example shows a complete Spring Boot application with a WebFlux controller that converts a request body from Flux<DataBuffer> into Mono<String>.

package com.example.databuffer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@SpringBootApplication
public class DataBufferApplication {

  public static void main(String[] args) {
    SpringApplication.run(DataBufferApplication.class, args);
  }
}

@RestController
class EchoController {

  @PostMapping("/echo")
  public Mono<String> echo(Flux<DataBuffer> body) {

    return DataBufferUtils.join(body)
        .map(dataBuffer -> {
          byte[] bytes = new byte[dataBuffer.readableByteCount()];
          dataBuffer.read(bytes);
          DataBufferUtils.release(dataBuffer);
          return new String(bytes, StandardCharsets.UTF_8);
        })
        .map(value -> "received payload: " + value);
  }
}

2.2.1 Code Explanation

The application class bootstraps the Spring Boot runtime, while the controller exposes a reactive endpoint that receives the request body as a stream of DataBuffer instances. These buffers are joined into a single buffer, converted into a UTF-8 string, and explicitly released to avoid memory leaks, after which the processed value is returned as a non-blocking Mono<String>.

2.2.2 Code Run and Output

To run the application, start the Spring Boot app and send a POST request to the /echo endpoint with a plain text payload. The server will process the incoming DataBuffer stream, convert it to a string, and respond with the prefixed message.

$ curl -X POST http://localhost:8080/echo -d "hello reactor" -H "Content-Type: text/plain"

received payload: hello reactor

This output confirms that the Flux<DataBuffer> request body was successfully joined, converted to a Mono<String>, and returned by the reactive controller in a non-blocking, memory-safe manner.

3. Conclusion

When working with Spring WebFlux at a lower level, converting a DataBuffer into a Mono is a common need. Request bodies are typically received as Flux<DataBuffer>, so the buffers should be joined before reading their contents, and explicitly released afterward to prevent memory leaks. It is also important to keep the entire process non-blocking and fully reactive. By following this approach, data buffers can be safely and efficiently transformed into reactive types that are simpler to handle within your application.

Yatin Batra

An experience full-stack engineer well versed with Core Java, Spring/Springboot, MVC, Security, AOP, Frontend (Angular & React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, K8).
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button