Java Message Service (JMS) : Université Joseph Fourier (Grenoble 1) Polytech Grenoble Lig Erods
Java Message Service (JMS) : Université Joseph Fourier (Grenoble 1) Polytech Grenoble Lig Erods
fr/donsez
Didier DONSEZ
Université Joseph Fourier (Grenoble 1)
PolyTech Grenoble LIG ERODS
[Link]@[Link]
[Link]@[Link]
03/05/2015 MOM & JMS, Didier Donsez, 1998-2012 1
03/05/2015
Motivation
Messaging Oriented Middleware
messagerie inter-applicative
l ’envoi et la réception des messages entre applications est asynchrone
les messages sont structurés et correspondent à des événements, des requêtes,
des rapports, ...
ne nécessite pas de connexion permanente comme pour le Cl/Sv
Publication-Souscription (Publish-Subscribe)
2
JMS targets enterprise messaging; the API is chosen to abstract the programming of a wide
variety of message-oriented-middleware (MOM) products in a vendor neutral and portable
manner, using the Java programming language. A Destination refers to a named physical
resource managed by the underlying MOM. It is administered and configured via vendor
provided tools, and typically accessed by a user application via the Java Naming and Directory
Interface (JNDI) APIs (external to JMS). A MessageProducer will send messages to a destination
and a MessageConsumer can receive messages from a destination. The destination can be
thought of a mini-message broker or a channel independent of the producers and consumers.
A Connection is a heavy-weight object representing the link between the application and the
middleware. Its attributes include a clientID. It provides methods to start() and stop()
communication and to close() a connection. MessageProducer is used to produce messages. A
default destination may be specified when the producer is created; it can also be specified when
sending messages. In addition, the delivery mode, priority, and expiration can be specified for
the outgoing message headers.
A persistent delivery mode means that a message will be delivered once-and-only-once; the
message is stored in permanent storage before the send() method returns. A non-persistent
delivery mode means that the message will be delivered at most once; a message may be
dropped if the JMS provider fails.
Static destinations are discovered via JNDI APIs, which bind logical destination names to
destination objects. The static destinations accessible this way must have been previously
configured in the JMS middleware (server) using vendor supplied administrative tool. Since JMS
discovery is administered, the static destinations must be determined and configured before a
client can use them.
The following is a list of JMS providers: • WebSphere Application Server from IBM,
• Apache ActiveMQ which provides an inbuilt default messaging provider
known as the Service Integration Bus (SIBus), or which
• Redis pub/sub
can connect to WebSphere MQ as a JMS
• Apache Qpid, using AMQP[5]
provider[6]
• Oracle Weblogic (part of the Fusion
• WebSphere MQ (formerly MQSeries) from
Middleware suite) and Oracle AQ from
IBM
Oracle
• FioranoMQ
• EMS from TIBCO
• FFMQ, GNU LGPL licensed
etc
• JBoss Messaging and HornetQ from JBoss
Generally, all Java Enterperise Edition servers
• JORAM, from the OW2 Consortium
offer a JMS provider
• Open Message Queue, from Oracle
• OpenJMS, from The OpenJMS Group
• Solace JMS from Solace Systems
• RabbitMQ by Rabbit Technologies Ltd.,
acquired by SpringSource
• SAP Process Integration ESB
• SonicMQ from Progress Software
• SwiftMQ
• Tervela
• Ultra Messaging from 29 West (acquired
by Informatica)
• webMethods from Software AG
03/05/2015
Architecture JMS
le client utilise les classes du package [Link] pour l ’envoi et la
réception de messages
le serveur implante un JMS Service Provider qu ’interface à son noyau
Message Type
Message Repositories Repository
X Messaging Server (or Broker)
5
03/05/2015
Modèle Point à Point
Point-to-Point
Concept de Queue
Un (ou plusieurs) Sender envoie (produit) un message à une
Queue (i.e. file d ’attente de messages)
Chaque message de la Queue reçu que par un seul
Sender
(Message) Queue
Receiver
Sender
6
03/05/2015
Modèle Publication-Souscription
Publish-Subscribe
Concept de Topic (centre d intérêt)
Un (ou plusieurs) publishers envoie un message à un Topic
Tous les subscribers de ce Topic reçoivent le message
Messaging Server
Publisher Subscriber
MOM & JMS, Didier Donsez, 1998-2012
Publisher Topic Subscriber
Publisher Subscriber
7
JMS API Programming Model
Connection
Factory
creates
Connection
API JMS
Superclasses communes à PTP et PubSub
JMS Parent Modèle PTP Modèle Pub/Sub
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
un objet administré par le provider pour créer une Connection
10
03/05/2015
Les objets administrés par le
Provider
Les objets administrés par le Provider
ConnectionFactory
point d ’accès à un serveur MOM
accessible par JNDI et enregistré par l ’administrateur du serveur MOM
Destination
Queue ou Topic administré par le serveur MOM
accessible par JNDI et enregistré par l ’administrateur du serveur MOM
Les objets
Connection
encapsule la liaison TCP/IP avec le Provider JMS
MOM & JMS, Didier Donsez, 1998-2012
Session
11
03/05/2015
[Link]
Rôle
encapsule la liaison TCP/IP avec le Provider JMS
authentifie le client et spécifie un identifiant unique de client
Cycle de vie
initialisation/setup
crée les Sessions, MessageProducers et les MessageConsumers.
fournit le ConnectionMetaData
démarrage start()
procède à la livraison des Messages
pause stop()
MOM & JMS, Didier Donsez, 1998-2012
reprise start()
fermeture close()
12
03/05/2015
[Link]
Rôle
contexte mono-threadé
fabrique les MessageProducers et les MessageConsumers
13
03/05/2015
14
03/05/2015
[Link]
représente l ’objet consommateur des messages
fabriqué par la Session
QueueReceiver [Link](Queue queue, String messageSelector)
TopicSubscriber [Link](Topic topic, String messageSelector, boolean)
[Link]
représente l ’objet producteur des messages
fabriqué par la Session
QueueSender [Link](Queue queue)
TopicPublisher [Link](Topic topic)
17
03/05/2015
Le modèle Point à Point
Point-To-Point PTP
Classes et interface
JMS Parent Modèle PTP
ConnectionFactory QueueConnectionFactory
Connection QueueConnection
Destination Queue, TemporaryQueue
Session QueueSession
MessageProducer QueueSender
MessageConsumer QueueReceiver
-- QueueBrowser
Remarques
plusieurs sessions peuvent se partager une même queue : JMS ne
spécifie pas comment le provider répartit les messages entre les
MOM & JMS, Didier Donsez, 1998-2012
Remarque
MOM & JMS, Didier Donsez, 1998-2012
19
03/05/2015
Le modèle de Messages JMS
L ’interface [Link]
Motivation
Modèle commun à tous les produits
Supporte l’hétérogénéïté des clients (non JMS, langage, plate-forme, ...)
Supporte les objets Java et les documents XML
L’interface [Link]
sous-interfaces: BytesMessage, MapMessage, ObjectMessage, StreamMessage, TextMessage
Structure d ’un Message
Entête (header)
champs communs aux Providers (Produit) et obligatoires
destinés au routage et l ’identification du message
MOM & JMS, Didier Donsez, 1998-2012
Propriété (property)
champs supplémentaires
propriétés standards : équivalent aux champs d ’entête optionnels
Corps (body)
contient les données applicatives
20
03/05/2015
Le modèle de Messages JMS
Les champs d ’entête
Nom des champs d ’entête
JMSDestination : destination du message
JMSDeliveryMode : sûreté de distribution
NON_PERSISTENT : faible
PERSISTENT : garantie supplémentaire qu ’un message ne soit pas perdu
JMSExpiration : calculé à partir du TTL (Time To Live) depuis la prise en charge
JMSMessageID : identifiant du Message founi par le provider
JMSTimestamp : date de prise en charge du message par le provider
JMSCorrelationID : identifiant d ’un lien avec un autre Message (Request/Reply)
JMSReplyTo : identifiant de la Destination pour les réponses
MOM & JMS, Didier Donsez, 1998-2012
22
03/05/2015
Le modèle de Messages JMS
Les champs de propriétés
Enumération des propriétés
Enumeration getPropertyNames() / Enumeration [Link]()
énumère les propriétés (JMSX) du message
boolean propertyExists(String) teste l ’existence d ’une propriété
[Link]
contient un objet Java sérialisé : Object getObject()/ setObject(Object)
[Link]
contient un tableau de bytes (non interprétés)
MOM & JMS, Didier Donsez, 1998-2012
[Link]
contient un flot de données (Java primitives) qui s ’écrit et se lit séquentiellement
int readInt()/writeInt(int), …, String readString()/writeString(String)
25
03/05/2015
Le modèle de Messages JMS
Le corps (body) du message
Méthodes
la méthode clearBody() réinitialise le corps
Remarque
le corps d ’un message reçu est en lecture seule
MOM & JMS, Didier Donsez, 1998-2012
26
03/05/2015
Le modèle de Messages JMS
Le corps (body) du message
String textstr = "<?xml version=\"1.0\"?><!DOCTYPE person SYSTEM \"[Link]\">"
+"<person><firstname>Joe</firstname><lastname>Smith</lastname>"
+"<salary value=\"10000.0\"><age value=\"38\"></person>";
// l ’API [Link] est préférable pour construire le document XML
TextMessage textmsg = [Link]();
[Link](textstr);
27
03/05/2015
Le modèle de Messages JMS
Le corps (body) du message
TextMessage textmsg = (TextMessage)receivedmsg;
String textstr=[Link](textstr); [Link](textstr);
// le document XML peut être parsé
Byte[] bytesarray ;
int length=[Link](bytesarray);
long age= [Link](); double salary= [Link]();
Expressions de sélection
sous-ensemble de SQL-92
identifiants, littéraux (Chaînes, Numériques, TRUE/FALSE)
connecteurs OR, AND, NOT, ( )
MOM & JMS, Didier Donsez, 1998-2012
>, <, >=, <=, <>, =, BETWEEN, LIKE, IN, IS [NOT] NULL
Exemple
String selector="JMSType='car' AND ( color='blue' OR color='red' ) AND price IS NOT NULL";
QueueReceiver receiver= [Link](queue, selector);
29
03/05/2015
Exemple : Point à Point
La partie commune
class JMSQueueTest {
static Context messaging; static QueueConnectionFactory queueConnectionFactory; static Queue queue;
static QueueConnection queueConnection; static QueueSession queueSession;
static QueueSender queueSender; static QueueReceiver queueReceiver;
static void setup(String queueConnectionFactoryName, String queueName){
messaging = new InitialContext();
queueConnectionFactory = (QueueConnectionFactory)[Link](queueConnectionFactoryName);
queue = (Queue) [Link](queueName);
queueConnection = [Link]();
queueSession = [Link](false,Session.AUTO_ACKNOWLEDGE);
}
static void setupSender(String queueConnectionFactoryName, String queueName){
setup(queueConnectionFactoryName, queueName);
queueSender = [Link](queue);
MOM & JMS, Didier Donsez, 1998-2012
[Link]();
}
static void setupReceiver(String queueConnectionFactoryName, String queueName){
setup(queueConnectionFactoryName, queueName);
queueReceiver = [Link](queue);
[Link]();
}}
30
03/05/2015
Exemple : Point à Point
Sender
// java JMSQueueSenderTest testServer testQueue Hello 10.0 10
public class JMSQueueSenderTest extend JMSQueueTest {
while(--cpt>0) {
send(argv[2]+cpt,[Link](agv[3]).valueDouble()+cpt,cpt, false);
}
send(argv[2]+cpt,agv[3]+cpt,cpt, true);
close();
}}
31
03/05/2015
Exemple : Point à Point
Receiver synchrone
// java JMSQueueSyncReceiverTest testServer testQueue
public class JMSQueueSyncReceiverTest extend JMSQueueTest {
static boolean exit=false;
static void handleMessage(MapMessage message) {
String stringValue=[Link]("stringValue"); [Link](" stringValue="+stringValue);
double doubleValue=[Link]("doubleValue"); [Link](" doubleValue="+doubleValue);
long longValue=((Long)[Link]("longValue")).longValue(); [Link](" longValue="+longValue);
exit = [Link]("exit");
}
static void syncReceive() {
MapMessage message;
while(!exit) {
message= (MapMessage)[Link]();
handleMessage(message);
}
}
MOM & JMS, Didier Donsez, 1998-2012
32
03/05/2015
Exemple : Point à Point
Receiver asynchrone
// java JMSQueueAsyncReceiverTest testServer testQueue public class MyListener implements
[Link] {
void onMessage(Message message) { [Link]((MapMessage)message); }
}
public class JMSQueueAsyncReceiverTest extend JMSQueueTest {
static boolean exit=false;
static void handleMessage(MapMessage message) {
String stringValue=[Link]("stringValue"); [Link](" stringValue="+stringValue);
double doubleValue=[Link]("doubleValue"); [Link](" doubleValue="+doubleValue);
long longValue=((Long)[Link]("longValue")).longValue(); [Link](" longValue="+longValue);
exit = [Link]("exit");
}
static void asyncReceive() {
[Link](new MyListener());
while(!exit) { /* doSomething */ }
}
MOM & JMS, Didier Donsez, 1998-2012
33
03/05/2015
Exemple : Publication Souscription
La partie commune
class JMSTopicTest {
static Context messaging; static TopicConnectionFactory topicConnectionFactory; static Topic topic;
static TopicConnection topicConnection; static TopicSession topicSession;
static TopicPublisher topicPublisher ; static TopicSubscriber topicSubscriber ;
static void setup(String topicConnectionFactoryName, String topicName){
messaging = new InitialContext();
topicConnectionFactory = (TopicConnectionFactory)[Link](topicConnectionFactoryName);
topic = (Topic) [Link](topicName);
topicConnection = [Link]();
topicSession = [Link](false,Session.AUTO_ACKNOWLEDGE);
}
static void setupPublisher(String topicConnectionFactoryName, String topicName){
setup(topicConnectionFactoryName, topicName);
topicPublisher = [Link](topic);
[Link]();
MOM & JMS, Didier Donsez, 1998-2012
}
static void setupSubscriber(String topicConnectionFactoryName, String topicName){
setup(topicConnectionFactoryName, topicName);
topicSubscriber = [Link](topic);
[Link]();
}}
35
03/05/2015
Exemple : Publication Souscription
Publisher
// java JMSTopicPublisherTest testServer testTopic Hello 10.0 10
public class JMSTopicPublisherTest extend JMSTopicTest {
while(--cpt>0) {
publish(argv[2]+cpt,[Link](agv[3]).valueDouble()+cpt,cpt, false);
}
publish(argv[2]+cpt,agv[3]+cpt,cpt, true);
close();
}}
36
03/05/2015
Exemple : Publication Souscription
Subscriber synchrone
// java JMSTopicSyncSubscriberTest testServer testTopic
public class JMSTopicSyncSubscriberTest extend JMSTopicTest {
static boolean exit=false;
static void handleMessage(MapMessage message) {
String stringValue=[Link]("stringValue"); [Link](" stringValue="+stringValue);
double doubleValue=[Link]("doubleValue"); [Link](" doubleValue="+doubleValue);
long longValue=((Long)[Link]("longValue")).longValue(); [Link](" longValue="+longValue);
exit = [Link]("exit");
}
static void syncSubscribe() {
MapMessage message;
while(!exit) {
message= (MapMessage)[Link]();
handleMessage(message);
}
}
MOM & JMS, Didier Donsez, 1998-2012
37
03/05/2015
Exemple : Publication Souscription
Subscriber asynchrone
// java JMSTopicAsyncSubscriberTest testServer testTopic
public class MyListener implements [Link] {
void onMessage(Message message) { [Link]((MapMessage)message); }
}
public class JMSTopicAsyncSubscriberTest extend JMSTopicTest {
static boolean exit=false;
static void handleMessage(MapMessage message) {
String stringValue=[Link]("stringValue"); [Link](" stringValue="+stringValue);
double doubleValue=[Link]("doubleValue"); [Link](" doubleValue="+doubleValue);
long longValue=((Long)[Link]("longValue")).longValue(); [Link](" longValue="+longValue);
exit = [Link]("exit");
}
static void asyncSubscribe() {
[Link](new MyListener());
while(!exit) { /* doSomething */ }
MOM & JMS, Didier Donsez, 1998-2012
}
public static void main(String args[]) {
setupSubscriber(argv[0],argv[1]);
asyncSubscriber();
close();
}}
38
03/05/2015
le JMS Provider doit garantir la livraison « une et une seule fois »
transaction
API
JMS Root PTP Interface Pub/Sub Interface
MOM & JMS, Didier Donsez, 1998-2012
ServerSessionPool
ServerSession
ConnectionConsumer
XAConnectionFactory XAQueueConnectionFactory XATopicConnectionFactory
XAConnection XAQueueConnection XATopicConnection
XASession XAQueueSession XATopicSession
39
03/05/2015
Modèle Requête-Réponse (i)
Request-Reply
Principe
Client-Serveur en mode Asynchrone
mais nombreux styles de Requête-Réponse
serveurs (respondents)
...
Messaging Server
(Reply for A)
reply XA
Client A Temporary Queue reply XA
request A
Request Queue request A
Server X
request B
request B
MOM & JMS, Didier Donsez, 1998-2012
Question :
A quoi peut servir le partage de la Queue par plusieurs serveurs ?
41
03/05/2015
Messaging Server
(Reply for A)
Client A Temporary Queue
request A
Request Queue
MOM & JMS, Didier Donsez, 1998-2012
42
03/05/2015
Messaging Server
(Reply for A)
reply XA
Client A Temporary Queue reply XA
Request Queue request A Server X
MOM & JMS, Didier Donsez, 1998-2012
43
03/05/2015
Messaging Server
Client A
request B
request B
Client B (Reply for B)
reply XB Temporary Queue reply XB
44
03/05/2015
Messaging Server
Client A
Temporary Queue
(Publisher) reply Y (one per client)
reply Y
reply X reply X
Server X
request A (Subscriber)
MOM & JMS, Didier Donsez, 1998-2012
Topic
Server Y
request A request A (Subscriber)
Remarque :
le serveur Z n ’est pas obligé de fournir une réponse
Server Z
request A (Subscriber)
45
03/05/2015
Cycle de vie
création avec le constructeur
QueueRequestor(QueueSession session, Queue queue)
TopicRequestor(TopicSession session, Topic topic)
envoi d ’une requête et attente de la réponse
Remarque
la Session ne peut être transactionnelle
46
03/05/2015
peut être détruit (delete()) après usage pour libérer des ressources
JMSReplyTo)
TemporaryQueue
objet unique, sous interface de Queue
fabriquée par [Link]()
47
03/05/2015
Sécurité
JMS ne contrôle pas la confidentialité et l ’intégrité des messages
MOM & JMS, Didier Donsez, 1998-2012
Protocole de transport
Déploiement
49
03/05/2015
50