5.5.15
| This documentation is also available as multiple (faster to load) HTML pages and as PDF. |
© 2009 - 2021 VMware, Inc. All rights reserved.
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
Preface
This chapter includes:
1. Requirements
This section details the compatible Java and Spring Framework versions.
2. Code Conventions
Spring Framework 2.0 introduced support for namespaces, which simplifies the XML configuration of the application context and lets Spring Integration provide broad namespace support.
In this reference guide, the int namespace prefix is used for Spring Integration’s core namespace support.
Each Spring Integration adapter type (also called a module) provides its own namespace, which is configured by using the following convention:
The following example shows the int, int-webflux, and int-stream namespaces in use:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd
    http://www.springframework.org/schema/integration/stream
    https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
…
</beans>
For a detailed explanation regarding Spring Integration’s namespace support, see Namespace Support.
| The namespace prefix can be freely chosen. You may even choose not to use any namespace prefixes at all. Therefore, you should apply the convention that best suits your application. Be aware, though, that SpringSource Tool Suite™ (STS) uses the same namespace conventions for Spring Integration as used in this reference guide. |
3. Conventions in This Guide
In some cases, to aid formatting when specifying long fully qualified class names, we shorten org.springframework to o.s and org.springframework.integration to o.s.i, such as with o.s.i.transaction.TransactionSynchronizationFactory.
What’s New?
For those who are already familiar with Spring Integration, this chapter provides a brief overview of the new features of version 5.5.
If you are interested in the changes and features that were introduced in earlier versions, see the Change History.
4. What’s New in Spring Integration 5.5?
If you are interested in more details, see the Issue Tracker tickets that were resolved as part of the 5.5 development process.
4.1. New Components
4.1.1. File Aggregator
A FileSplitter.FileMarker-based implementation of CorrelationStrategy, ReleaseStrategy, and MessageGroupProcessor was introduced as the FileAggregator component.
See File Aggregator for more information.
4.1.2. MQTT v5 Support
The Mqttv5PahoMessageDrivenChannelAdapter and Mqttv5PahoMessageHandler (including respective MqttHeaderMapper) were introduced to support MQTT v5 protocol communication.
See MQTT v5 Support for more information.
4.2. General Changes
All the persistent MessageGroupStore implementations provide a streamMessagesForGroup(Object groupId) contract based on the target database streaming API.
See Message Store for more information.
The integrationGlobalProperties bean (if declared) must now be an instance of org.springframework.integration.context.IntegrationProperties instead of java.util.Properties. Support for java.util.Properties is deprecated and retained only for backward compatibility.
The spring.integration.channels.error.requireSubscribers=true global property is added to indicate that the global default errorChannel must be configured with the requireSubscribers option (or not).
The spring.integration.channels.error.ignoreFailures=true global property is added to indicate that the global default errorChannel must ignore (or not) dispatching errors and pass the message to the next handler.
See Global Properties for more information.
An AbstractPollingEndpoint (source polling channel adapter and polling consumer) treats maxMessagesPerPoll == 0 as an instruction to skip calling the source.
The value can be changed later, for example through a Control Bus.
See Polling Consumer for more information.
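The effect of a zero limit can be pictured with a small plain-Java sketch. It is not the AbstractPollingEndpoint implementation: the PollerSketch class, its setter, and the Supplier-based source are hypothetical stand-ins, and negative limits are deliberately rejected because this sketch does not model them.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical, simplified poller; not the framework's AbstractPollingEndpoint. */
final class PollerSketch<T> {
    private final Supplier<T> source;      // returns null when nothing is available
    private final Consumer<T> handler;
    private volatile int maxMessagesPerPoll;

    PollerSketch(Supplier<T> source, Consumer<T> handler, int maxMessagesPerPoll) {
        this.source = source;
        this.handler = handler;
        setMaxMessagesPerPoll(maxMessagesPerPoll);
    }

    /** May be called at runtime; this plays the role of a Control Bus command. */
    void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
        if (maxMessagesPerPoll < 0) {
            throw new IllegalArgumentException("negative values are not modeled in this sketch");
        }
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    /** Runs one poll cycle and returns the number of messages handled. */
    int poll() {
        int limit = maxMessagesPerPoll;
        int handled = 0;
        // With limit == 0 the loop body never runs, so the source is not called at all.
        while (handled < limit) {
            T message = source.get();
            if (message == null) {
                break; // source is empty; end this cycle early
            }
            handler.accept(message);
            handled++;
        }
        return handled;
    }
}
```

Starting such a poller with a limit of 0 and raising it later leaves the source untouched until the limit changes.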
The ConsumerEndpointFactoryBean now accepts a reactiveCustomizer Function to treat any input channel as a reactive stream source and to use a ReactiveStreamsConsumer underneath.
This is covered as a ConsumerEndpointSpec.reactive() option in Java DSL and as a @Reactive nested annotation for the messaging annotations.
See Reactive Streams Support for more information.
The groupTimeoutExpression for a correlation message handler (an Aggregator and Resequencer) can now be evaluated to a java.util.Date for some fine-grained scheduling use-cases.
Also, the BiFunction groupConditionSupplier option has been added to the AbstractCorrelatingMessageHandler to supply a MessageGroup condition against a message to be added to the group.
See Aggregator for more information.
The MessageGroup abstraction can be supplied with a condition to evaluate later on to make a decision for the group.
See Message Group Condition for more information.
4.2.1. Integration Flows Composition
The new IntegrationFlows.from(IntegrationFlow) factory method has been added to allow starting the current IntegrationFlow from the output of an existing flow.
In addition, the IntegrationFlowDefinition has added a to(IntegrationFlow) terminal operator to continue the current flow at the input channel of some other flow.
See Integration Flows Composition for more information.
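As a rough illustration of both composition operators, the following configuration fragment starts one flow from the output of another and hands its result to a third. It is a sketch only: the class, bean, and channel names are made up for the example, and it needs the Spring Integration Java DSL on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;

@Configuration
@EnableIntegration
public class FlowCompositionConfig {

    // Source flow: polls a supplier once a second (all names here are illustrative).
    @Bean
    public IntegrationFlow templateSourceFlow() {
        return IntegrationFlows.fromSupplier(() -> "test data",
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .channel("sourceChannel")
                .get();
    }

    // Starts from the output of templateSourceFlow and continues at the input of loggingFlow.
    @Bean
    public IntegrationFlow upperCaseFlow(IntegrationFlow templateSourceFlow, IntegrationFlow loggingFlow) {
        return IntegrationFlows.from(templateSourceFlow)
                .<String, String>transform(String::toUpperCase)
                .to(loggingFlow);
    }

    // Terminal flow that prints whatever payload it receives.
    @Bean
    public IntegrationFlow loggingFlow() {
        return flow -> flow.handle(message -> System.out.println(message.getPayload()));
    }
}
```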
4.2.2. AMQP Changes
The AmqpInboundChannelAdapter and AmqpInboundGateway (and the respective Java DSL builders) now support an org.springframework.amqp.rabbit.retry.MessageRecoverer as an AMQP-specific alternative to the general purpose RecoveryCallback.
See AMQP Support for more information.
4.2.3. Redis Changes
The ReactiveRedisStreamMessageProducer now has setters for all the StreamReceiver.StreamReceiverOptionsBuilder options, including an onErrorResume function.
See Redis Support for more information.
4.2.4. HTTP Changes
The HttpRequestExecutingMessageHandler no longer falls back to the application/x-java-serialized-object content type and lets the RestTemplate make the final decision for the request body conversion based on the HttpMessageConverter provided.
It also now has an extractResponseBody flag (true by default) that controls whether the reply message payload is just the response body or the whole ResponseEntity, independently of the provided expectedResponseType.
The same option is available for the WebFluxRequestExecutingMessageHandler.
See HTTP Support for more information.
4.2.5. File/FTP/SFTP Changes
The persistent file list filters now have a boolean property forRecursion.
Setting this property to true also sets alwaysAcceptDirectories, which means that recursive operations on the outbound gateways (ls and mget) now always traverse the full directory tree each time.
This is to solve a problem where changes deep in the directory tree were not detected.
In addition, forRecursion=true causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appears multiple times in different directories.
IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top level directory.
For this reason, the property is false by default; this may change in a future release.
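The key collision described above can be reproduced with a few lines of plain Java. This is a minimal sketch under stated assumptions: the AcceptOnceKeySketch class is hypothetical, an in-memory Set stands in for the persistent metadata store, and paths use '/' as the separator. The real filters may build their keys differently (for example, with a prefix).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical accept-once filter; a Set stands in for the persistent metadata store. */
final class AcceptOnceKeySketch {
    private final Set<String> seenKeys = new HashSet<>();
    private final boolean forRecursion;

    AcceptOnceKeySketch(boolean forRecursion) {
        this.forRecursion = forRecursion;
    }

    /** Returns true the first time a key is seen and false on every later attempt. */
    boolean accept(String fullPath) {
        // forRecursion=true keys on the full path; otherwise only the file name is used.
        String key = forRecursion
                ? fullPath
                : fullPath.substring(fullPath.lastIndexOf('/') + 1);
        return seenKeys.add(key);
    }

    public static void main(String[] args) {
        AcceptOnceKeySketch byName = new AcceptOnceKeySketch(false);
        System.out.println(byName.accept("a/report.txt")); // true
        System.out.println(byName.accept("b/report.txt")); // false: same file name, different directory
    }
}
```

The same sketch also shows why existing keys are not found after switching the flag: a store populated with bare file names contains no full-path keys.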
The FileInboundChannelAdapterSpec now has a convenient recursive(boolean) option instead of requiring an explicit reference to the RecursiveDirectoryScanner.
The remoteDirectoryExpression can now be used in the mv command for convenience.
4.2.6. MongoDb Changes
The MongoDbMessageSourceSpec was added to the MongoDb Java DSL.
An update option is now exposed on both the MongoDbMessageSource and ReactiveMongoDbMessageSource implementations.
See MongoDb Support for more information.
4.2.7. WebSockets Changes
The WebSocket channel adapters based on ServerWebSocketContainer can now be registered and removed at runtime.
See WebSockets Support for more information.
4.2.8. JPA Changes
The JpaOutboundGateway now supports an Iterable message payload for a PersistMode.DELETE.
See Outbound Channel Adapter for more information.
4.2.9. Gateway Changes
Previously, when using XML configuration, @Gateway.payloadExpression was ignored for no-argument methods.
There is one possible breaking change: if the method is annotated with @Payload as well as @Gateway (with a different expression), the @Payload was previously applied; now the @Gateway.payloadExpression is applied.
See Gateway Configuration with Annotations and XML and Invoking No-Argument Methods for more information.
Overview of Spring Integration Framework
Spring Integration provides an extension of the Spring programming model to support the well known Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems through declarative adapters. Those adapters provide a higher level of abstraction over Spring’s support for remoting, messaging, and scheduling.
Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.
5. Spring Integration Overview
This chapter provides a high-level introduction to Spring Integration’s core concepts and components. It includes some programming tips to help you make the most of Spring Integration.
5.1. Background
One of the key themes of the Spring Framework is Inversion of Control (IoC). In its broadest sense, this means that the framework handles responsibilities on behalf of the components that are managed within its context. The components themselves are simplified, because they are relieved of those responsibilities. For example, dependency injection relieves the components of the responsibility of locating or creating their dependencies. Likewise, aspect-oriented programming relieves business components of generic cross-cutting concerns by modularizing them into reusable aspects. In each case, the end result is a system that is easier to test, understand, maintain, and extend.
Furthermore, the Spring framework and portfolio provide a comprehensive programming model for building enterprise applications. Developers benefit from the consistency of this model and especially from the fact that it is based upon well established best practices, such as programming to interfaces and favoring composition over inheritance. Spring’s simplified abstractions and powerful support libraries boost developer productivity while simultaneously increasing the level of testability and portability.
Spring Integration is motivated by these same goals and principles. It extends the Spring programming model into the messaging domain and builds upon Spring’s existing enterprise integration support to provide an even higher level of abstraction. It supports message-driven architectures where inversion of control applies to runtime concerns, such as when certain business logic should run and where the response should be sent. It supports routing and transformation of messages so that different transports and different data formats can be integrated without impacting testability. In other words, the messaging and integration concerns are handled by the framework. Business components are further isolated from the infrastructure, and developers are relieved of complex integration responsibilities.
As an extension of the Spring programming model, Spring Integration provides a wide variety of configuration options, including annotations, XML with namespace support, XML with generic “bean” elements, and direct usage of the underlying API. That API is based upon well defined strategy interfaces and non-invasive, delegating adapters. Spring Integration’s design is inspired by the recognition of a strong affinity between common patterns within Spring and the well known patterns described in Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf (Addison Wesley, 2004). Developers who have read that book should be immediately comfortable with the Spring Integration concepts and terminology.
5.2. Goals and Principles
Spring Integration is motivated by the following goals:
- Provide a simple model for implementing complex enterprise integration solutions.
- Facilitate asynchronous, message-driven behavior within a Spring-based application.
- Promote intuitive, incremental adoption for existing Spring users.
Spring Integration is guided by the following principles:
- Components should be loosely coupled for modularity and testability.
- The framework should enforce separation of concerns between business logic and integration logic.
- Extension points should be abstract in nature (but within well-defined boundaries) to promote reuse and portability.
5.3. Main Components
From a vertical perspective, a layered architecture facilitates separation of concerns, and interface-based contracts between layers promote loose coupling. Spring-based applications are typically designed this way, and the Spring framework and portfolio provide a strong foundation for following this best practice for the full stack of an enterprise application. Message-driven architectures add a horizontal perspective, yet these same goals are still relevant. Just as “layered architecture” is an extremely generic and abstract paradigm, messaging systems typically follow the similarly abstract “pipes-and-filters” model. The “filters” represent any components capable of producing or consuming messages, and the “pipes” transport the messages between filters so that the components themselves remain loosely-coupled. It is important to note that these two high-level paradigms are not mutually exclusive. The underlying messaging infrastructure that supports the “pipes” should still be encapsulated in a layer whose contracts are defined as interfaces. Likewise, the “filters” themselves should be managed within a layer that is logically above the application’s service layer, interacting with those services through interfaces in much the same way that a web tier would.
5.3.1. Message
In Spring Integration, a message is a generic wrapper for any Java object combined with metadata used by the framework while handling that object. It consists of a payload and headers. The payload can be of any type, and the headers hold commonly required information such as ID, timestamp, correlation ID, and return address. Headers are also used for passing values to and from connected transports. For example, when creating a message from a received file, the file name may be stored in a header to be accessed by downstream components. Likewise, if a message’s content is ultimately going to be sent by an outbound mail adapter, the various properties (to, from, cc, subject, and others) may be configured as message header values by an upstream component. Developers can also store any arbitrary key-value pairs in the headers.
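The payload-plus-headers idea can be sketched in plain Java as follows. This is an illustrative stand-in rather than the framework's Message type: the SimpleMessage class and the "file_name" header key are assumptions made for the example.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Illustrative stand-in for a message: an immutable payload plus a read-only header map. */
final class SimpleMessage<T> {
    private final T payload;
    private final Map<String, Object> headers;

    SimpleMessage(T payload, Map<String, Object> extraHeaders) {
        Map<String, Object> copy = new LinkedHashMap<>(extraHeaders);
        // Framework-style metadata; added after the copy so callers cannot override it.
        copy.put("id", UUID.randomUUID());
        copy.put("timestamp", System.currentTimeMillis());
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(copy);
    }

    T getPayload() {
        return payload;
    }

    Map<String, Object> getHeaders() {
        return headers;
    }

    public static void main(String[] args) {
        // A message created from a received file keeps the file name in a header.
        SimpleMessage<String> message = new SimpleMessage<>(
                "file contents", Collections.singletonMap("file_name", "orders.csv"));
        System.out.println(message.getHeaders().get("file_name") + ": " + message.getPayload());
    }
}
```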
5.3.2. Message Channel
A message channel represents the “pipe” of a pipes-and-filters architecture. Producers send messages to a channel, and consumers receive messages from a channel. The message channel therefore decouples the messaging components and also provides a convenient point for interception and monitoring of messages.
A message channel may follow either point-to-point or publish-subscribe semantics. With a point-to-point channel, no more than one consumer can receive each message sent to the channel. Publish-subscribe channels, on the other hand, attempt to broadcast each message to all subscribers on the channel. Spring Integration supports both of these models.
Whereas “point-to-point” and "publish-subscribe" define the two options for how many consumers ultimately receive each message, there is another important consideration: Should the channel buffer messages? In Spring Integration, pollable channels are capable of buffering Messages within a queue. The advantage of buffering is that it allows for throttling the inbound messages and thereby prevents overloading a consumer. However, as the name suggests, this also adds some complexity, since a consumer can only receive the messages from such a channel if a poller is configured. On the other hand, a consumer connected to a subscribable channel is simply message-driven. Message Channel Implementations has a detailed discussion of the variety of channel implementations available in Spring Integration.
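The difference between the delivery models discussed in this section can be sketched in plain Java. The classes below are hypothetical and are not the framework's channel implementations; in particular, the round-robin choice of subscriber in the point-to-point case is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class ChannelSketch {

    /** Point-to-point: each message goes to exactly one subscriber (round-robin here). */
    static final class PointToPoint<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private int next;

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void send(T message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            subscribers.get(next).accept(message);
            next = (next + 1) % subscribers.size();
        }
    }

    /** Publish-subscribe: each message is broadcast to every subscriber. */
    static final class PublishSubscribe<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void send(T message) {
            subscribers.forEach(subscriber -> subscriber.accept(message));
        }
    }

    /** Pollable: messages are buffered until a consumer asks for the next one. */
    static final class Pollable<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();

        void send(T message) {
            queue.add(message);
        }

        /** Returns the oldest buffered message, or null when the buffer is empty. */
        T receive() {
            return queue.poll();
        }
    }
}
```

The first two classes push messages to their subscribers as soon as send is called, while the pollable variant needs something to call receive, which is the job of a poller.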
5.3.3. Message Endpoint
One of the primary goals of Spring Integration is to simplify the development of enterprise integration solutions through inversion of control. This means that you should not have to implement consumers and producers directly, and you should not even have to build messages and invoke send or receive operations on a message channel. Instead, you should be able to focus on your specific domain model with an implementation based on plain objects. Then, by providing declarative configuration, you can “connect” your domain-specific code to the messaging infrastructure provided by Spring Integration. The components responsible for these connections are message endpoints. This does not mean that you should necessarily connect your existing application code directly. Any real-world enterprise integration solution requires some amount of code focused upon integration concerns such as routing and transformation. The important thing is to achieve separation of concerns between the integration logic and the business logic. In other words, as with the Model-View-Controller (MVC) paradigm for web applications, the goal should be to provide a thin but dedicated layer that translates inbound requests into service layer invocations and then translates service layer return values into outbound replies. The next section provides an overview of the message endpoint types that handle these responsibilities, and, in upcoming chapters, you can see how Spring Integration’s declarative configuration options provide a non-invasive way to use each of these.
5.4. Message Endpoints
A Message Endpoint represents the “filter” of a pipes-and-filters architecture. As mentioned earlier, the endpoint’s primary role is to connect application code to the messaging framework and to do so in a non-invasive manner. In other words, the application code should ideally have no awareness of the message objects or the message channels. This is similar to the role of a controller in the MVC paradigm. Just as a controller handles HTTP requests, the message endpoint handles messages. Just as controllers are mapped to URL patterns, message endpoints are mapped to message channels. The goal is the same in both cases: isolate application code from the infrastructure. These concepts and all of the patterns that follow are discussed at length in the Enterprise Integration Patterns book. Here, we provide only a high-level description of the main endpoint types supported by Spring Integration and the roles associated with those types. The chapters that follow elaborate and provide sample code as well as configuration examples.
5.4.1. Message Transformer
A message transformer is responsible for converting a message’s content or structure and returning the modified message.
Probably the most common type of transformer is one that converts the payload of the message from one format to another (such as from XML to java.lang.String).
Similarly, a transformer can add, remove, or modify the message’s header values.
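Both kinds of transformation can be sketched in plain Java. The Msg class and the two helper methods are hypothetical and exist only for this illustration; they are not part of the Spring Integration API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class TransformerSketch {

    /** Minimal immutable message used only for this illustration. */
    static final class Msg<T> {
        final T payload;
        final Map<String, Object> headers;

        Msg(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    /** Converts the payload and carries the headers over unchanged. */
    static <A, B> Msg<B> transformPayload(Msg<A> in, Function<A, B> converter) {
        return new Msg<>(converter.apply(in.payload), in.headers);
    }

    /** Returns a copy of the message with one header added or replaced. */
    static <T> Msg<T> enrichHeader(Msg<T> in, String name, Object value) {
        Map<String, Object> headers = new HashMap<>(in.headers);
        headers.put(name, value);
        return new Msg<>(in.payload, headers);
    }
}
```

Each method returns a new message instead of mutating its input, which keeps the original available to any other consumer that received it.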
5.4.2. Message Filter
A message filter determines whether a message should be passed to an output channel at all.
This simply requires a boolean test method that may check for a particular payload content type, a property value, the presence of a header, or other conditions.
If the message is accepted, it is sent to the output channel.
If not, it is dropped (or, for a more severe implementation, an Exception could be thrown).
Message filters are often used in conjunction with a publish-subscribe channel, where multiple consumers may receive the same message and use the criteria of the filter to narrow down the set of messages to be processed.
| Be careful not to confuse the generic use of “filter” within the pipes-and-filters architectural pattern with this specific endpoint type that selectively narrows down the messages flowing between two channels. The pipes-and-filters concept of a “filter” matches more closely with Spring Integration’s message endpoint: any component that can be connected to a message channel in order to send or receive messages. |
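The accept-or-drop decision can be sketched in plain Java with a Predicate as the boolean test. The FilterSketch class and its throwOnRejection flag are hypothetical names chosen for this example.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical message filter: passes accepted messages on, drops or rejects the rest. */
final class FilterSketch<T> {
    private final Predicate<T> selector;
    private final Consumer<T> outputChannel;
    private final boolean throwOnRejection;

    FilterSketch(Predicate<T> selector, Consumer<T> outputChannel, boolean throwOnRejection) {
        this.selector = selector;
        this.outputChannel = outputChannel;
        this.throwOnRejection = throwOnRejection;
    }

    void handle(T message) {
        if (selector.test(message)) {
            outputChannel.accept(message); // accepted: forward to the output channel
            return;
        }
        if (throwOnRejection) {
            // The "more severe" variant: rejection is reported to the sender.
            throw new IllegalStateException("message rejected: " + message);
        }
        // Otherwise the message is silently dropped.
    }
}
```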
5.4.3. Message Router
A message router is responsible for deciding what channel or channels (if any) should receive the message next. Typically, the decision is based upon the message’s content or the metadata available in the message headers. A message router is often used as a dynamic alternative to a statically configured output channel on a service activator or other endpoint capable of sending reply messages. Likewise, a message router provides a proactive alternative to the reactive message filters used by multiple subscribers, as described earlier.
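A content-based routing decision can be sketched in plain Java as a function from message to channel name. The RouterSketch class is hypothetical, and returning false for an unresolved channel is a simplification chosen for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical router: resolves a channel name per message and forwards the message there. */
final class RouterSketch<T> {
    private final Function<T, String> channelKey; // decision based on content or metadata
    private final Map<String, Consumer<T>> channels = new LinkedHashMap<>();

    RouterSketch(Function<T, String> channelKey) {
        this.channelKey = channelKey;
    }

    RouterSketch<T> channel(String name, Consumer<T> channel) {
        channels.put(name, channel);
        return this;
    }

    /** Sends to the resolved channel and returns true; returns false when no channel matches. */
    boolean route(T message) {
        Consumer<T> target = channels.get(channelKey.apply(message));
        if (target == null) {
            return false;
        }
        target.accept(message);
        return true;
    }
}
```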
5.4.4. Splitter
A splitter is another type of message endpoint whose responsibility is to accept a message from its input channel, split that message into multiple messages, and send each of those to its output channel. This is typically used for dividing a “composite” payload object into a group of messages containing the subdivided payloads.
5.4.5. Aggregator
Basically a mirror-image of the splitter, the aggregator is a type of message endpoint that receives multiple messages and combines them into a single message.
In fact, aggregators are often downstream consumers in a pipeline that includes a splitter.
Technically, the aggregator is more complex than a splitter, because it is required to maintain state (the messages to be aggregated), to decide when the complete group of messages is available, and to timeout if necessary.
Furthermore, in case of a timeout, the aggregator needs to know whether to send the partial results, discard them, or send them to a separate channel.
Spring Integration provides a CorrelationStrategy, a ReleaseStrategy, and configurable settings for timeout, whether to send partial results upon timeout, and a discard channel.
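The interplay between a splitter and an aggregator can be sketched in plain Java. This is a minimal sketch with hypothetical classes: correlation is by an explicit id, the release condition is "all parts have arrived", and timeouts, partial results, and the discard channel are not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SplitAggregateSketch {

    /** One piece of a split payload plus the bookkeeping needed to reassemble it. */
    static final class Part {
        final String correlationId;
        final int sequenceNumber; // 1-based position within the original payload
        final int sequenceSize;
        final String payload;

        Part(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /** Splitter: divides a composite payload into one Part per element. */
    static List<Part> split(String correlationId, List<String> composite) {
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < composite.size(); i++) {
            parts.add(new Part(correlationId, i + 1, composite.size(), composite.get(i)));
        }
        return parts;
    }

    /** Aggregator: stateful; groups parts by correlation id and releases complete groups. */
    static final class Aggregator {
        private final Map<String, List<Part>> groups = new HashMap<>();

        /** Returns the combined payloads when the group is complete, otherwise null. */
        List<String> add(Part part) {
            List<Part> group = groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
            group.add(part);
            if (group.size() < part.sequenceSize) {
                return null; // release condition not met: wait for the remaining parts
            }
            groups.remove(part.correlationId);
            group.sort(Comparator.comparingInt(p -> p.sequenceNumber));
            List<String> result = new ArrayList<>();
            for (Part p : group) {
                result.add(p.payload);
            }
            return result;
        }
    }
}
```

The map of pending groups is the state the text refers to, and it is the reason an aggregator needs a timeout policy in practice: an incomplete group would otherwise stay in memory indefinitely.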
5.4.6. Service Activator
A Service Activator is a generic endpoint for connecting a service instance to the messaging system. The input message channel must be configured, and, if the service method to be invoked is capable of returning a value, an output message Channel may also be provided.
| The output channel is optional, since each message may also provide its own 'Return Address' header. This same rule applies for all consumer endpoints. |
The service activator invokes an operation on some service object to process the request message, extracting the request message’s payload and converting it if necessary (if the method does not expect a message-typed parameter). Whenever the service object’s method returns a value, that return value is likewise converted to a reply message if necessary (if it is not already a message type). That reply message is sent to the output channel. If no output channel has been configured, the reply is sent to the channel specified in the message’s “return address”, if available.
A request-reply service activator endpoint connects a target object’s method to input and output Message Channels.
| As discussed earlier, in Message Channel, channels can be pollable or subscribable. In the preceding diagram, this is depicted by the “clock” symbol and the solid arrow (poll) and the dotted arrow (subscribe). |
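The request-reply behavior of a service activator can be sketched in plain Java. The classes are hypothetical: a returnAddress field stands in for the 'Return Address' header, and throwing when no reply destination exists is a choice made for this sketch.

```java
import java.util.function.Consumer;
import java.util.function.Function;

final class ServiceActivatorSketch {

    /** Minimal message for this illustration. */
    static final class Msg {
        final String payload;
        final Consumer<Msg> returnAddress; // stands in for the 'Return Address' header; may be null

        Msg(String payload, Consumer<Msg> returnAddress) {
            this.payload = payload;
            this.returnAddress = returnAddress;
        }
    }

    private final Function<String, String> service; // plain business method, unaware of messaging
    private final Consumer<Msg> outputChannel;      // optional; may be null

    ServiceActivatorSketch(Function<String, String> service, Consumer<Msg> outputChannel) {
        this.service = service;
        this.outputChannel = outputChannel;
    }

    void handle(Msg request) {
        String result = service.apply(request.payload); // unwrap the payload and invoke the service
        if (result == null) {
            return; // no return value, so no reply is produced
        }
        Msg reply = new Msg(result, request.returnAddress); // wrap the return value in a message
        Consumer<Msg> target = outputChannel != null ? outputChannel : request.returnAddress;
        if (target == null) {
            throw new IllegalStateException("no output channel and no return address");
        }
        target.accept(reply);
    }
}
```

The service function only ever sees a String and returns a String, which is the non-invasive property the section describes.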
5.4.7. Channel Adapter
A channel adapter is an endpoint that connects a message channel to some other system or transport. Channel adapters may be either inbound or outbound. Typically, the channel adapter does some mapping between the message and whatever object or resource is received from or sent to the other system (file, HTTP Request, JMS message, and others). Depending on the transport, the channel adapter may also populate or extract message header values. Spring Integration provides a number of channel adapters, which are described in upcoming chapters.
An inbound channel adapter endpoint connects a source system to a MessageChannel.
| Message sources can be pollable (for example, POP3) or message-driven (for example, IMAP Idle). In the preceding diagram, this is depicted by the “clock” symbol and the solid arrow (poll) and the dotted arrow (message-driven). |
An outbound channel adapter endpoint connects a MessageChannel to a target system.
| As discussed earlier in Message Channel, channels can be pollable or subscribable. In the preceding diagram, this is depicted by the “clock” symbol and the solid arrow (poll) and the dotted arrow (subscribe). |
5.4.8. Endpoint Bean Names
Consuming endpoints (anything with an inputChannel) consist of two beans, the consumer and the message handler.
The consumer has a reference to the message handler and invokes it as messages arrive.
Consider the following XML example:
<int:service-activator id = "someService" ... />
Given the preceding example, the bean names are as follows:
- Consumer: someService (the id)
- Handler: someService.handler
When using Enterprise Integration Pattern (EIP) annotations, the names depend on several factors. Consider the following example of an annotated POJO:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
Given the preceding example, the bean names are as follows:
- Consumer: someComponent.someMethod.serviceActivator
- Handler: someComponent.someMethod.serviceActivator.handler
Starting with version 5.0.4, you can modify these names by using the @EndpointId annotation, as the following example shows:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
Given the preceding example, the bean names are as follows:
- Consumer: someService
- Handler: someService.handler
The @EndpointId annotation creates the same names that the id attribute creates with XML configuration.
Consider the following example of an annotated bean:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
Given the preceding example, the bean names are as follows:
- Consumer: someConfiguration.someHandler.serviceActivator
- Handler: someHandler (the @Bean name)
Starting with version 5.0.4, you can modify these names by using the @EndpointId annotation, as the following example shows:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") (1)
@EndpointId("someService") (2)
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
| 1 | Handler: someService.handler (the bean name) |
| 2 | Consumer: someService (the endpoint ID) |
The @EndpointId annotation creates names as created by the id attribute with XML configuration, as long as you use the convention of appending .handler to the @Bean name.
There is one special case where a third bean is created: For architectural reasons, if a MessageHandler @Bean does not define an AbstractReplyProducingMessageHandler, the framework wraps the provided bean in a ReplyProducingMessageHandlerWrapper.
This wrapper supports request handler advice handling and emits the normal 'produced no reply' debug log messages.
Its bean name is the handler bean name plus .wrapper (when there is an @EndpointId — otherwise, it is the normal generated handler name).
Similarly, pollable message sources create two beans, a SourcePollingChannelAdapter (SPCA) and a MessageSource.
Consider the following XML configuration:
<int:inbound-channel-adapter id = "someAdapter" ... />
Given the preceding XML configuration, the bean names are as follows:
- SPCA: someAdapter (the id)
- MessageSource: someAdapter.source
Consider the following Java configuration of a POJO to define an @EndpointId:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
Given the preceding Java configuration example, the bean names are as follows:
- SPCA: someAdapter
- MessageSource: someAdapter.source
Consider the following Java configuration of a bean to define an @EndpointId:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
Given the preceding example, the bean names are as follows:
- SPCA: someAdapter
- MessageSource: someAdapter.source (as long as you use the convention of appending .source to the @Bean name)
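The naming conventions listed in this section can be summarized as simple string rules. The helper below is a hypothetical illustration of those rules and is not how the framework computes bean names; it covers annotated POJO methods and explicit ids only, not the @Bean case, where the handler keeps the @Bean name.

```java
/** Hypothetical helper that mirrors the bean-name patterns listed in this section. */
final class EndpointBeanNames {

    private EndpointBeanNames() {
    }

    /** Consumer name for an annotated POJO method that has no @EndpointId. */
    static String annotatedConsumer(String componentBeanName, String methodName, String annotationName) {
        return componentBeanName + "." + methodName + "." + decapitalize(annotationName);
    }

    /** Handler name derived from a consumer name (an XML id, an @EndpointId, or a generated name). */
    static String handlerFor(String consumerName) {
        return consumerName + ".handler";
    }

    /** Message source name derived from a source polling channel adapter name. */
    static String sourceFor(String adapterName) {
        return adapterName + ".source";
    }

    private static String decapitalize(String name) {
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    public static void main(String[] args) {
        String consumer = annotatedConsumer("someComponent", "someMethod", "ServiceActivator");
        System.out.println(consumer);
        System.out.println(handlerFor(consumer));
        System.out.println(sourceFor("someAdapter"));
    }
}
```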
5.5. Configuration and @EnableIntegration
Throughout this document, you can see references to XML namespace support for declaring elements in a Spring Integration flow.
This support is provided by a series of namespace parsers that generate appropriate bean definitions to implement a particular component.
For example, many endpoints consist of a MessageHandler bean and a ConsumerEndpointFactoryBean into which the handler and an input channel name are injected.
The first time a Spring Integration namespace element is encountered, the framework automatically declares a number of beans (a task scheduler, an implicit channel creator, and others) that are used to support the runtime environment.
Version 4.0 introduced the @EnableIntegration annotation, to allow the registration of Spring Integration infrastructure beans (see the Javadoc).
This annotation is required when only Java configuration is used — for example with Spring Boot or Spring Integration Messaging Annotation support and Spring Integration Java DSL with no XML integration configuration.
| The @EnableIntegration annotation is also useful when you have a parent context with no Spring Integration components and two or more child contexts that use Spring Integration. It lets these common components be declared once only, in the parent context. |
The @EnableIntegration annotation registers many infrastructure components with the application context.
In particular, it:
- Registers some built-in beans, such as errorChannel and its LoggingHandler, taskScheduler for pollers, jsonPath SpEL-function, and others.
- Adds several BeanFactoryPostProcessor instances to enhance the BeanFactory for the global and default integration environment.
- Adds several BeanPostProcessor instances to enhance or convert and wrap particular beans for integration purposes.
- Adds annotation processors to parse messaging annotations and registers components for them with the application context.
The @IntegrationComponentScan annotation also permits classpath scanning.
This annotation plays a role similar to that of the standard Spring Framework @ComponentScan annotation, but it is restricted to components and annotations that are specific to Spring Integration, which the standard Spring Framework component scan mechanism cannot reach.
For an example, see @MessagingGateway Annotation.
The @EnablePublisher annotation registers a PublisherAnnotationBeanPostProcessor bean and configures the default-publisher-channel for those @Publisher annotations that are provided without a channel attribute.
If more than one @EnablePublisher annotation is found, they must all have the same value for the default channel.
See Annotation-driven Configuration with the @Publisher Annotation for more information.
The @GlobalChannelInterceptor annotation has been introduced to mark ChannelInterceptor beans for global channel interception.
This annotation is an analogue of the <int:channel-interceptor> XML element (see Global Channel Interceptor Configuration).
@GlobalChannelInterceptor annotations can be placed at the class level (with a @Component stereotype annotation) or on @Bean methods within @Configuration classes.
In either case, the bean must implement ChannelInterceptor.
Starting with version 5.1, global channel interceptors apply to dynamically registered channels — such as beans that are initialized by using beanFactory.initializeBean() or through the IntegrationFlowContext when using the Java DSL.
Previously, interceptors were not applied when beans were created after the application context was refreshed.
The @IntegrationConverter annotation marks Converter, GenericConverter, or ConverterFactory beans as candidate converters for integrationConversionService.
This annotation is an analogue of the <int:converter> XML element (see Payload Type Conversion).
You can place @IntegrationConverter annotations at the class level (with a @Component stereotype annotation) or on @Bean methods within @Configuration classes.
See Annotation Support for more information about messaging annotations.
5.6. Programming Considerations
You should use plain old Java objects (POJOs) whenever possible and only expose the framework in your code when absolutely necessary. See POJO Method invocation for more information.
If you do expose the framework to your classes, there are some considerations that need to be taken into account, especially during application startup:
- If your component is ApplicationContextAware, you should generally not use the ApplicationContext in the setApplicationContext() method. Instead, store a reference and defer such uses until later in the context lifecycle.
- If your component is an InitializingBean or uses @PostConstruct methods, do not send any messages from these initialization methods. The application context is not yet initialized when these methods are called, and sending such messages is likely to fail. If you need to send messages during startup, implement ApplicationListener and wait for the ContextRefreshedEvent. Alternatively, implement SmartLifecycle, put your bean in a late phase, and send the messages from the start() method.
5.6.1. Considerations When Using Packaged (for example, Shaded) Jars
Spring Integration bootstraps certain features by using Spring Framework’s SpringFactories mechanism to load several IntegrationConfigurationInitializer classes.
This includes the -core jar as well as certain others, including -http and -jmx.
The information for this process is stored in a META-INF/spring.factories file in each jar.
Some developers prefer to repackage their application and all dependencies into a single jar by using well known tools, such as the Apache Maven Shade Plugin.
By default, the shade plugin does not merge the spring.factories files when producing the shaded jar.
In addition to spring.factories, other META-INF files (spring.handlers and spring.schemas) are used for XML configuration.
These files also need to be merged.
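To illustrate why the spring.factories files must be merged rather than overwritten, the following minimal sketch combines two fragments that share a key by joining their values with a comma. It uses only the JDK and is not the code of the shade plugin or of Spring Boot; the class name, the key, and the listed class names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class FactoriesMergeSketch {

    // Merges 'source' into 'target'; values of a key present in both are joined with a comma.
    static void merge(Properties target, Properties source) {
        for (String name : source.stringPropertyNames()) {
            String existing = target.getProperty(name);
            String value = source.getProperty(name);
            target.setProperty(name, existing == null ? value : existing + "," + value);
        }
    }

    // Parses properties-file content held in a string.
    static Properties parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical entries from two different jars that use the same key.
        Properties core = parse("example.Initializer=example.core.CoreInitializer");
        Properties http = parse("example.Initializer=example.http.HttpInitializer");
        Properties merged = new Properties();
        merge(merged, core);
        merge(merged, http);
        // Both initializers survive; a plain overwrite would keep only one of them.
        System.out.println(merged.getProperty("example.Initializer"));
    }
}
```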
Spring Boot’s executable jar mechanism takes a different approach, in that it nests the jars, thus retaining each spring.factories file on the class path.
So, with a Spring Boot application, nothing more is needed if you use its default executable jar format.
Even if you do not use Spring Boot, you can still use the tooling provided by Boot to enhance the shade plugin by adding transformers for the above mentioned files. The following example shows how to configure the plugin:
...
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
            <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
            <createDependencyReducedPom>true</createDependencyReducedPom>
        </configuration>
        <dependencies>
            <dependency> <!-- (1) -->
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring.boot.version}</version>
            </dependency>
        </dependencies>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers> <!-- (2) -->
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.handlers</resource>
                        </transformer>
                        <transformer
                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                            <resource>META-INF/spring.factories</resource>
                        </transformer>
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.schemas</resource>
                        </transformer>
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>
...
Specifically,
| 1 | Add the spring-boot-maven-plugin as a dependency. |
| 2 | Configure the transformers. |
You can add a property for ${spring.boot.version} or use an explicit version.
5.7. Programming Tips and Tricks
This section documents some of the ways to get the most from Spring Integration.
5.7.1. XML Schemas
When using XML configuration, to avoid getting false schema validation errors, you should use a “Spring-aware” IDE, such as the Spring Tool Suite (STS), Eclipse with the Spring IDE plugins, or IntelliJ IDEA.
These IDEs know how to resolve the correct XML schema from the classpath (by using the META-INF/spring.schemas file in the jars).
When using STS or Eclipse with the plugin, you must enable Spring Project Nature on the project.
The schemas hosted on the internet for certain legacy modules (those that existed in version 1.0) are the 1.0 versions for compatibility reasons. If your IDE uses these schemas, you are likely to see false errors.
Each of these online schemas has a warning similar to the following:
This schema is for the 1.0 version of Spring Integration Core. We cannot update it to the current schema because that will break any applications using 1.0.3 or lower. For subsequent versions, the unversioned schema is resolved from the classpath and obtained from the jar. Please refer to github:
The affected modules are:
- core (spring-integration.xsd)
- file
- http
- jms
- mail
- rmi
- security
- stream
- ws
- xml
5.7.2. Finding Class Names for Java and DSL Configuration
With XML configuration and Spring Integration Namespace support, the XML parsers hide how target beans are declared and wired together. For Java configuration, it is important to understand the Framework API for target end-user applications.
The first-class citizens for EIP implementation are Message, Channel, and Endpoint (see Main Components, earlier in this chapter).
Their implementations (contracts) are:
- org.springframework.messaging.Message: See Message;
- org.springframework.messaging.MessageChannel: See Message Channels;
- org.springframework.integration.endpoint.AbstractEndpoint: See Poller.
The first two are simple enough to understand how to implement, configure, and use. The last one deserves more attention.
The AbstractEndpoint is widely used throughout the framework for different component implementations.
Its main implementations are:
- EventDrivenConsumer, used when we subscribe to a SubscribableChannel to listen for messages.
- PollingConsumer, used when we poll for messages from a PollableChannel.
When you use messaging annotations or the Java DSL, you don’t need to worry about these components, because the Framework automatically produces them with appropriate annotations and BeanPostProcessor implementations.
When building components manually, you should use the ConsumerEndpointFactoryBean to help determine the target AbstractEndpoint consumer implementation to create, based on the provided inputChannel property.
On the other hand, the ConsumerEndpointFactoryBean delegates to another first-class citizen in the Framework: org.springframework.messaging.MessageHandler.
The goal of the implementation of this interface is to handle the message consumed by the endpoint from the channel.
All EIP components in Spring Integration are MessageHandler implementations (for example, AggregatingMessageHandler, MessageTransformingHandler, AbstractMessageSplitter, and others).
The target protocol outbound adapters (FileWritingMessageHandler, HttpRequestExecutingMessageHandler, AbstractMqttMessageHandler, and others) are also MessageHandler implementations.
When you develop Spring Integration applications with Java configuration, you should look into the Spring Integration module to find an appropriate MessageHandler implementation to use for the @ServiceActivator configuration.
For example, to send an XMPP message (see XMPP Support) you should configure something like the following:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);
    return handler;
}
The MessageHandler implementations represent the outbound and processing part of the message flow.
The inbound message flow side has its own components, which are divided into polling and listening behaviors.
The listening (message-driven) components are simple and typically require only one target class implementation to be ready to produce messages.
Listening components can be one-way MessageProducerSupport implementations (such as AbstractMqttMessageDrivenChannelAdapter and ImapIdleChannelAdapter) or request-reply MessagingGatewaySupport implementations (such as AmqpInboundGateway and AbstractWebServiceInboundGateway).
Polling inbound endpoints are for those protocols that do not provide a listener API or are not intended for such a behavior, including any file-based protocol (such as FTP), any databases (RDBMS or NoSQL), and others.
These inbound endpoints consist of two components: the poller configuration, to initiate the polling task periodically, and a message source class to read data from the target protocol and produce a message for the downstream integration flow.
The first class for the poller configuration is a SourcePollingChannelAdapter.
It is another AbstractEndpoint implementation, one dedicated to polling in order to initiate an integration flow.
Typically, with the messaging annotations or Java DSL, you should not worry about this class.
The Framework produces a bean for it, based on the @InboundChannelAdapter configuration or a Java DSL builder spec.
Message source components are more important for the target application development, and they all implement the MessageSource interface (for example, MongoDbMessageSource and AbstractTwitterMessageSource).
With that in mind, our config for reading data from an RDBMS table with JDBC could resemble the following:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
You can find all the required inbound and outbound classes for the target protocols in the particular Spring Integration module (in most cases, in the respective package).
For example, the spring-integration-websocket adapters are:
- o.s.i.websocket.inbound.WebSocketInboundChannelAdapter: Implements MessageProducerSupport to listen for frames on the socket and produce messages to the channel.
- o.s.i.websocket.outbound.WebSocketOutboundMessageHandler: The one-way AbstractMessageHandler implementation to convert incoming messages to the appropriate frame and send it over the websocket.
If you are familiar with Spring Integration XML configuration, starting with version 4.3, we provide information in the XSD element definitions about which target classes are used to declare beans for the adapter or gateway, as the following example shows:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
5.8. POJO Method invocation
As discussed in Programming Considerations, we recommend using a POJO programming style, as the following example shows:
@ServiceActivator
public String myService(String payload) { ... }
In this case, the framework extracts a String payload, invokes your method, and wraps the result in a message to send to the next component in the flow (the original headers are copied to the new message).
In fact, if you use XML configuration, you do not even need the @ServiceActivator annotation, as the following paired examples show:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
You can omit the method attribute as long as there is no ambiguity in the public methods on the class.
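The payload extraction and result wrapping described above can be pictured with a small reflection-based sketch. It uses only the JDK: SimpleMessage is a hypothetical stand-in for the framework's Message type, and invoke() is an illustrative helper, not how Spring Integration actually dispatches to POJO methods.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class PojoInvocationSketch {

    // Hypothetical stand-in for a message: a payload plus read-only headers.
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // A POJO with no framework dependencies.
    static final class MyPojo {
        public String myService(String payload) {
            return payload.toUpperCase();
        }
    }

    // Extracts the payload, invokes the POJO method, and wraps the result with the original headers.
    static SimpleMessage invoke(Object target, String methodName, SimpleMessage request) {
        try {
            Method method = target.getClass().getMethod(methodName, request.payload.getClass());
            Object result = method.invoke(target, request.payload);
            return new SimpleMessage(result, request.headers);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Could not invoke " + methodName, ex);
        }
    }

    public static void main(String[] args) {
        SimpleMessage request =
                new SimpleMessage("hello", Collections.<String, Object>singletonMap("foo", "bar"));
        SimpleMessage reply = invoke(new MyPojo(), "myService", request);
        System.out.println(reply.payload + " " + reply.headers);
    }
}
```

The POJO never sees the message wrapper, which is the point of the programming style recommended here.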
You can also obtain header information in your POJO methods, as the following example shows:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
You can also dereference properties on the message, as the following example shows:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
Because various POJO method invocations are available, versions prior to 5.0 used SpEL (Spring Expression Language) to invoke the POJO methods.
SpEL (even interpreted) is usually “fast enough” for these operations, when compared to the actual work usually done in the methods.
However, starting with version 5.0, the org.springframework.messaging.handler.invocation.InvocableHandlerMethod is used by default whenever possible.
This technique is usually faster to execute than interpreted SpEL and is consistent with other Spring messaging projects.
The InvocableHandlerMethod is similar to the technique used to invoke controller methods in Spring MVC.
Certain methods are still always invoked by using SpEL.
Examples include annotated parameters with dereferenced properties, as discussed earlier.
This is because SpEL has the capability to navigate a property path.
There may be some other corner cases that we have not considered that also do not work with InvocableHandlerMethod instances.
For this reason, we automatically fall back to using SpEL in those cases.
If you wish, you can also set up your POJO method such that it always uses SpEL, with the @UseSpelInvoker annotation, as the following example shows:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
If the compilerMode property is omitted, the spring.expression.compiler.mode system property determines the compiler mode.
See SpEL compilation for more information about compiled SpEL.
Core Messaging
This section covers all aspects of the core messaging API in Spring Integration. It covers messages, message channels, and message endpoints. It also covers many of the enterprise integration patterns, such as filter, router, transformer, service activator, splitter, and aggregator.
This section also contains material about system management, including the control bus and message history support.
6. Messaging Channels
6.1. Message Channels
While the Message plays the crucial role of encapsulating data, it is the MessageChannel that decouples message producers from message consumers.
6.1.1. The MessageChannel Interface
Spring Integration’s top-level MessageChannel interface is defined as follows:
public interface MessageChannel {
    boolean send(Message<?> message);
    boolean send(Message<?> message, long timeout);
}
When sending a message, the return value is true if the message is sent successfully.
If the send call times out or is interrupted, it returns false.
PollableChannel
Since message channels may or may not buffer messages (as discussed in the Spring Integration Overview), two sub-interfaces define the buffering (pollable) and non-buffering (subscribable) channel behavior.
The following listing shows the definition of the PollableChannel interface:
public interface PollableChannel extends MessageChannel {
    Message<?> receive();
    Message<?> receive(long timeout);
}
As with the send methods, when receiving a message, the return value is null in the case of a timeout or interrupt.
SubscribableChannel
The SubscribableChannel base interface is implemented by channels that send messages directly to their subscribed MessageHandler instances.
Therefore, they do not provide receive methods for polling.
Instead, they define methods for managing those subscribers.
The following listing shows the definition of the SubscribableChannel interface:
public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}
6.1.2. Message Channel Implementations
Spring Integration provides several different message channel implementations. The following sections briefly describe each one.
PublishSubscribeChannel
The PublishSubscribeChannel implementation broadcasts any Message sent to it to all of its subscribed handlers.
This is most often used for sending event messages, whose primary role is notification (as opposed to document messages, which are generally intended to be processed by a single handler).
Note that the PublishSubscribeChannel is intended for sending only.
Since it broadcasts to its subscribers directly when its send(Message) method is invoked, consumers cannot poll for messages (it does not implement PollableChannel and therefore has no receive() method).
Instead, any subscriber must itself be a MessageHandler, and the subscriber’s handleMessage(Message) method is invoked in turn.
Prior to version 3.0, invoking the send method on a PublishSubscribeChannel that had no subscribers returned false.
When used in conjunction with a MessagingTemplate, a MessageDeliveryException was thrown.
Starting with version 3.0, the behavior has changed such that a send is always considered successful if at least the minimum number of subscribers are present (and successfully handle the message).
This behavior can be modified by setting the minSubscribers property, which defaults to 0.
If you use a TaskExecutor, only the presence of the correct number of subscribers is used for this determination, because the actual handling of the message is performed asynchronously.
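The broadcast behavior and the role of the minimum subscriber count can be sketched with plain JDK types. BroadcastSketch is a hypothetical class written for illustration (synchronous delivery, String payloads); it is not the PublishSubscribeChannel implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class BroadcastSketch {

    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();
    private final int minSubscribers;

    BroadcastSketch(int minSubscribers) {
        this.minSubscribers = minSubscribers;
    }

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Delivers the payload to every handler; reports success only if enough handlers handled it.
    boolean send(String payload) {
        int handled = 0;
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
            handled++;
        }
        return handled >= minSubscribers;
    }

    public static void main(String[] args) {
        BroadcastSketch channel = new BroadcastSketch(0);
        // With a minimum of 0, sending to a channel without subscribers still counts as successful.
        System.out.println(channel.send("event-1"));
        channel.subscribe(payload -> System.out.println("audit saw " + payload));
        channel.subscribe(payload -> System.out.println("mailer saw " + payload));
        // Both subscribers receive the same event.
        System.out.println(channel.send("event-2"));
    }
}
```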
QueueChannel
The QueueChannel implementation wraps a queue.
Unlike the PublishSubscribeChannel, the QueueChannel has point-to-point semantics.
In other words, even if the channel has multiple consumers, only one of them should receive any Message sent to that channel.
It provides a default no-argument constructor (providing an essentially unbounded capacity of Integer.MAX_VALUE) as well as a constructor that accepts the queue capacity, as the following listing shows:
public QueueChannel(int capacity)
A channel that has not reached its capacity limit stores messages in its internal queue, and the send(Message<?>) method returns immediately, even if no receiver is ready to handle the message.
If the queue has reached capacity, the sender blocks until room is available in the queue.
Alternatively, if you use the send method that has an additional timeout parameter, the sender blocks until either room is available or the timeout period elapses, whichever occurs first.
Similarly, a receive() call returns immediately if a message is available on the queue, but, if the queue is empty, then a receive call may block until either a message is available or the timeout, if provided, elapses.
In either case, it is possible to force an immediate return regardless of the queue’s state by passing a timeout value of 0.
Note, however, that calls to the versions of send() and receive() with no timeout parameter block indefinitely.
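The blocking and timeout rules above mirror those of a bounded java.util.concurrent.BlockingQueue. The following sketch, built on ArrayBlockingQueue, shows a send that returns false when the queue is full and a receive that returns null when it is empty. BoundedQueueSketch is a hypothetical class for illustration, not the QueueChannel itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedQueueSketch {

    private final BlockingQueue<String> queue;

    BoundedQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false if no room becomes available within the timeout (0 means "do not wait").
    boolean send(String payload, long timeoutMillis) {
        try {
            return queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns null if no message arrives within the timeout (0 means "do not wait").
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BoundedQueueSketch channel = new BoundedQueueSketch(1);
        System.out.println(channel.send("first", 0));   // room available
        System.out.println(channel.send("second", 0));  // queue full, returns at once
        System.out.println(channel.receive(0));         // message available
        System.out.println(channel.receive(0));         // queue empty, returns at once
    }
}
```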
PriorityChannel
Whereas the QueueChannel enforces first-in-first-out (FIFO) ordering, the PriorityChannel is an alternative implementation that allows for messages to be ordered within the channel based upon a priority.
By default, the priority is determined by the priority header within each message.
However, for custom priority determination logic, a comparator of type Comparator<Message<?>> can be provided to the PriorityChannel constructor.
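As a rough illustration of comparator-driven ordering, the following sketch places messages in a JDK PriorityBlockingQueue. It assumes that a higher numeric priority should be received first; the Prioritized class and the HIGHEST_FIRST comparator are hypothetical and do not reproduce the PriorityChannel internals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

final class PrioritySketch {

    // Hypothetical message carrying its priority as a field instead of a header.
    static final class Prioritized {
        final String payload;
        final int priority;

        Prioritized(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // Assumption: a larger number means "more urgent", so it sorts first.
    static final Comparator<Prioritized> HIGHEST_FIRST =
            (left, right) -> Integer.compare(right.priority, left.priority);

    // Sends all messages, then receives them in priority order.
    static List<String> receiveAll(Prioritized... sent) {
        PriorityBlockingQueue<Prioritized> queue = new PriorityBlockingQueue<>(11, HIGHEST_FIRST);
        for (Prioritized message : sent) {
            queue.add(message);
        }
        List<String> received = new ArrayList<>();
        Prioritized next;
        while ((next = queue.poll()) != null) {
            received.add(next.payload);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receiveAll(
                new Prioritized("low", 1), new Prioritized("high", 9), new Prioritized("mid", 5)));
    }
}
```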
RendezvousChannel
The RendezvousChannel enables a “direct-handoff” scenario, wherein a sender blocks until another party invokes the channel’s receive() method.
The other party blocks until the sender sends the message.
Internally, this implementation is quite similar to the QueueChannel, except that it uses a SynchronousQueue (a zero-capacity implementation of BlockingQueue).
This works well in situations where the sender and receiver operate in different threads, but asynchronously dropping the message in a queue is not appropriate.
In other words, with a RendezvousChannel, the sender knows that some receiver has accepted the message, whereas with a QueueChannel, the message would have been stored to the internal queue and potentially never received.
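The direct-handoff behavior can be observed with a bare SynchronousQueue, the queue type mentioned above. In this sketch (the HandoffSketch class and its helper names are hypothetical), an offer fails when no receiver is waiting and succeeds once a receiver thread is blocked in take().

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class HandoffSketch {

    // Tries to hand a payload to a waiting receiver; false means nobody accepted it in time.
    static boolean handOff(SynchronousQueue<String> queue, String payload, long timeoutMillis) {
        try {
            return queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts a receiver thread, hands the payload over, and returns what the receiver got.
    static String handOffToReceiver(String payload) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread receiver = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.setDaemon(true);
        receiver.start();
        boolean accepted = handOff(queue, payload, 5000);
        try {
            receiver.join(5000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return accepted ? received[0] : null;
    }

    public static void main(String[] args) {
        // No receiver is waiting, so nothing is stored and the handoff fails.
        System.out.println(handOff(new SynchronousQueue<String>(), "lost?", 0));
        // With a receiver waiting, the sender knows the message was accepted.
        System.out.println(handOffToReceiver("delivered"));
    }
}
```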
Keep in mind that all of these queue-based channels store messages only in memory by default.
When persistence is required, you can either provide a 'message-store' attribute within the 'queue' element to reference a persistent MessageStore implementation or you can replace the local channel with one that is backed by a persistent broker, such as a JMS-backed channel or channel adapter.
The latter option lets you take advantage of any JMS provider’s implementation for message persistence, as discussed in JMS Support.
However, when buffering in a queue is not necessary, the simplest approach is to rely upon the DirectChannel, discussed in the next section.
The RendezvousChannel is also useful for implementing request-reply operations.
The sender can create a temporary, anonymous instance of RendezvousChannel, which it then sets as the 'replyChannel' header when building a Message.
After sending that Message, the sender can immediately call receive (optionally providing a timeout value) in order to block while waiting for a reply Message.
This is very similar to the implementation used internally by many of Spring Integration’s request-reply components.
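The request-reply idea (a temporary reply channel carried with the request, and a sender that blocks on it) can be sketched with JDK queues. Everything named here (RequestReplySketch, Request, startEchoService) is hypothetical; the framework's own request-reply components are more elaborate.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class RequestReplySketch {

    // A request carries its payload and the temporary channel on which the reply is expected.
    static final class Request {
        final String payload;
        final SynchronousQueue<String> replyChannel;

        Request(String payload, SynchronousQueue<String> replyChannel) {
            this.payload = payload;
            this.replyChannel = replyChannel;
        }
    }

    // Sends a request, then blocks until a reply arrives or the timeout elapses (returns null).
    static String sendAndReceive(BlockingQueue<Request> requests, String payload, long timeoutMillis) {
        SynchronousQueue<String> replyChannel = new SynchronousQueue<>();
        try {
            requests.put(new Request(payload, replyChannel));
            return replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Starts a daemon worker that answers each request on that request's reply channel.
    static BlockingQueue<Request> startEchoService() {
        BlockingQueue<Request> requests = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request request = requests.take();
                    request.replyChannel.offer("echo:" + request.payload, 1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(startEchoService(), "ping", 5000));
    }
}
```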
DirectChannel
The DirectChannel has point-to-point semantics but otherwise is more similar to the PublishSubscribeChannel than any of the queue-based channel implementations described earlier.
It implements the SubscribableChannel interface instead of the PollableChannel interface, so it dispatches messages directly to a subscriber.
As a point-to-point channel, however, it differs from the PublishSubscribeChannel in that it sends each Message to a single subscribed MessageHandler.
In addition to being the simplest point-to-point channel option, one of its most important features is that it enables a single thread to perform the operations on “both sides” of the channel.
For example, if a handler subscribes to a DirectChannel, then sending a Message to that channel triggers invocation of that handler’s handleMessage(Message) method directly in the sender’s thread, before the send() method invocation can return.
The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides. If the send call is invoked within the scope of a transaction, the outcome of the handler’s invocation (for example, updating a database record) plays a role in determining the ultimate result of that transaction (commit or rollback).
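The single-thread behavior is easy to see in a stripped-down model. In the following sketch (DirectDispatchSketch is a hypothetical class, not the DirectChannel), send() calls the handler directly, so the handler runs in the sender's thread and any exception it throws reaches the sender before send() returns.

```java
import java.util.function.Consumer;

final class DirectDispatchSketch {

    private Consumer<String> handler;

    void subscribe(Consumer<String> handler) {
        this.handler = handler;
    }

    // Calls the handler in the caller's thread; a handler exception propagates to the sender.
    boolean send(String payload) {
        if (handler == null) {
            throw new IllegalStateException("No subscriber");
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        DirectDispatchSketch channel = new DirectDispatchSketch();
        Thread[] handlerThread = new Thread[1];
        channel.subscribe(payload -> handlerThread[0] = Thread.currentThread());
        channel.send("order-1");
        System.out.println("same thread: " + (handlerThread[0] == Thread.currentThread()));

        channel.subscribe(payload -> {
            throw new IllegalStateException("update failed");
        });
        try {
            channel.send("order-2");
        } catch (IllegalStateException ex) {
            // A transactional sender would see this failure inside its own transaction scope.
            System.out.println("sender saw: " + ex.getMessage());
        }
    }
}
```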
Since the DirectChannel is the simplest option and does not add any additional overhead that would be required for scheduling and managing the threads of a poller, it is the default channel type within Spring Integration.
The general idea is to define the channels for an application, consider which of those need to provide buffering or to throttle input, and modify those to be queue-based PollableChannels.
Likewise, if a channel needs to broadcast messages, it should not be a DirectChannel but rather a PublishSubscribeChannel.
Later, we show how each of these channels can be configured.
The DirectChannel internally delegates to a message dispatcher to invoke its subscribed message handlers, and that dispatcher can have a load-balancing strategy exposed by load-balancer or load-balancer-ref attributes (mutually exclusive).
The load balancing strategy is used by the message dispatcher to help determine how messages are distributed amongst message handlers when multiple message handlers subscribe to the same channel.
As a convenience, the load-balancer attribute exposes an enumeration of values pointing to pre-existing implementations of LoadBalancingStrategy.
A round-robin (load-balances across the handlers in rotation) and none (for the cases where one wants to explicitly disable load balancing) are the only available values.
Other strategy implementations may be added in future versions.
However, since version 3.0, you can provide your own implementation of the LoadBalancingStrategy and inject it by using the load-balancer-ref attribute, which should point to a bean that implements LoadBalancingStrategy, as the following example shows:
<int:channel id="lbRefChannel">
    <int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
Note that the load-balancer and load-balancer-ref attributes are mutually exclusive.
A FixedSubscriberChannel is a SubscribableChannel that only supports a single MessageHandler subscriber that cannot be unsubscribed.
This is useful for high-throughput performance use-cases when no other subscribers are involved and no channel interceptors are needed.
The load-balancing also works in conjunction with a boolean failover property.
If the failover value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions.
The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.
If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided.
In other words, the dispatcher still supports the failover boolean property even when no load-balancing is enabled.
Without load-balancing, however, the invocation of handlers always begins with the first, according to their order.
For example, this approach works well when there is a clear definition of primary, secondary, tertiary, and so on.
When using the namespace support, the order attribute on any endpoint determines the order.
Keep in mind that load-balancing and failover apply only when a channel has more than one subscribed message handler.
When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the input-channel attribute.
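The interplay of round-robin load balancing and failover can be sketched in a few lines of plain Java. RoundRobinFailoverSketch is a hypothetical model written for this illustration (String payloads, handlers kept in subscription order); it is not the framework's dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class RoundRobinFailoverSketch {

    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();
    private final boolean failover;

    RoundRobinFailoverSketch(boolean failover) {
        this.failover = failover;
    }

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Starts at the next handler in rotation; with failover, tries the following handlers on failure.
    void dispatch(String payload) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("No subscribers");
        }
        int size = handlers.size();
        int start = Math.floorMod(counter.getAndIncrement(), size);
        RuntimeException failure = null;
        for (int attempt = 0; attempt < size; attempt++) {
            try {
                handlers.get((start + attempt) % size).accept(payload);
                return;
            } catch (RuntimeException ex) {
                failure = ex;
                if (!failover) {
                    break;
                }
            }
        }
        throw failure;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RoundRobinFailoverSketch dispatcher = new RoundRobinFailoverSketch(true);
        dispatcher.subscribe(payload -> log.add("primary:" + payload));
        dispatcher.subscribe(payload -> {
            throw new IllegalStateException("handler down");
        });
        dispatcher.subscribe(payload -> log.add("backup:" + payload));
        dispatcher.dispatch("m1");
        dispatcher.dispatch("m2"); // rotation picks the failing handler, failover moves on
        System.out.println(log);
    }
}
```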
Starting with version 5.2, when failover is true, a failure of the current handler is logged together with the failed message at the debug or info level, whichever is configured.
ExecutorChannel
The ExecutorChannel is a point-to-point channel that supports the same dispatcher configuration as DirectChannel (load-balancing strategy and the failover boolean property).
The key difference between these two dispatching channel types is that the ExecutorChannel delegates to an instance of TaskExecutor to perform the dispatch.
This means that the send method typically does not block, but it also means that the handler invocation may not occur in the sender’s thread.
It therefore does not support transactions that span the sender and receiving handler.
The sender can sometimes block.
For example, when using a TaskExecutor with a rejection policy that throttles the client (such as the ThreadPoolExecutor.CallerRunsPolicy), the sender’s thread can execute the method any time the thread pool is at its maximum capacity and the executor’s work queue is full.
Since that situation would only occur in a non-predictable way, you should not rely upon it for transactions.
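The caller-runs situation can be reproduced with the JDK alone. The following sketch (CallerRunsSketch is a hypothetical class) saturates a one-thread ThreadPoolExecutor that has no queue capacity, so the second task is rejected and ThreadPoolExecutor.CallerRunsPolicy executes it in the submitting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsSketch {

    // Returns true if the second task ran in the submitting thread because the pool was saturated.
    static boolean secondTaskRunsInCaller() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        boolean[] ranInCaller = new boolean[1];
        try {
            // Occupies the only worker thread until released.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is at its maximum size and the queue holds nothing, so this task is
            // rejected and the policy runs it synchronously in the submitting thread.
            executor.execute(() -> ranInCaller[0] = Thread.currentThread() == caller);
            return ranInCaller[0];
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller: " + secondTaskRunsInCaller());
    }
}
```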
FluxMessageChannel
The FluxMessageChannel is an org.reactivestreams.Publisher implementation for "sinking" sent messages into an internal reactor.core.publisher.Flux for on demand consumption by reactive subscribers downstream.
This channel implementation is neither a SubscribableChannel nor a PollableChannel, so only org.reactivestreams.Subscriber instances can be used to consume from this channel, honoring the back-pressure nature of reactive streams.
On the other hand, the FluxMessageChannel implements a ReactiveStreamsSubscribableChannel with its subscribeTo(Publisher<Message<?>>) contract allowing receiving events from reactive source publishers, bridging a reactive stream into the integration flow.
To achieve fully reactive behavior for the whole integration flow, such a channel must be placed between all the endpoints in the flow.
See Reactive Streams Support for more information about interaction with Reactive Streams.
Scoped Channel
Spring Integration 1.0 provided a ThreadLocalChannel implementation, but that has been removed as of 2.0.
Now the more general way to handle the same requirement is to add a scope attribute to a channel.
The value of the attribute can be the name of a scope that is available within the context.
For example, in a web environment, certain scopes are available, and any custom scope implementations can be registered with the context.
The following example shows a thread-local scope being applied to a channel, including the registration of the scope itself:
<int:channel id="threadScopedChannel" scope="thread">
    <int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>
The channel defined in the previous example also delegates to a queue internally, but the channel is bound to the current thread, so the contents of the queue are similarly bound.
That way, the thread that sends to the channel can later receive those same messages, but no other thread would be able to access them.
While thread-scoped channels are rarely needed, they can be useful in situations where DirectChannel instances are being used to enforce a single thread of operation but any reply messages should be sent to a “terminal” channel.
If that terminal channel is thread-scoped, the original sending thread can collect its replies from the terminal channel.
Now, since any channel can be scoped, you can define your own scopes in addition to thread-local.
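The visibility rule for a thread-scoped channel can be illustrated with a plain ThreadLocal. This is only an analogy under stated assumptions, not how Spring's SimpleThreadScope or the channel is implemented; ThreadScopedQueueSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class ThreadScopedQueueSketch {

    // Each thread lazily gets its own private queue.
    private final ThreadLocal<Queue<String>> queue = ThreadLocal.withInitial(ArrayDeque::new);

    void send(String message) {
        queue.get().add(message);
    }

    /** Returns the next message for the current thread, or null if it has none. */
    String receive() {
        return queue.get().poll();
    }

    /** Tries to receive on a different thread and returns what that thread saw. */
    static String receiveOnOtherThread(ThreadScopedQueueSketch channel) {
        String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = channel.receive());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        ThreadScopedQueueSketch channel = new ThreadScopedQueueSketch();
        channel.send("reply");
        System.out.println(receiveOnOtherThread(channel)); // the other thread has its own empty queue
        System.out.println(channel.receive());             // the sending thread collects its reply
    }
}
```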
6.1.3. Channel Interceptors
One of the advantages of a messaging architecture is the ability to provide common behavior and capture meaningful information about the messages passing through the system in a non-invasive way.
Since the Message instances are sent to and received from MessageChannel instances, those channels provide an opportunity for intercepting the send and receive operations.
The ChannelInterceptor strategy interface, shown in the following listing, provides methods for each of those operations:
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
After implementing the interface, registering the interceptor with a channel is just a matter of making the following call:
channel.addInterceptor(someChannelInterceptor);
The methods that return a Message instance can be used for transforming the Message or can return 'null' to prevent further processing (of course, any of the methods can throw a RuntimeException).
Also, the preReceive method can return false to prevent the receive operation from proceeding.
Keep in mind that receive() calls are only relevant for PollableChannels.
In fact, the SubscribableChannel interface does not even define a receive() method.
The reason for this is that when a Message is sent to a SubscribableChannel, it is sent directly to zero or more subscribers, depending on the type of channel (for example, a PublishSubscribeChannel sends to all of its subscribers).
Therefore, the preReceive(…), postReceive(…), and afterReceiveCompletion(…) interceptor methods are invoked only when the interceptor is applied to a PollableChannel.
Spring Integration also provides an implementation of the Wire Tap pattern.
It is a simple interceptor that sends the Message to another channel without otherwise altering the existing flow.
It can be very useful for debugging and monitoring.
An example is shown in Wire Tap.
Because it is rarely necessary to implement all of the interceptor methods, the interface provides no-op default methods (the void methods have no code, the Message-returning methods return the Message as-is, and the boolean method returns true).
The order of invocation for the interceptor methods depends on the type of channel.
As described earlier, the queue-based channels are the only ones where the receive method is intercepted in the first place.
Additionally, the relationship between send and receive interception depends on the timing of the separate sender and receiver threads.
For example, if a receiver is already blocked while waiting for a message, the order could be as follows: preSend, preReceive, postReceive, postSend.
However, if a receiver polls after the sender has placed a message on the channel and has already returned, the order would be as follows: preSend, postSend (some-time-elapses), preReceive, postReceive.
The time that elapses in such a case depends on a number of factors and is therefore generally unpredictable (in fact, the receive may never happen).
The type of queue also plays a role (for example, rendezvous versus priority).
In short, you cannot rely on the order beyond the fact that preSend precedes postSend and preReceive precedes postReceive.
Starting with Spring Framework 4.1 and Spring Integration 4.1, the ChannelInterceptor provides new methods: afterSendCompletion() and afterReceiveCompletion().
They are invoked after send() and receive() calls, regardless of any exception that is raised, which allows for resource cleanup.
Note that the channel invokes these methods on the ChannelInterceptor list in the reverse order of the initial preSend() and preReceive() calls.
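The reverse-order rule can be pictured with a small stand-alone model. The SendInterceptor interface below is a hypothetical, cut-down stand-in for the send side of ChannelInterceptor, and the send method is a simplification written for this illustration, not the framework's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InterceptorOrderSketch {

    /** Hypothetical stand-in for the send-side callbacks of an interceptor. */
    interface SendInterceptor {
        void preSend(String message);
        void afterSendCompletion(String message);
    }

    /** Builds an interceptor that records its callbacks in a shared log. */
    static SendInterceptor recording(final String name, final List<String> log) {
        return new SendInterceptor() {
            public void preSend(String message) {
                log.add(name + ".preSend");
            }
            public void afterSendCompletion(String message) {
                log.add(name + ".afterSendCompletion");
            }
        };
    }

    /** Calls preSend in list order, then afterSendCompletion in reverse order. */
    static void send(String message, List<SendInterceptor> interceptors) {
        int invoked = 0;
        try {
            for (SendInterceptor interceptor : interceptors) {
                interceptor.preSend(message);
                invoked++;
            }
            // A real channel would hand the message to its queue or subscribers here.
        } finally {
            // Cleanup runs even if something above threw, last interceptor first.
            for (int i = invoked - 1; i >= 0; i--) {
                interceptors.get(i).afterSendCompletion(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        send("test", Arrays.asList(recording("a", log), recording("b", log)));
        System.out.println(log);
    }
}
```

Unwinding in reverse order means that an interceptor which acquired a resource first releases it last, in the same way nested try/finally blocks would.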
Starting with version 5.1, global channel interceptors now apply to dynamically registered channels - such as through beans that are initialized by using beanFactory.initializeBean() or IntegrationFlowContext when using the Java DSL.
Previously, interceptors were not applied when beans were created after the application context was refreshed.
Also, starting with version 5.1, ChannelInterceptor.postReceive() is no longer called when no message is received; it is no longer necessary to check for a null Message<?>.
Previously, the method was called.
If you have an interceptor that relies on the previous behavior, implement afterReceiveCompletion() instead, since that method is invoked regardless of whether a message is received or not.
Starting with version 5.2, the ChannelInterceptorAware is deprecated in favor of InterceptableChannel from the Spring Messaging module, which it extends now for backward compatibility.
6.1.4. MessagingTemplate
As the later sections on endpoints and their configuration options show, Spring Integration provides a foundation for messaging components that enables non-invasive invocation of your application code from the messaging system.
However, it is sometimes necessary to invoke the messaging system from your application code.
For convenience when implementing such use cases, Spring Integration provides a MessagingTemplate that supports a variety of operations across the message channels, including request and reply scenarios.
For example, it is possible to send a request and wait for a reply, as follows:
MessagingTemplate template = new MessagingTemplate();
Message<?> reply = template.sendAndReceive(someChannel, new GenericMessage<>("test"));
In the preceding example, a temporary anonymous channel would be created internally by the template. The 'sendTimeout' and 'receiveTimeout' properties may also be set on the template, and other exchange types are also supported. The following listing shows the signatures for such methods:
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel channel) { ...
}
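The request and reply exchange with a temporary channel and a receive timeout can be sketched in plain Java. This is a conceptual model only and not the MessagingTemplate implementation: the BiConsumer stands in for whatever consumes the request, and the one-slot queue stands in for the temporary anonymous reply channel. All names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

final class RequestReplySketch {

    /**
     * Hands the request and a one-shot reply queue to the service, then waits for the reply.
     * Returns null if no reply arrives within the timeout.
     */
    static String sendAndReceive(BiConsumer<String, BlockingQueue<String>> service,
                                 String request, long receiveTimeoutMillis) {
        BlockingQueue<String> temporaryReplyChannel = new ArrayBlockingQueue<>(1);
        service.accept(request, temporaryReplyChannel);
        try {
            return temporaryReplyChannel.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(
                (request, reply) -> reply.offer(request.toUpperCase()), "test", 1000L));
    }
}
```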
A less invasive approach that lets you invoke simple interfaces with payload or header values instead of Message instances is described in Enter the GatewayProxyFactoryBean.
6.1.5. Configuring Message Channels
To create a message channel instance, you can use the <channel/> element for XML configuration or a DirectChannel instance for Java configuration, as follows:
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
When you use the <channel/> element without any sub-elements, it creates a DirectChannel instance (a SubscribableChannel).
To create a publish-subscribe channel, use the <publish-subscribe-channel/> element (the PublishSubscribeChannel in Java), as follows:
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
You can alternatively provide a variety of <queue/> sub-elements to create any of the pollable channel types (as described in Message Channel Implementations).
The following sections show examples of each channel type.
DirectChannel Configuration
As mentioned earlier, DirectChannel is the default type.
The following listing shows how to define one:
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
A default channel has a round-robin load-balancer and also has failover enabled (see DirectChannel for more detail).
To disable one or both of these, add a <dispatcher/> sub-element (or, in Java, set the failover flag or pass a LoadBalancingStrategy to the DirectChannel constructor) and configure the attributes as follows:
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel channelWithFixedOrderSequenceFailover() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</int:channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
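To make the effect of the two switches concrete, the following stand-alone sketch models round-robin selection with optional failover. It is a simplified illustration with hypothetical names and does not reproduce the framework's dispatcher: handlers are plain functions, and a thrown RuntimeException stands in for a handler failure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class RoundRobinDispatchSketch {

    private final List<Function<String, String>> handlers;
    private final boolean failover;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDispatchSketch(List<Function<String, String>> handlers, boolean failover) {
        if (handlers.isEmpty()) {
            throw new IllegalArgumentException("at least one handler is required");
        }
        this.handlers = handlers;
        this.failover = failover;
    }

    /** Starts at the next handler in rotation; with failover, tries the following ones on error. */
    String dispatch(String message) {
        int start = Math.floorMod(next.getAndIncrement(), handlers.size());
        int attempts = failover ? handlers.size() : 1;
        RuntimeException lastFailure = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, String> handler = handlers.get((start + i) % handlers.size());
            try {
                return handler.apply(message);
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and move on if failover is enabled
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        Function<String, String> broken = m -> { throw new IllegalStateException("down"); };
        Function<String, String> healthy = m -> "handled " + m;
        System.out.println(
                new RoundRobinDispatchSketch(Arrays.asList(broken, healthy), true).dispatch("x"));
    }
}
```

With failover switched off in this model, the same call fails with the first handler's exception, which corresponds to the fail-fast configuration shown above.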
Datatype Channel Configuration
Sometimes, a consumer can process only a particular type of payload, forcing you to ensure the payload type of the input messages. The first thing that comes to mind may be to use a message filter. However, all that message filter can do is filter out messages that are not compliant with the requirements of the consumer. Another way would be to use a content-based router and route messages with non-compliant data-types to specific transformers to enforce transformation and conversion to the required data type. This would work, but a simpler way to accomplish the same thing is to apply the Datatype Channel pattern. You can use separate datatype channels for each specific payload data type.
To create a datatype channel that accepts only messages that contain a certain payload type, provide the data type’s fully-qualified class name in the channel element’s datatype attribute, as the following example shows:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
Note that the type check passes for any type that is assignable to the channel’s datatype.
In other words, the numberChannel in the preceding example would accept messages whose payload is java.lang.Integer or java.lang.Double.
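The assignability rule is the same one that Class.isAssignableFrom expresses. The following minimal sketch, with a hypothetical accepts helper, shows the check in isolation; it is not the channel's own code.

```java
final class DatatypeCheckSketch {

    /** Returns true when the payload is an instance of at least one accepted type. */
    static boolean accepts(Object payload, Class<?>... datatypes) {
        for (Class<?> datatype : datatypes) {
            if (datatype.isAssignableFrom(payload.getClass())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(accepts(5, Number.class));    // Integer is a Number
        System.out.println(accepts(2.5d, Number.class)); // Double is a Number
        System.out.println(accepts("5", Number.class));  // String is not a Number
    }
}
```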
Multiple types can be provided as a comma-delimited list, as the following example shows:
@Bean
public MessageChannel stringOrNumberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
So the 'numberChannel' defined earlier accepts only messages with a data type of java.lang.Number.
But what happens if the payload of the message is not of the required type?
It depends on whether you have defined a bean named integrationConversionService that is an instance of Spring’s Conversion Service.
If not, then an Exception would be thrown immediately.
However, if you have defined an integrationConversionService bean, it is used in an attempt to convert the message’s payload to the acceptable type.
You can even register custom converters.
For example, suppose you send a message with a String payload to the 'numberChannel' we configured above.
You might handle the message as follows:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
Typically, this would be a perfectly legal operation. However, because we use a datatype channel, the operation generates an exception similar to the following:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
The exception happens because we require the payload type to be a Number, but we sent a String.
So we need something to convert a String to a Number.
For that, we can implement a converter similar to the following example:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
Then we can register it as a converter with the Integration Conversion Service, as the following example shows:
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt() {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
Alternatively, you can place @IntegrationConverter on the StringToIntegerConverter class itself when that class is marked with the @Component annotation for auto-scanning.
When the 'converter' element is parsed, it creates the integrationConversionService bean if one is not already defined.
With that converter in place, the send operation would now be successful, because the datatype channel uses that converter to convert the String payload to an Integer.
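The decision described in this section (accept a payload of the right type, otherwise try a converter, otherwise fail) can be summarized in a few lines of plain Java. This is a conceptual sketch with hypothetical names; the Function parameter stands in for a registered converter, and null stands in for the absence of a conversion service.

```java
import java.util.function.Function;

final class DatatypeConversionSketch {

    /** Returns the payload as a Number, converting a String when a converter is available. */
    static Number toNumber(Object payload, Function<String, ? extends Number> converter) {
        if (payload instanceof Number) {
            return (Number) payload; // already an acceptable type
        }
        if (payload instanceof String && converter != null) {
            return converter.apply((String) payload); // conversion attempt
        }
        throw new IllegalArgumentException(
                "expected java.lang.Number but received " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(toNumber("5", s -> Integer.valueOf(s)));
    }
}
```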
For more information regarding payload type conversion, see Payload Type Conversion.
Beginning with version 4.0, the integrationConversionService is invoked by the DefaultDatatypeChannelMessageConverter, which looks up the conversion service in the application context.
To use a different conversion technique, you can specify the message-converter attribute on the channel.
This must be a reference to a MessageConverter implementation.
Only the fromMessage method is used.
It provides the converter with access to the message headers (in case the conversion might need information from the headers, such as content-type).
The method can return only the converted payload or a full Message object.
If the latter, the converter must be careful to copy all the headers from the inbound message.
Alternatively, you can declare a <bean/> of type MessageConverter with an ID of datatypeChannelMessageConverter, and that converter is used by all channels with a datatype.
QueueChannel Configuration
To create a QueueChannel, use the <queue/> sub-element.
You may specify the channel’s capacity as follows:
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<int:queue capacity="25"/>
</int:channel>
If you do not provide a value for the 'capacity' attribute on this <queue/> sub-element, the resulting queue is unbounded.
To avoid issues such as running out of memory, we highly recommend that you set an explicit value for a bounded queue.
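The difference between a bounded and an unbounded queue is easy to observe with a JDK BlockingQueue, used here as a stand-in for the channel's internal queue. The sketch below is illustrative only; the accepted helper is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BoundedQueueSketch {

    /** Offers the given number of messages without blocking and counts how many were accepted. */
    static int accepted(BlockingQueue<String> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer("message-" + i)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // A capacity of 25 refuses further messages once full instead of growing.
        System.out.println(accepted(new LinkedBlockingQueue<String>(25), 100));
        // Without a capacity, every message is buffered in memory.
        System.out.println(accepted(new LinkedBlockingQueue<String>(), 100));
    }
}
```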
Persistent QueueChannel Configuration
Since a QueueChannel provides the capability to buffer messages but does so in-memory only by default, it also introduces a possibility that messages could be lost in the event of a system failure.
To mitigate this risk, a QueueChannel may be backed by a persistent implementation of the MessageGroupStore strategy interface.
For more details on MessageGroupStore and MessageStore, see Message Store.
The capacity attribute is not allowed when the message-store attribute is used.
When a QueueChannel receives a Message, it adds the message to the message store.
When a Message is polled from a QueueChannel, it is removed from the message store.
By default, a QueueChannel stores its messages in an in-memory queue, which can lead to the lost message scenario mentioned earlier.
However, Spring Integration provides persistent stores, such as the JdbcChannelMessageStore.
You can configure a message store for any QueueChannel by adding the message-store attribute, as the following example shows:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(See samples below for Java/Kotlin Configuration options.)
The Spring Integration JDBC module also provides a schema Data Definition Language (DDL) for a number of popular databases.
These schemas are located in the org.springframework.integration.jdbc.store.channel package of that module (spring-integration-jdbc).
One important feature is that, with any transactional persistent store (such as JdbcChannelMessageStore), as long as the poller has a transaction configured, a message removed from the store can be permanently removed only if the transaction completes successfully.
Otherwise the transaction rolls back, and the Message is not lost.
Many other implementations of the message store are available as the growing number of Spring projects related to “NoSQL” data stores come to provide underlying support for these stores.
You can also provide your own implementation of the MessageGroupStore interface if you cannot find one that meets your particular needs.
Since version 4.0, we recommend that QueueChannel instances be configured to use a ChannelMessageStore, if possible.
These are generally optimized for this use, as compared to a general message store.
If the ChannelMessageStore is a ChannelPriorityMessageStore, the messages are received in FIFO within priority order.
The notion of priority is determined by the message store implementation.
The following example shows the Java configuration for the MongoDB Channel Message Store:
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlows.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
Pay attention to the MessageGroupQueue class.
It is a BlockingQueue implementation that uses the MessageGroupStore operations.
Another option to customize the QueueChannel environment is provided by the ref attribute of the <int:queue> sub-element or its particular constructor.
This attribute supplies the reference to any java.util.Queue implementation.
For example, a Hazelcast distributed IQueue can be configured as follows:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel Configuration
To create a PublishSubscribeChannel, use the <publish-subscribe-channel/> element.
When using this element, you can also specify the task-executor used for publishing messages (if none is specified, it publishes in the sender’s thread), as follows:
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
Along with the Executor, you can also configure an ErrorHandler.
By default, the PublishSubscribeChannel uses a MessagePublishingErrorHandler implementation to send an error to the MessageChannel from the errorChannel header or into the global errorChannel instance.
If an Executor is not configured, the ErrorHandler is ignored and exceptions are thrown directly to the caller’s thread.
If you provide a Resequencer or Aggregator downstream from a PublishSubscribeChannel, you can set the 'apply-sequence' property on the channel to true.
Doing so indicates that the channel should set the sequence-size and sequence-number message headers as well as the correlation ID prior to passing along the messages.
For example, if there are five subscribers, the sequence-size would be set to 5, and the messages would have sequence-number header values ranging from 1 to 5.
The following example shows how to set the apply-sequence property to true:
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
The apply-sequence value is false by default so that a publish-subscribe channel can send the exact same message instances to multiple outbound channels.
Since Spring Integration enforces immutability of the payload and header references, when the flag is set to true, the channel creates new Message instances with the same payload reference but different header values.
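As a rough picture of what the flag produces, the following sketch builds one header map per subscriber. It is a stand-alone model rather than framework code: the map keys are illustrative names for the correlation ID, sequence-number, and sequence-size headers, and the payload (not shown) would be shared by reference.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ApplySequenceSketch {

    /** Builds the sequence headers each subscriber's copy of the message would carry. */
    static List<Map<String, Object>> sequenceHeaders(String correlationId, int subscriberCount) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 1; i <= subscriberCount; i++) {
            Map<String, Object> headers = new HashMap<>();
            headers.put("correlationId", correlationId); // same for every copy
            headers.put("sequenceNumber", i);            // 1-based position of this copy
            headers.put("sequenceSize", subscriberCount);
            result.add(headers);
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map<String, Object> headers : sequenceHeaders("original-message-id", 5)) {
            System.out.println(headers);
        }
    }
}
```

A downstream aggregator can use these three values to decide when all copies belonging to one original message have arrived.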
Starting with version 5.4.3, the PublishSubscribeChannel can also be configured with the requireSubscribers option of its BroadcastingDispatcher to indicate that this channel will not ignore a message silently when it has no subscribers.
A MessageDispatchingException with a "Dispatcher has no subscribers" message is thrown when there are no subscribers and this option is set to true.
ExecutorChannel
To create an ExecutorChannel, add the <dispatcher> sub-element with a task-executor attribute.
The attribute’s value can reference any TaskExecutor within the context.
For example, doing so enables configuration of a thread pool for dispatching messages to subscribed handlers.
As mentioned earlier, doing so breaks the single-threaded execution context between sender and receiver so that any active transaction context is not shared by the invocation of the handler (that is, the handler may throw an Exception, but the send invocation has already returned successfully).
The following example shows how to use the dispatcher element and specify an executor in the task-executor attribute:
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
The
|
PriorityChannel Configuration
To create a PriorityChannel, use the <priority-queue/> sub-element, as the following example shows:
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
By default, the channel consults the priority header of the message.
However, you can instead provide a custom Comparator reference.
Also, note that the PriorityChannel (like the other types) does support the datatype attribute.
As with the QueueChannel, it also supports a capacity attribute.
The following example demonstrates all of these:
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="20"/>
</int:channel>
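The ordering that a priority-based channel aims for (higher priority first, arrival order among equal priorities) can be sketched with a JDK PriorityBlockingQueue. This is an illustrative model under stated assumptions, not the PriorityChannel source: the Entry class, the integer priority, and the sequence counter used as a tie-breaker are all hypothetical.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class PriorityOrderSketch {

    static final class Entry {
        final String payload;
        final int priority;
        final long sequence;

        Entry(String payload, int priority, long sequence) {
            this.payload = payload;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Highest priority first; among equal priorities, the earliest arrival first.
    private static final Comparator<Entry> ORDER =
            Comparator.comparingInt((Entry e) -> e.priority).reversed()
                    .thenComparingLong(e -> e.sequence);

    private final AtomicLong counter = new AtomicLong();
    private final PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>(11, ORDER);

    void send(String payload, int priority) {
        queue.add(new Entry(payload, priority, counter.getAndIncrement()));
    }

    /** Returns the next payload in priority order, or null when the queue is empty. */
    String receive() {
        Entry entry = queue.poll();
        return entry == null ? null : entry.payload;
    }

    public static void main(String[] args) {
        PriorityOrderSketch channel = new PriorityOrderSketch();
        channel.send("low", 1);
        channel.send("urgent-first", 9);
        channel.send("urgent-second", 9);
        System.out.println(channel.receive() + ", " + channel.receive() + ", " + channel.receive());
    }
}
```

A custom Comparator, such as the widgetComparator referenced above, would replace the priority comparison in this model.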
Since version 4.0, the priority-queue child element supports the message-store option (comparator and capacity are not allowed in that case).
The message store must be a PriorityCapableChannelMessageStore.
Implementations of the PriorityCapableChannelMessageStore are currently provided for Redis, JDBC, and MongoDB.
See QueueChannel Configuration and Message Store for more information.
You can find sample configuration in Backing Message Channels.
RendezvousChannel Configuration
A RendezvousChannel is created when the queue sub-element is a <rendezvous-queue>.
It does not provide any additional configuration options to those described earlier, and its queue does not accept any capacity value, since it is a zero-capacity direct handoff queue.
The following example shows how to declare a RendezvousChannel:
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel">
<int:rendezvous-queue/>
</int:channel>
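The zero-capacity, direct-handoff behavior is that of a java.util.concurrent.SynchronousQueue. The sketch below uses one directly to show why no capacity applies; the class and method names are hypothetical, and the code is an illustration rather than the channel implementation.

```java
import java.util.concurrent.SynchronousQueue;

final class RendezvousSketch {

    /** With no receiver waiting, a non-blocking offer is refused because nothing can be buffered. */
    static boolean offerWithoutReceiver() {
        return new SynchronousQueue<String>().offer("hello");
    }

    /** With a receiver blocked in take(), the sender hands the message over directly. */
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread receiver = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        try {
            queue.put(message); // blocks until the receiver has taken the message
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutReceiver());
        System.out.println(handOff("ping"));
    }
}
```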
Scoped Channel Configuration
Any channel can be configured with a scope attribute, as the following example shows:
<int:channel id="threadLocalChannel" scope="thread"/>
Channel Interceptor Configuration
Message channels may also have interceptors, as described in Channel Interceptors.
The <interceptors/> sub-element can be added to a <channel/> (or the more specific element types).
You can provide the ref attribute to reference any Spring-managed object that implements the ChannelInterceptor interface, as the following example shows:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
In general, we recommend defining the interceptor implementations in a separate location, since they usually provide common behavior that can be reused across multiple channels.
Global Channel Interceptor Configuration
Channel interceptors provide a clean and concise way of applying cross-cutting behavior per individual channel. If the same behavior should be applied on multiple channels, configuring the same set of interceptors for each channel would not be the most efficient way. To avoid repeated configuration while also enabling interceptors to apply to multiple channels, Spring Integration provides global interceptors. Consider the following pair of examples:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
Each <channel-interceptor/> element lets you define a global interceptor, which is applied on all channels that match any patterns defined by the pattern attribute.
In the preceding case, the global interceptor is applied on the 'thing1' channel and all other channels that begin with 'thing2' or 'input' but not to channels starting with 'cat' (negation with '!' is supported since version 5.0).
The addition of this syntax to the pattern causes one possible (though perhaps unlikely) problem.
If you have a bean named !thing1 and you included a pattern of !thing1 in your channel interceptor’s pattern, it no longer matches.
The pattern now matches all beans not named thing1.
In this case, you can escape the ! in the pattern with \.
The pattern \!thing1 matches a bean named !thing1.
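The matching rules can be explored with a small stand-alone model. The sketch below is not the framework's matcher. It assumes that patterns are evaluated left to right, that the first pattern matching the name decides the outcome, and that '*' is the only wildcard; a null result means no pattern in the list matched. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

final class ChannelPatternSketch {

    /** Returns TRUE or FALSE from the first pattern that matches, or null if none does. */
    static Boolean match(String channelName, String patternList) {
        for (String raw : patternList.split(",")) {
            String pattern = raw.trim();
            boolean negated = pattern.startsWith("!");
            if (negated || pattern.startsWith("\\")) {
                pattern = pattern.substring(1); // drop the '!' marker or the escaping backslash
            }
            if (globMatches(pattern, channelName)) {
                return !negated;
            }
        }
        return null;
    }

    /** Matches text against a glob in which '*' stands for any run of characters. */
    private static boolean globMatches(String glob, String text) {
        String[] literals = glob.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.matches(regex.toString(), text);
    }

    public static void main(String[] args) {
        String patterns = "input*, thing2*, thing1, !cat*";
        System.out.println(match("inputChannel", patterns));
        System.out.println(match("catalog", patterns));
        System.out.println(match("!thing1", "\\!thing1"));
    }
}
```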
The order attribute lets you manage where this interceptor is injected when there are multiple interceptors on a given channel. For example, channel 'inputChannel' could have individual interceptors configured locally, as the following example shows:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
A reasonable question is “how is a global interceptor injected in relation to other interceptors configured locally or through other global interceptor definitions?”
The current implementation provides a simple mechanism for defining the order of interceptor execution.
A positive number in the order attribute ensures interceptor injection after any existing interceptors, while a negative number ensures that the interceptor is injected before existing interceptors.
This means that, in the preceding example, the global interceptor is injected after (since its order is greater than 0) the 'wire-tap' interceptor configured locally.
If there were another global interceptor with a matching pattern, its order would be determined by comparing the values of both interceptors' order attributes.
To inject a global interceptor before the existing interceptors, use a negative value for the order attribute.
Note that both the order and pattern attributes are optional.
The default value for order will be 0 and for pattern, the default is '*' (to match all channels).
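The placement rule can be modeled in a few lines. The sketch below is a simplified illustration with hypothetical names, not the framework's processing code: global interceptors with a negative order go before the locally configured ones, the rest go after, and each group is sorted by order. Placing an order of 0 in the "after" group is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class GlobalInterceptorOrderSketch {

    static final class Global {
        final String name;
        final int order;

        Global(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns the interceptor names in the order in which they would be invoked. */
    static List<String> merge(List<String> local, List<Global> globals) {
        List<Global> sorted = new ArrayList<>(globals);
        sorted.sort(Comparator.comparingInt((Global g) -> g.order));
        List<String> result = new ArrayList<>();
        for (Global global : sorted) {
            if (global.order < 0) {
                result.add(global.name); // injected before the local interceptors
            }
        }
        result.addAll(local);
        for (Global global : sorted) {
            if (global.order >= 0) {
                result.add(global.name); // injected after the local interceptors
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(merge(
                Arrays.asList("wire-tap"),
                Arrays.asList(new Global("audit", 3), new Global("security", -1),
                        new Global("metrics", 1))));
    }
}
```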
Wire Tap
As mentioned earlier, Spring Integration provides a simple wire tap interceptor.
You can configure a wire tap on any channel within an <interceptors/> element.
Doing so is especially useful for debugging and can be used in conjunction with Spring Integration’s logging channel adapter as follows:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
The 'logging-channel-adapter' also accepts an 'expression' attribute so that you can evaluate a SpEL expression against the 'payload' and 'headers' variables.
Alternatively, to log the full message toString() result, provide a value of true for the 'log-full-message' attribute.
By default, it is false so that only the payload is logged.
Setting it to true enables logging of all headers in addition to the payload.
The 'expression' option provides the most flexibility (for example, expression="payload.user.name").
One of the common misconceptions about the wire tap and other similar components (Message Publishing Configuration) is that they are automatically asynchronous in nature. By default, wire tap as a component is not invoked asynchronously. Instead, Spring Integration focuses on a single unified approach to configuring asynchronous behavior: the message channel. What makes certain parts of the message flow synchronous or asynchronous is the type of Message Channel that has been configured within that flow. That is one of the primary benefits of the message channel abstraction. From the inception of the framework, we have always emphasized the need and the value of the message channel as a first-class citizen of the framework. It is not just an internal, implicit realization of the EIP pattern. It is fully exposed as a configurable component to the end user. So, the wire tap component is only responsible for performing the following tasks:
- Intercept a message flow by tapping into a channel (for example, channelA)
- Grab each message
- Send the message to another channel (for example, channelB)
It is essentially a variation of the bridge pattern, but it is encapsulated within a channel definition (and hence easier to enable and disable without disrupting a flow). Also, unlike the bridge, it basically forks another message flow. Is that flow synchronous or asynchronous? The answer depends on the type of message channel that 'channelB' is. We have the following options: direct channel, pollable channel, and executor channel. The last two break the thread boundary, making communication over such channels asynchronous, because the dispatching of the message from that channel to its subscribed handlers happens on a different thread than the one used to send the message to that channel. That is what is going to make your wire-tap flow synchronous or asynchronous. It is consistent with other components within the framework (such as message publisher) and adds a level of consistency and simplicity by sparing you from worrying in advance (other than writing thread-safe code) about whether a particular piece of code should be implemented as synchronous or asynchronous. The actual wiring of two pieces of code (say, component A and component B) over a message channel is what makes their collaboration synchronous or asynchronous. You may even want to change from synchronous to asynchronous in the future, and the message channel lets you do it swiftly without ever touching the code.
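The point that the channel type, and not the tapping component, decides the threading can be shown with the JDK's Executor abstraction as a stand-in for 'channelB'. This is an analogy with hypothetical names, not Spring Integration code: a direct executor models a direct channel, and a single-thread pool models an executor channel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TapThreadingSketch {

    /** Dispatches one message through the given executor and returns the handler's thread. */
    static Thread handlerThread(Executor channelB) {
        Thread[] ranOn = new Thread[1];
        CountDownLatch done = new CountDownLatch(1);
        channelB.execute(() -> {
            ranOn[0] = Thread.currentThread();
            done.countDown();
        });
        try {
            done.await(); // wait so the result is visible even for asynchronous dispatch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the handler on the sender's thread
        ExecutorService pooled = Executors.newSingleThreadExecutor();
        try {
            System.out.println(handlerThread(direct) == Thread.currentThread());
            System.out.println(handlerThread(pooled) == Thread.currentThread());
        } finally {
            pooled.shutdown();
        }
    }
}
```

The sending code is identical in both cases; only the executor passed in changes, which mirrors swapping the channel type in configuration without touching the components.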
One final point regarding the wire tap is that, despite the rationale provided above for not being asynchronous by default, you should keep in mind that it is usually desirable to hand off the message as soon as possible. Therefore, it would be quite common to use an asynchronous channel option as the wire tap’s outbound channel. However, we do not enforce asynchronous behavior by default. There are a number of use cases that would break if we did, including that you might not want to break a transactional boundary. Perhaps you use the wire tap pattern for auditing purposes, and you do want the audit messages to be sent within the original transaction. As an example, you might connect the wire tap to a JMS outbound channel adapter. That way, you get the best of both worlds: 1) the sending of a JMS Message can occur within the transaction while 2) it is still a “fire-and-forget” action, thereby preventing any noticeable delay in the main message flow.
Starting with version 4.0, it is important to avoid circular references when an interceptor (such as the WireTap class) references a channel.
You need to exclude such channels from those being intercepted by the current interceptor.
This can be done with appropriate patterns or programmatically.
If you have a custom ChannelInterceptor that references a channel, consider implementing VetoCapableInterceptor.
That way, the framework asks the interceptor if it is OK to intercept each channel that is a candidate, based on the supplied pattern.
You can also add runtime protection in the interceptor methods to ensure that the channel is not one that is referenced by the interceptor.
The WireTap uses both of these techniques.
|
Starting with version 4.3, the WireTap has additional constructors that take a channelName instead of a MessageChannel instance.
This can be convenient for Java configuration and when channel auto-creation logic is being used.
The target MessageChannel bean is resolved from the provided channelName later, on the first interaction with the interceptor.
Channel resolution requires a BeanFactory, so the wire tap instance must be a Spring-managed bean.
|
This late-binding approach also allows simplification of typical wire-tapping patterns with Java DSL configuration, as the following example shows:
@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
Conditional Wire Taps
Wire taps can be made conditional by using the selector or selector-expression attributes.
The selector references a MessageSelector bean, which can determine at runtime whether the message should go to the tap channel.
Similarly, the selector-expression is a boolean SpEL expression that performs the same purpose: If the expression evaluates to true, the message is sent to the tap channel.
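The selector contract can be pictured as a predicate that gates only the copy sent to the tap, while the main flow is unaffected. The following is a minimal plain-JDK sketch under that assumption. ConditionalTapModel and its two lists are hypothetical stand-ins for the tapped channel and the tap channel, not the framework's WireTap or MessageSelector types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-JDK model of a conditional wire tap. NOT the framework's WireTap class.
final class ConditionalTapModel {

    private final Predicate<String> selector;                 // plays the role of the selector
    private final List<String> mainFlow = new ArrayList<>();  // stand-in for the tapped channel
    private final List<String> tapFlow = new ArrayList<>();   // stand-in for the tap channel

    ConditionalTapModel(Predicate<String> selector) {
        this.selector = selector;
    }

    void send(String payload) {
        if (this.selector.test(payload)) {
            this.tapFlow.add(payload);   // the copy goes to the tap only if the selector accepts it
        }
        this.mainFlow.add(payload);      // the main flow always receives the message
    }

    List<String> mainFlow() {
        return this.mainFlow;
    }

    List<String> tapFlow() {
        return this.tapFlow;
    }
}
```

With a selector such as `p -> p.startsWith("audit")`, every payload still reaches the main flow, but only the audit payloads are copied to the tap.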
Global Wire Tap Configuration
It is possible to configure a global wire tap as a special case of the Global Channel Interceptor Configuration.
To do so, configure a top level wire-tap element.
Now, in addition to the normal wire-tap namespace support, the pattern and order attributes are supported and work in exactly the same way as they do for the channel-interceptor.
The following example shows how to configure a global wire tap:
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
A global wire tap provides a convenient way to configure a single-channel wire tap externally without modifying the existing channel configuration.
To do so, set the pattern attribute to the target channel name.
For example, you can use this technique to configure a test case to verify messages on a channel.
|
6.1.6. Special Channels
Two special channels are defined within the application context by default: errorChannel and nullChannel.
The 'nullChannel' (an instance of NullChannel) acts like /dev/null, logging any message sent to it at the DEBUG level and returning immediately.
Special treatment applies when the payload of a sent message is an org.reactivestreams.Publisher: the channel subscribes to it immediately to initiate reactive stream processing, although the data is discarded.
An error thrown from reactive stream processing (see Subscriber.onError(Throwable)) is logged at the WARN level for possible investigation.
If you need to do anything with such an error, you can apply a ReactiveRequestHandlerAdvice with a Mono.doOnError() customization to the message handler that produces the Mono reply into this nullChannel.
Any time you face channel resolution errors for a reply that you do not care about, you can set the affected component’s output-channel attribute to 'nullChannel' (the name, 'nullChannel', is reserved within the application context).
The 'errorChannel' is used internally for sending error messages and may be overridden with a custom configuration. This is discussed in greater detail in Error Handling.
See also Message Channels in the Java DSL chapter for more information about message channels and interceptors.
6.2. Poller
This section describes how polling works in Spring Integration.
6.2.1. Polling Consumer
When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce one of the following instances: PollingConsumer or EventDrivenConsumer.
The actual implementation depends on the type of channel to which these endpoints connect.
A channel adapter connected to a channel that implements the org.springframework.messaging.SubscribableChannel interface produces an instance of EventDrivenConsumer.
On the other hand, a channel adapter connected to a channel that implements the org.springframework.messaging.PollableChannel interface (such as a QueueChannel) produces an instance of PollingConsumer.
Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner.
They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the book’s website.
6.2.2. Pollable Message Source
Spring Integration offers a second variation of the polling consumer pattern.
When inbound channel adapters are used, these adapters are often wrapped by a SourcePollingChannelAdapter.
For example, when retrieving messages from a remote FTP Server location, the adapter described in FTP Inbound Channel Adapter is configured with a poller to periodically retrieve messages.
So, when components are configured with pollers, the resulting instances are of one of the following types: PollingConsumer or SourcePollingChannelAdapter.
This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used:
-
Polling certain external systems, such as FTP Servers, Databases, and Web Services
-
Polling internal (pollable) message channels
-
Polling internal services (such as repeatedly executing methods on a Java class)
AOP advice classes can be applied to pollers, in an advice-chain, such as a transaction advice to start a transaction.
Starting with version 4.1, a PollSkipAdvice is provided.
Pollers use triggers to determine the time of the next poll.
The PollSkipAdvice can be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed.
To use this advice, you have to provide it with an implementation of a PollSkipStrategy.
Starting with version 4.2.5, a SimplePollSkipStrategy is provided.
To use it, you can add an instance as a bean to the application context, inject it into a PollSkipAdvice, and add that to the poller’s advice chain.
To skip polling, call skipPolls().
To resume polling, call reset().
Version 4.2 added more flexibility in this area.
See Conditional Pollers for Message Sources.
|
This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see Message Channels) and channel adapters (see Channel Adapter). For more information regarding messaging endpoints in general and polling consumers in particular, see Message Endpoints.
6.2.3. Deferred Acknowledgment Pollable Message Source
Starting with version 5.0.1, certain modules provide MessageSource implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread).
This is currently limited to the AmqpMessageSource and the KafkaMessageSource.
With these message sources, the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header (see MessageHeaderAccessor API) is added to the message.
When used with pollable message sources, the value of the header is an instance of AcknowledgmentCallback, as the following example shows:
@FunctionalInterface
public interface AcknowledgmentCallback {

    // The single abstract method of this functional interface.
    void acknowledge(Status status);

    // The remaining methods are default methods; their bodies are elided here.
    default boolean isAcknowledged() { ... }

    default void noAutoAck() { ... }

    default boolean isAutoAck() { ... }

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}
Not all message sources (for example, a KafkaMessageSource) support the REJECT status.
For such sources, REJECT is treated the same as ACCEPT.
Applications can acknowledge a message at any time, as the following example shows:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
If the MessageSource is wired into a SourcePollingChannelAdapter, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the message has already been acknowledged and, if not, sets its status to ACCEPT (or REJECT if the flow throws an exception).
The status values are defined in the AcknowledgmentCallback.Status enumeration.
Spring Integration provides MessageSourcePollingTemplate to perform ad-hoc polling of a MessageSource.
This, too, takes care of setting ACCEPT or REJECT on the AcknowledgmentCallback when the MessageHandler callback returns (or throws an exception).
The following example shows how to poll with the MessageSourcePollingTemplate:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
In both cases (SourcePollingChannelAdapter and MessageSourcePollingTemplate), you can disable auto ack/nack by calling noAutoAck() on the callback.
You might do this if you hand off the message to another thread and wish to acknowledge later.
Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).
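The auto ack/nack rules described in this section can be summarized in a few lines. The following is a minimal plain-JDK sketch of those rules only. AutoAckModel and its nested Callback are hypothetical types that mirror the method names of AcknowledgmentCallback for readability. They are not the framework types, and the real adapter involves more than is shown here.

```java
import java.util.function.Consumer;

// Plain-JDK model of the auto-acknowledgment rules described above.
// The names mirror AcknowledgmentCallback, but this is NOT the framework type.
final class AutoAckModel {

    enum Status { ACCEPT, REJECT, REQUEUE }

    static final class Callback {

        private Status status;            // null until acknowledged
        private boolean autoAck = true;

        void acknowledge(Status status) { this.status = status; }

        boolean isAcknowledged() { return this.status != null; }

        void noAutoAck() { this.autoAck = false; }

        boolean isAutoAck() { return this.autoAck; }

        Status status() { return this.status; }
    }

    // Runs the downstream flow, then applies the default ack/nack if still needed.
    static Callback poll(Consumer<Callback> downstreamFlow) {
        Callback callback = new Callback();
        Status fallback = Status.ACCEPT;
        try {
            downstreamFlow.accept(callback);
        }
        catch (RuntimeException e) {
            // The flow failed. This model swallows the exception so the outcome can be inspected.
            fallback = Status.REJECT;
        }
        if (callback.isAutoAck() && !callback.isAcknowledged()) {
            callback.acknowledge(fallback);
        }
        return callback;
    }
}
```

In this model, a flow that calls noAutoAck() leaves the callback unacknowledged after the poll returns, so the code that later finishes the work on another thread is responsible for calling acknowledge(...).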
6.2.4. Conditional Pollers for Message Sources
This section covers how to use conditional pollers.
Background
Advice objects, in an advice-chain on a poller, advise the whole polling task (both message retrieval and processing).
These “around advice” methods do not have access to any context for the poll — only the poll itself.
This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier.
What if we wish to take some action depending on the result of the receive part of the poll or if we want to adjust the poller depending on conditions? For those instances, Spring Integration offers “Smart” Polling.
“Smart” Polling
Version 5.3 introduced the ReceiveMessageAdvice interface.
(The AbstractMessageSourceAdvice has been deprecated in favor of default methods in the MessageSourceMutator.)
Any Advice objects in the advice-chain that implement this interface are applied only to the receive operation - MessageSource.receive() and PollableChannel.receive(timeout).
Therefore they can be applied only for the SourcePollingChannelAdapter or PollingConsumer.
Such classes implement the following methods:
-
beforeReceive(Object source): This method is called before the Object.receive() method. It lets you examine and reconfigure the source. Returning false cancels this poll (similar to the PollSkipAdvice mentioned earlier).
-
Message<?> afterReceive(Message<?> result, Object source): This method is called after the receive() method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can be null if there was no message created by the source). You can even return a different message.
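The two callbacks above can be read as a small protocol around a single receive call. The following is a minimal plain-JDK sketch of that protocol. ReceiveAdviceModel, its nested Advice interface, and pollOnce are hypothetical and use String payloads in place of Message<?>. The sketch mirrors the method names but is not the framework's ReceiveMessageAdvice.

```java
import java.util.function.Supplier;

// Plain-JDK model of the two callbacks described above.
// It mirrors the method names but is NOT the framework's ReceiveMessageAdvice.
final class ReceiveAdviceModel {

    interface Advice {

        // Returning false cancels the poll.
        default boolean beforeReceive(Object source) {
            return true;
        }

        // May inspect the result (possibly null) and return it or a replacement.
        String afterReceive(String result, Object source);
    }

    // One poll: ask the advice, receive, then let the advice inspect or replace the result.
    static String pollOnce(Supplier<String> source, Advice advice) {
        if (!advice.beforeReceive(source)) {
            return null;   // poll cancelled, the source is never called
        }
        return advice.afterReceive(source.get(), source);
    }
}
```

An advice written as `(result, source) -> result == null ? null : result.toUpperCase()` returns a different message when one is present and passes through the empty result unchanged.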
|
Thread safety
If an advice mutates the, you should not configure the poller with a |
|
Advice Chain Ordering
You should understand how the advice chain is processed during initialization.
|
SimpleActiveIdleReceiveMessageAdvice
(The previous SimpleActiveIdleMessageSourceAdvice for only MessageSource is deprecated.)
This advice is a simple implementation of ReceiveMessageAdvice.
When used in conjunction with a DynamicPeriodicTrigger, it adjusts the polling frequency, depending on whether the previous poll resulted in a message.
The poller must also have a reference to the same DynamicPeriodicTrigger.
|
Important: Async Handoff
SimpleActiveIdleReceiveMessageAdvice modifies the trigger based on the receive() result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a task-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.
|
CompoundTriggerAdvice
This advice allows the selection of one of two triggers based on whether a poll returns a message or not.
Consider a poller that uses a CronTrigger.
CronTrigger instances are immutable, so they cannot be altered once constructed.
Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.
The advice (and poller) use a CompoundTrigger for this purpose.
The trigger’s primary trigger can be a CronTrigger.
When the advice detects that no message is received, it adds the secondary trigger to the CompoundTrigger.
When the CompoundTrigger instance’s nextExecutionTime method is invoked, it delegates to the secondary trigger, if present.
Otherwise, it delegates to the primary trigger.
The poller must also have a reference to the same CompoundTrigger.
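The delegation between the primary and secondary triggers can be sketched in plain Java. The following is a simplified model, not the framework's CompoundTrigger or CompoundTriggerAdvice. CompoundTriggerModel is a hypothetical name, and each trigger is reduced to a function from the last completion time to the next execution time, both in milliseconds.

```java
import java.util.function.LongUnaryOperator;

// Plain-JDK model of the delegation described above.
// It is NOT the framework's CompoundTrigger or CompoundTriggerAdvice.
final class CompoundTriggerModel {

    private final LongUnaryOperator primary;        // for example, an hourly schedule
    private volatile LongUnaryOperator override;    // secondary trigger, present only while idle

    CompoundTriggerModel(LongUnaryOperator primary) {
        this.primary = primary;
    }

    // Delegates to the secondary trigger if present, otherwise to the primary.
    long nextExecutionTime(long lastCompletionMillis) {
        LongUnaryOperator current = this.override;
        return (current != null ? current : this.primary).applyAsLong(lastCompletionMillis);
    }

    // What the advice does after each receive: no message installs the secondary,
    // a received message removes it again.
    void afterReceive(Object result, LongUnaryOperator secondary) {
        this.override = (result == null) ? secondary : null;
    }
}
```

Because both the advice and the poller hold the same instance, a change made in afterReceive is visible the next time the poller asks for the next execution time.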
The following example shows the configuration for the hourly cron expression with a fallback to every minute:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
|
Important: Async Handoff
CompoundTriggerAdvice modifies the trigger based on the receive() result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a task-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.
|
MessageSource-only Advices
Some advices apply only to MessageSource.receive() and make no sense for a PollableChannel.
For this purpose, the MessageSourceMutator interface (an extension of ReceiveMessageAdvice) is still present.
With its default methods, it fully replaces the deprecated AbstractMessageSourceAdvice and should be used in implementations where only MessageSource proxying is expected.
See Inbound Channel Adapters: Polling Multiple Servers and Directories for more information.
6.3. Channel Adapter
A channel adapter is a message endpoint that enables connecting a single sender or receiver to a message channel. Spring Integration provides a number of adapters to support various transports, such as JMS, file, HTTP, web services, mail, and more. Upcoming chapters of this reference guide discuss each adapter. However, this chapter focuses on the simple but flexible method-invoking channel adapter support. There are both inbound and outbound adapters, and each may be configured with XML elements provided in the core namespace. These provide an easy way to extend Spring Integration, as long as you have a method that can be invoked as either a source or a destination.
6.3.1. Configuring An Inbound Channel Adapter
An inbound-channel-adapter element (a SourcePollingChannelAdapter in Java configuration) can invoke any method on a Spring-managed object and send a non-null return value to a MessageChannel after converting the method’s output to a Message.
When the adapter’s subscription is activated, a poller tries to receive messages from the source.
The poller is scheduled with the TaskScheduler according to the provided configuration.
To configure the polling interval or cron expression for an individual channel adapter, you can provide a 'poller' element with one of the scheduling attributes, such as 'fixed-rate' or 'cron'.
The following example defines two inbound-channel-adapter instances:
@Bean
public IntegrationFlow source1() {
return IntegrationFlows.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlows.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:inbound-channel-adapter>
See also Channel Adapter Expressions and Scripts.
| If no poller is provided, then a single default poller must be registered within the context. See Endpoint Namespace Support for more detail. |
|
Important: Poller Configuration
All the
In the first configuration, the polling task is invoked once per poll, and, during each task (poll), the method (which results in the production of the message) is invoked once, based on the
Note that there is no However, in the However, if you are sure that your method can return null and you need to poll for as many sources as available per each poll, you should explicitly set
Starting with version 5.5, a Also see Global Default Poller for more information. |
6.3.2. Configuring An Outbound Channel Adapter
An outbound-channel-adapter element (a @ServiceActivator for Java configuration) can also connect a MessageChannel to any POJO consumer method that should be invoked with the payload of messages sent to that channel.
The following example shows how to define an outbound channel adapter:
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(inputChannel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
If the channel being adapted is a PollableChannel, you must provide a poller sub-element (the @Poller sub-annotation on the @ServiceActivator), as the following example shows:
public class MyPojo {

    @ServiceActivator(inputChannel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
You should use a ref attribute if the POJO consumer implementation can be reused in other <outbound-channel-adapter> definitions.
However, if the consumer implementation is referenced by only a single definition of the <outbound-channel-adapter>, you can define it as an inner bean, as the following example shows:
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
Using both the ref attribute and an inner handler definition in the same <outbound-channel-adapter> configuration is not allowed, as it creates an ambiguous condition.
Such a configuration results in an exception being thrown.
|
Any channel adapter can be created without a channel reference, in which case it implicitly creates an instance of DirectChannel.
The created channel’s name matches the id attribute of the <inbound-channel-adapter> or <outbound-channel-adapter> element.
Therefore, if channel is not provided, id is required.
6.3.3. Channel Adapter Expressions and Scripts
Like many other Spring Integration components, the <inbound-channel-adapter> and <outbound-channel-adapter> also provide support for SpEL expression evaluation.
To use SpEL, provide the expression string in the 'expression' attribute instead of providing the 'ref' and 'method' attributes that are used for method-invocation on a bean.
When an expression is evaluated, it follows the same contract as method-invocation where: the expression for an <inbound-channel-adapter> generates a message any time the evaluation result is a non-null value, while the expression for an <outbound-channel-adapter> must be the equivalent of a void-returning method invocation.
Starting with Spring Integration 3.0, an <int:inbound-channel-adapter/> can also be configured with a SpEL <expression/> (or even with a <script/>) sub-element, for when more sophistication is required than can be achieved with the simple 'expression' attribute.
If you provide a script as a Resource by using the location attribute, you can also set refresh-check-delay, which allows the resource to be periodically refreshed.
If you want the script to be checked on each poll, you would need to coordinate this setting with the poller’s trigger, as the following example shows:
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
See also the cacheSeconds property on the ReloadableResourceBundleExpressionSource when using the <expression/> sub-element.
For more information regarding expressions, see Spring Expression Language (SpEL).
For scripts, see Groovy support and Scripting Support.
The <int:inbound-channel-adapter/> (SourcePollingChannelAdapter) is an endpoint which starts a message flow by periodically triggering to poll some underlying MessageSource.
Since, at the time of polling, there is no message object, expressions and scripts do not have access to a root Message, so there are no payload or headers properties that are available in most other messaging SpEL expressions.
The script can generate and return a complete Message object with headers and payload or only a payload, which is added to a message with basic headers by the framework.
|
6.4. Messaging Bridge
A messaging bridge is a relatively trivial endpoint that connects two message channels or channel adapters.
For example, you may want to connect a PollableChannel to a SubscribableChannel so that the subscribing endpoints do not have to worry about any polling configuration.
Instead, the messaging bridge provides the polling configuration.
By providing an intermediary poller between two channels, you can use a messaging bridge to throttle inbound messages.
The poller’s trigger determines the rate at which messages arrive on the second channel, and the poller’s maxMessagesPerPoll property enforces a limit on the throughput.
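The throughput ceiling that results from these two settings is simple arithmetic. The following sketch assumes a fixed-rate trigger and assumes that each poll finishes within its period. ThrottleMath is a hypothetical helper and the sample values are illustrative.

```java
// Plain arithmetic, no framework classes involved.
final class ThrottleMath {

    // Upper bound on throughput for a fixed-rate poller, in messages per second.
    static double maxMessagesPerSecond(int maxMessagesPerPoll, long fixedRateMillis) {
        if (maxMessagesPerPoll <= 0 || fixedRateMillis <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        return maxMessagesPerPoll * 1000.0 / fixedRateMillis;
    }
}
```

For example, ThrottleMath.maxMessagesPerSecond(10, 5000) evaluates to 2.0: a poller that takes at most 10 messages every 5 seconds lets at most 2 messages per second reach the second channel on average.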
Another valid use for a messaging bridge is to connect two different systems. In such a scenario, Spring Integration’s role is limited to making the connection between these systems and managing a poller, if necessary. It is probably more common to have at least a transformer between the two systems, to translate between their formats. In that case, the channels can be provided as the 'input-channel' and 'output-channel' of a transformer endpoint. If data format translation is not required, the messaging bridge may indeed be sufficient.
6.4.1. Configuring a Bridge with XML
You can use the <bridge> element to create a messaging bridge between two message channels or channel adapters.
To do so, provide the input-channel and output-channel attributes, as the following example shows:
<int:bridge input-channel="input" output-channel="output"/>
As mentioned above, a common use case for the messaging bridge is to connect a PollableChannel to a SubscribableChannel.
When performing this role, the messaging bridge may also serve as a throttler:
<int:bridge input-channel="pollable" output-channel="subscribable">
<int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>
You can use a similar mechanism to connect channel adapters.
The following example shows a simple “echo” between the stdin and stdout adapters from Spring Integration’s stream namespace:
<int-stream:stdin-channel-adapter id="stdin"/>
<int-stream:stdout-channel-adapter id="stdout"/>
<int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>
Similar configurations work for other (potentially more useful) Channel Adapter bridges, such as file-to-JMS or mail-to-file. Upcoming chapters cover the various channel adapters.
| If no 'output-channel' is defined on a bridge, the reply channel provided by the inbound message is used, if available. If neither an output nor a reply channel is available, an exception is thrown. |
6.4.2. Configuring a Bridge with Java Configuration
The following example shows how to configure a bridge in Java by using the @BridgeFrom annotation:
@Bean
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
return new DirectChannel();
}
The following example shows how to configure a bridge in Java by using the @BridgeTo annotation:
@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
public SubscribableChannel direct() {
return new DirectChannel();
}
Alternatively, you can use a BridgeHandler, as the following example shows:
@Bean
@ServiceActivator(inputChannel = "polled",
poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannelName("direct");
return bridge;
}
6.4.3. Configuring a Bridge with the Java DSL
You can use the Java Domain Specific Language (DSL) to configure a bridge, as the following example shows:
@Bean
public IntegrationFlow bridgeFlow() {
return IntegrationFlows.from("polled")
.bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
.channel("direct")
.get();
}
7. Message
The Spring Integration Message is a generic container for data.
Any object can be provided as the payload, and each Message instance includes headers containing user-extensible properties as key-value pairs.
7.1. The Message Interface
The following listing shows the definition of the Message interface:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
The Message interface is a core part of the API.
By encapsulating the data in a generic wrapper, the messaging system can pass it around without any knowledge of the data’s type.
As an application evolves to support new types or when the types themselves are modified or extended, the messaging system is not affected.
On the other hand, when some component in the messaging system does require access to information about the Message, such metadata can typically be stored in and retrieved from the message headers.
7.2. Message Headers
Just as Spring Integration lets any Object be used as the payload of a Message, it also supports any Object types as header values.
In fact, the MessageHeaders class implements the java.util.Map interface, as the following class definition shows:
public class MessageHeaders implements Map<String, Object>, Serializable {
...
}
Even though the MessageHeaders class implements Map, it is effectively a read-only implementation.
Any attempt to put a value in the Map results in an UnsupportedOperationException.
The same applies for remove and clear.
Since messages may be passed to multiple consumers, the structure of the Map cannot be modified.
Likewise, the message’s payload Object cannot be set after the initial creation.
However, the mutability of the header values themselves (or the payload Object) is intentionally left as a decision for the framework user.
|
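The difference between a read-only map structure and possibly mutable values can be shown with the JDK alone. The following is a minimal sketch that uses Collections.unmodifiableMap as a stand-in. ReadOnlyHeadersModel is a hypothetical name and the sketch is not the MessageHeaders class itself, although it rejects structural changes with the same exception type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-JDK model of "read-only map, possibly mutable values".
// It uses Collections.unmodifiableMap and is NOT the MessageHeaders class itself.
final class ReadOnlyHeadersModel {

    // Copies the source entries into a map whose structure can no longer change.
    static Map<String, Object> headersOf(Map<String, Object> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }

    // Returns true if the structural change was rejected.
    static boolean putIsRejected(Map<String, Object> headers) {
        try {
            headers.put("newKey", "value");
            return false;
        }
        catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

If a header value is itself a mutable object, such as a List, changes made to that object remain visible through the read-only map. That is why the mutability of header values is left as a decision for the user.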
As an implementation of Map, the headers can be retrieved by calling get(..) with the name of the header.
Alternatively, you can provide the expected Class as an additional parameter.
Even better, when retrieving one of the pre-defined values, convenient getters are available.
The following example shows each of these three options:
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
The following table describes the pre-defined message headers:
| Header Name | Header Type | Usage |
|---|---|---|
| MessageHeaders.ID | java.util.UUID | An identifier for this message instance. Changes each time a message is mutated. |
| MessageHeaders.TIMESTAMP | java.lang.Long | The time the message was created. Changes each time a message is mutated. |
| MessageHeaders.REPLY_CHANNEL | java.lang.Object (String or MessageChannel) | A channel to which a reply (if any) is sent when no explicit output channel is configured and there is no |
| MessageHeaders.ERROR_CHANNEL | java.lang.Object (String or MessageChannel) | A channel to which errors are sent. If the value is a |
Many inbound and outbound adapter implementations also provide or expect certain headers, and you can configure additional user-defined headers.
Constants for these headers can be found in those modules where such headers exist (for example, AmqpHeaders, JmsHeaders, and so on).
7.2.1. MessageHeaderAccessor API
Starting with Spring Framework 4.0 and Spring Integration 4.0, the core messaging abstraction has been moved to the spring-messaging module, and the MessageHeaderAccessor API has been introduced to provide additional abstraction over messaging implementations.
All (core) Spring Integration-specific message headers constants are now declared in the IntegrationMessageHeaderAccessor class.
The following table describes the pre-defined message headers:
| Header Name | Header Type | Usage |
|---|---|---|
| IntegrationMessageHeaderAccessor.CORRELATION_ID | java.lang.Object | Used to correlate two or more messages. |
| IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER | java.lang.Integer | Usually a sequence number with a group of messages with a |
| IntegrationMessageHeaderAccessor.SEQUENCE_SIZE | java.lang.Integer | The number of messages within a group of correlated messages. |
| IntegrationMessageHeaderAccessor.EXPIRATION_DATE | java.lang.Long | Indicates when a message is expired. Not used by the framework directly but can be set with a header enricher and used in a |
| IntegrationMessageHeaderAccessor.PRIORITY | java.lang.Integer | Message priority (for example, within a PriorityChannel). |
| IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE | java.lang.Boolean | True if a message was detected as a duplicate by an idempotent receiver interceptor. See Idempotent Receiver Enterprise Integration Pattern. |
| IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE | java.io.Closeable | This header is present if the message is associated with a |
| IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT | java.util.concurrent.atomic.AtomicInteger | If a message-driven channel adapter supports the configuration of a |
| IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK | o.s.i.support.AcknowledgmentCallback | If an inbound endpoint supports it, a callback to accept, reject, or requeue a message. See Deferred Acknowledgment Pollable Message Source and MQTT Manual Acks. |
Convenient typed getters for some of these headers are provided on the IntegrationMessageHeaderAccessor class, as the following example shows:
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
The following table describes headers that also appear in the IntegrationMessageHeaderAccessor but are generally not used by user code (that is, they are generally used by internal parts of Spring Integration — their inclusion here is for completeness):
| Header Name | Header Type | Usage |
|---|---|---|
| IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS | java.util.List<List<Object>> | A stack of correlation data used when nested correlation is needed (for example, |
| IntegrationMessageHeaderAccessor.ROUTING_SLIP | java.util.Map<List<Object>, Integer> | See Routing Slip. |
7.2.2. Message ID Generation
When a message transitions through an application, each time it is mutated (for example, by a transformer) a new message ID is assigned.
The message ID is a UUID.
Beginning with Spring Integration 3.0, the default strategy used for ID generation is more efficient than the previous java.util.UUID.randomUUID() implementation.
It uses simple random numbers based on a secure random seed instead of creating a secure random number each time.
A different UUID generation strategy can be selected by declaring a bean that implements org.springframework.util.IdGenerator in the application context.
Only one UUID generation strategy can be used in a classloader.
This means that, if two or more application contexts run in the same classloader, they share the same strategy.
If one of the contexts changes the strategy, it is used by all contexts.
If two or more contexts in the same classloader declare a bean of type org.springframework.util.IdGenerator, they must all be an instance of the same class.
Otherwise, the context attempting to replace a custom strategy fails to initialize.
If the strategy is the same, but parameterized, the strategy in the first context to be initialized is used.
|
In addition to the default strategy, two additional IdGenerators are provided.
org.springframework.util.JdkIdGenerator uses the previous UUID.randomUUID() mechanism.
You can use o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator when a UUID is not really needed and a simple incrementing value is sufficient.
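To make the idea of an incrementing strategy concrete, the following is a minimal JDK-only sketch, not the framework's implementation. The class name IncrementingIdSketch is hypothetical. In a real application, such a class would implement org.springframework.util.IdGenerator (whose single method is generateId()) and be declared as a bean.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a custom ID strategy (assumption, not framework code).
// In a Spring application this class would implement org.springframework.util.IdGenerator.
class IncrementingIdSketch {

    // Thread-safe counter that backs the least significant bits of each UUID.
    private final AtomicLong counter = new AtomicLong();

    // Returns UUIDs whose least significant bits count up from 1.
    public UUID generateId() {
        return new UUID(0L, counter.incrementAndGet());
    }

    public static void main(String[] args) {
        IncrementingIdSketch generator = new IncrementingIdSketch();
        System.out.println(generator.generateId()); // 00000000-0000-0000-0000-000000000001
        System.out.println(generator.generateId()); // 00000000-0000-0000-0000-000000000002
    }
}
```

Such IDs are unique only within one running JVM, which is why this kind of strategy fits cases where a true UUID is not really needed.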
7.2.3. Read-only Headers
The MessageHeaders.ID and MessageHeaders.TIMESTAMP are read-only headers and cannot be overridden.
Since version 4.3.2, the MessageBuilder provides the readOnlyHeaders(String… readOnlyHeaders) API to customize a list of headers that should not be copied from an upstream Message.
Only the MessageHeaders.ID and MessageHeaders.TIMESTAMP are read only by default.
The global spring.integration.readOnly.headers property (see Global Properties) is provided to customize DefaultMessageBuilderFactory for framework components.
This can be useful when you do not want to populate some out-of-the-box headers, such as contentType by the ObjectToJsonTransformer (see JSON Transformers).
When you try to build a new message using MessageBuilder, this kind of header is ignored and a particular INFO message is emitted to logs.
Starting with version 5.0, Messaging Gateway, Header Enricher, Content Enricher and Header Filter do not let you configure the MessageHeaders.ID and MessageHeaders.TIMESTAMP header names when DefaultMessageBuilderFactory is used, and they throw BeanInitializationException.
7.2.4. Header Propagation
When messages are processed (and modified) by message-producing endpoints (such as a service activator), in general, inbound headers are propagated to the outbound message. One exception to this is a transformer, when a complete message is returned to the framework. In that case, the user code is responsible for the entire outbound message. When a transformer just returns the payload, the inbound headers are propagated. Also, a header is only propagated if it does not already exist in the outbound message, letting you change header values as needed.
Starting with version 4.3.10, you can configure message handlers (that modify messages and produce output) to suppress the propagation of specific headers.
To configure the header(s) you do not want to be copied, call the setNotPropagatedHeaders() or addNotPropagatedHeaders() methods on the AbstractMessageProducingHandler abstract class.
You can also globally suppress propagation of specific message headers by setting the spring.integration.readOnly.headers property in META-INF/spring.integration.properties to a comma-delimited list of headers.
Starting with version 5.0, the setNotPropagatedHeaders() implementation on the AbstractMessageProducingHandler applies simple patterns (xxx*, *xxx, *xxx*, or xxx*yyy) to allow filtering headers with a common suffix or prefix.
See PatternMatchUtils Javadoc for more information.
When one of the patterns is * (asterisk), no headers are propagated.
All other patterns are ignored.
In that case, the service activator behaves the same way as a transformer and any required headers must be supplied in the Message returned from the service method.
The notPropagatedHeaders() option is available in the ConsumerEndpointSpec for the Java DSL.
It is also available for XML configuration of the <service-activator> component as a not-propagated-headers attribute.
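The effect of the simple patterns described in this section can be sketched in plain Java. This is an illustrative approximation only: the class HeaderPatternSketch and its methods are hypothetical, and the matching is done with a regular expression rather than with the framework's PatternMatchUtils.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical model of header suppression by simple patterns (assumption, not framework code).
class HeaderPatternSketch {

    // Treats '*' as "any run of characters" and everything else as a literal.
    static boolean matches(String pattern, String headerName) {
        String[] literals = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.matches(regex.toString(), headerName);
    }

    // Copies only those inbound headers that match none of the suppression patterns.
    static Map<String, Object> propagate(Map<String, Object> inbound, String... notPropagated) {
        Map<String, Object> outbound = new LinkedHashMap<>();
        for (Map.Entry<String, Object> header : inbound.entrySet()) {
            boolean suppressed = false;
            for (String pattern : notPropagated) {
                if (matches(pattern, header.getKey())) {
                    suppressed = true;
                    break;
                }
            }
            if (!suppressed) {
                outbound.put(header.getKey(), header.getValue());
            }
        }
        return outbound;
    }

    public static void main(String[] args) {
        Map<String, Object> inbound = new LinkedHashMap<>();
        inbound.put("fooId", 1);
        inbound.put("barId", 2);
        inbound.put("other", 3);
        System.out.println(propagate(inbound, "*Id").keySet()); // only "other" remains
        System.out.println(propagate(inbound, "*").isEmpty());  // a lone * suppresses everything
    }
}
```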
7.3. Message Implementations
The base implementation of the Message interface is GenericMessage<T>, and it provides two constructors, shown in the following listing:
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
When a Message is created, a random unique ID is generated.
The constructor that accepts a Map of headers copies the provided headers to the newly created Message.
There is also a convenient implementation of Message designed to communicate error conditions.
This implementation takes a Throwable object as its payload, as the following example shows:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
Note that this implementation takes advantage of the fact that the GenericMessage base class is parameterized.
Therefore, as shown in both examples, no casting is necessary when retrieving the Message payload Object.
7.4. The MessageBuilder Helper Class
You may notice that the Message interface defines retrieval methods for its payload and headers but provides no setters.
The reason for this is that a Message cannot be modified after its initial creation.
Therefore, when a Message instance is sent to multiple consumers (for example, through a publish-subscribe Channel), if one of those consumers needs to send a reply with a different payload type, it must create a new Message.
As a result, the other consumers are not affected by those changes.
Keep in mind that multiple consumers may access the same payload instance or header value, and whether such an instance is itself immutable is a decision left to you.
In other words, the contract for Message instances is similar to that of an unmodifiable Collection, and the MessageHeaders map further exemplifies that.
Even though the MessageHeaders class implements java.util.Map, any attempt to invoke a put operation (or 'remove' or 'clear') on a MessageHeaders instance results in an UnsupportedOperationException.
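The comparison with an unmodifiable collection can be tried out with the JDK alone. The following sketch is an analogy, not the MessageHeaders class itself: ReadOnlyHeadersSketch and its methods are hypothetical names, and Collections.unmodifiableMap stands in for the read-only header map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical analogy for read-only headers (assumption, not framework code).
class ReadOnlyHeadersSketch {

    // Takes a defensive copy and exposes it as a read-only view.
    static Map<String, Object> readOnlyView(Map<String, Object> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }

    // Returns true if the map refuses a put, as an unmodifiable map does.
    static boolean rejectsPut(Map<String, Object> headers) {
        try {
            headers.put("foo", "changed");
            return false;
        }
        catch (UnsupportedOperationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("foo", "bar");
        Map<String, Object> headers = readOnlyView(source);
        System.out.println(rejectsPut(headers)); // true
        System.out.println(headers.get("foo"));  // bar
    }
}
```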
Rather than requiring the creation and population of a Map to pass into the GenericMessage constructor, Spring Integration does provide a far more convenient way to construct Messages: MessageBuilder.
The MessageBuilder provides two factory methods for creating Message instances from either an existing Message or with a payload Object.
When building from an existing Message, the headers and payload of that Message are copied to the new Message, as the following example shows:
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
If you need to create a Message with a new payload but still want to copy the headers from an existing Message, you can use one of the 'copy' methods, as the following example shows:
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
Note that the copyHeadersIfAbsent method does not overwrite existing values.
Also, in the preceding example, you can see how to set any user-defined header with setHeader.
Finally, there are set methods available for the predefined headers as well as a non-destructive method for setting any header (MessageHeaders also defines constants for the pre-defined header names).
You can also use MessageBuilder to set the priority of messages, as the following example shows:
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
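assertEquals(Integer.valueOf(5), new IntegrationMessageHeaderAccessor(importantMessage).getPriority());
// setHeaderIfAbsent would keep the existing priority of 5, so setHeader is used to override it
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(Integer.valueOf(2), new IntegrationMessageHeaderAccessor(lessImportantMessage).getPriority());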
The priority header is considered only when using a PriorityChannel (as described in the next chapter).
It is defined as a java.lang.Integer.
8. Message Routing
This chapter covers the details of using Spring Integration to route messages.
8.1. Routers
This section covers how routers work. It includes the following topics:
8.1.1. Overview
Routers are a crucial element in many messaging architectures. They consume messages from a message channel and forward each consumed message to one or more different message channels depending on a set of conditions.
Spring Integration provides the following routers:
Router implementations share many configuration parameters. However, certain differences exist between routers. Furthermore, the availability of configuration parameters depends on whether routers are used inside or outside of a chain. To provide a quick overview, all available attributes are listed in the following two tables.
The following table shows the configuration parameters available for a router outside of a chain:
| Attribute | router | header value router | xpath router | payload type router | recipient list router | exception type router |
|---|---|---|---|---|---|---|
| apply-sequence | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| default-output-channel | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| resolution-required | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| ignore-send-failures | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| timeout | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| id | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| auto-startup | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| input-channel | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| order | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| method | ✓ | | | | | |
| ref | ✓ | | | | | |
| expression | ✓ | | | | | |
| header-name | | ✓ | | | | |
| evaluate-as-string | | | ✓ | | | |
| xpath-expression-ref | | | ✓ | | | |
| converter | | | ✓ | | | |
The following table shows the configuration parameters available for a router inside of a chain:
| Attribute | router | header value router | xpath router | payload type router | recipient list router | exception type router |
|---|---|---|---|---|---|---|
| apply-sequence | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| default-output-channel | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| resolution-required | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| ignore-send-failures | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| timeout | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| id | | | | | | |
| auto-startup | | | | | | |
| input-channel | | | | | | |
| order | | | | | | |
| method | ✓ | | | | | |
| ref | ✓ | | | | | |
| expression | ✓ | | | | | |
| header-name | | ✓ | | | | |
| evaluate-as-string | | | ✓ | | | |
| xpath-expression-ref | | | ✓ | | | |
| converter | | | ✓ | | | |
|
As of Spring Integration 2.1, router parameters have been more standardized across all router implementations. Consequently, a few minor changes may break older Spring Integration based applications. If you do want to drop messages silently, you can set the default-output-channel attribute to nullChannel. |
8.1.2. Common Router Parameters
This section describes the parameters common to all routers (the parameters with all their boxes ticked in the two tables shown earlier in this chapter).
Inside and Outside of a Chain
The following parameters are valid for all routers inside and outside of chains.
apply-sequence
This attribute specifies whether sequence number and size headers should be added to each message. This optional attribute defaults to false.
default-output-channel
If set, this attribute provides a reference to the channel where messages should be sent if channel resolution fails to return any channels. If no default output channel is provided, the router throws an exception. If you would like to silently drop those messages instead, set the default output channel attribute value to nullChannel.
A message is sent only to the default-output-channel if resolution-required is false and the channel is not resolved.
resolution-required
This attribute specifies whether channel names must always be successfully resolved to channel instances that exist. If set to true, a MessagingException is raised when the channel cannot be resolved. Setting this attribute to false causes any unresolvable channels to be ignored. This optional attribute defaults to true.
A Message is sent only to the default-output-channel, if specified, when resolution-required is false and the channel is not resolved.
ignore-send-failures
If set to true, failures to send to a message channel are ignored. If set to false, a MessageDeliveryException is thrown instead, and, if the router resolves more than one channel, any subsequent channels do not receive the message.
The exact behavior of this attribute depends on the type of the Channel to which the messages are sent. For example, when using direct channels (single threaded), send failures can be caused by exceptions thrown by components much further downstream. However, when sending messages to a simple queue channel (asynchronous), the likelihood of an exception being thrown is rather remote.
While most routers route to a single channel, they can return more than one channel name. The recipient-list-router, for instance, does exactly that. If you set this attribute to true on a router that only routes to a single channel, any resulting exception is swallowed, which usually makes little sense. In that case, it would be better to catch the exception in an error flow at the flow entry point. Therefore, setting the ignore-send-failures attribute to true usually makes more sense when the router implementation returns more than one channel name, because the other channel(s) following the one that fails would still receive the message.
This attribute defaults to false.
timeout
The timeout attribute specifies the maximum amount of time in milliseconds to wait when sending messages to the target Message Channels. By default, the send operation blocks indefinitely.
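The interplay between resolution-required and default-output-channel described above can be summarized as a small decision function. The following is a simplified JDK-only model built from the text of this section. ChannelResolutionSketch and target are hypothetical names, channels are represented by their names only, and IllegalStateException stands in for the framework's messaging exceptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the attribute semantics (assumption, not framework code).
class ChannelResolutionSketch {

    // Returns the name of the channel that would receive the message.
    static String target(String requested, Set<String> knownChannels,
            String defaultOutputChannel, boolean resolutionRequired) {

        if (knownChannels.contains(requested)) {
            return requested; // normal case: the channel resolves
        }
        if (resolutionRequired) {
            // resolution-required=true: an unresolvable channel is an error
            throw new IllegalStateException("Cannot resolve channel '" + requested + "'");
        }
        if (defaultOutputChannel != null) {
            // resolution-required=false: fall back to the default output channel
            return defaultOutputChannel;
        }
        // nothing resolved and no default output channel configured
        throw new IllegalStateException("No channel resolved and no default output channel set");
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(Arrays.asList("stringChannel", "integerChannel"));
        System.out.println(target("stringChannel", known, "nullChannel", true)); // stringChannel
        System.out.println(target("missing", known, "nullChannel", false));      // nullChannel
    }
}
```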
Top-Level (Outside of a Chain)
The following parameters are valid only for top-level routers that are outside of chains.
id
Identifies the underlying Spring bean definition, which, in the case of routers, is an instance of EventDrivenConsumer or PollingConsumer, depending on whether the router’s input-channel is a SubscribableChannel or a PollableChannel, respectively. This is an optional attribute.
auto-startup
This “lifecycle” attribute signals whether this component should be started during startup of the application context. This optional attribute defaults to true.
input-channel
The receiving message channel of this endpoint.
order
This attribute defines the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel uses a failover dispatching strategy. It has no effect when this endpoint itself is a polling consumer for a channel with a queue.
8.1.3. Router Implementations
Since content-based routing often requires some domain-specific logic, most use cases require Spring Integration’s options for delegating to POJOs by using either the XML namespace support or annotations. Both of these are discussed later. However, we first present a couple of implementations that fulfill common requirements.
PayloadTypeRouter
A PayloadTypeRouter sends messages to the channel defined by payload-type mappings, as the following example shows:
<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMappings">
        <map>
            <entry key="java.lang.String" value="stringChannel"/>
            <entry key="java.lang.Integer" value="integerChannel"/>
        </map>
    </property>
</bean>
Configuration of the PayloadTypeRouter is also supported by the namespace provided by Spring Integration (see Namespace Support), which essentially simplifies configuration by combining the <router/> configuration and its corresponding implementation (defined by using a <bean/> element) into a single and more concise configuration element.
The following example shows a PayloadTypeRouter configuration that is equivalent to the one above but uses the namespace support:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
The following example shows the equivalent router configured in Java:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
When using the Java DSL, there are two options.
First, you can define the router object as shown in the preceding example:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlows.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
Note that the router can be, but does not have to be, a @Bean.
The flow registers it if it is not a @Bean.
Second, you can define the routing function within the DSL flow itself, as the following example shows:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlows.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
A HeaderValueRouter sends Messages to the channel based on the individual header value mappings.
When a HeaderValueRouter is created, it is initialized with the name of the header to be evaluated.
The value of the header could be one of two things:
-
An arbitrary value
-
A channel name
If it is an arbitrary value, additional mappings for these header values to channel names are required. Otherwise, no additional configuration is needed.
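The two cases can be pictured as a lookup with a fallback. This is a minimal sketch of the idea, not the router's actual code: HeaderValueLookupSketch and channelNameFor are hypothetical names, and channels are represented by their names only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of header-value-to-channel lookup (assumption, not framework code).
class HeaderValueLookupSketch {

    // Uses an explicit mapping when one exists; otherwise treats the header value itself
    // as the channel name.
    static String channelNameFor(Object headerValue, Map<String, String> mappings) {
        String key = String.valueOf(headerValue);
        return mappings.getOrDefault(key, key);
    }

    public static void main(String[] args) {
        Map<String, String> mappings = new HashMap<>();
        mappings.put("someHeaderValue", "channelA");
        System.out.println(channelNameFor("someHeaderValue", mappings)); // channelA
        System.out.println(channelNameFor("channelB", mappings));        // channelB
    }
}
```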
Spring Integration provides a simple namespace-based XML configuration to configure a HeaderValueRouter.
The following example demonstrates configuration for the HeaderValueRouter when mapping of header values to channels is required:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
During the resolution process, the router defined in the preceding example may encounter channel resolution failures, causing an exception.
If you want to suppress such exceptions and send unresolved messages to the default output channel (identified with the default-output-channel attribute), set resolution-required to false.
Normally, messages for which the header value is not explicitly mapped to a channel are sent to the default-output-channel.
However,