发送 JMS 消息
简单的使用默认destination
1 | import javax.jms.ConnectionFactory; |
Message Converters
Message Converters 提供Java 对象 message‘s 数据间的转换。Spring的默认实现 SimpleMessageConverter 可以支持String 和 TextMessage, byte[] 和 BytesMesssage, java.util.Map 和 MapMessage 之间的转换。
下面是个Map的发送:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public void sendWithConversion() {
Map map = new HashMap();
map.put("Name", "Mark");
map.put("Age", new Integer(47));
jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws JMSException {
message.setIntProperty("AccountID", 1234);
message.setJMSCorrelationID("123-00001");
return message;
}
});
}
//This results in a message of the form:
//MapMessage={
// Header={
// ... standard headers ...
// CorrelationID={123-00001}
// }
// Properties={
// AccountID={Integer:1234}
// }
// Fields={
// Name={String:Mark}
// Age={Integer:47}
// }
//}
SessionCallback 回调和 ProducerCallback 回调
接收 JMS 消息
同步接收 Synchronous
同步接收 JMS 消息会堵塞, 可设置 receiveTimeout
。
异步接收 Asynchronous - Message-Driven POJOs
类似于 EJB 里的 Message-Driven Bean (MDB) ,Spring 定义了 Message-Driven POJO (MDP) 来作为 JMS 的接收者。
一个 Message-Driven POJO (MDP) 必须实现 javax.jms.MessageListener
(或者 MessageListenerAdapter, SessionAwareMessageListener),而且必须是线程安全,它会被多线程调用。
MDP的一个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
对应的配置1
2
3
4
5
6
7
8
9<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
---<property name="messageListener" ref="messageListener" />---
</bean>
事务 transaction
本地事务只需要简单配置 sessionTransacted
就可以激活。发送响应是该本地事务的一部分,但其他所有资源(如数据库操作)的操作都是独立的。1
2
3
4
5
6<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
分布式事务1
2
3
4
5
6
7
8<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>
注解驱动的监听端点
@JmsListener
开启监听端点 注解
在@Configuration
类中加入@EnableJms
来使@JmsListener
生效。1
2
3
4
5
6
7
8
9
10
11
12
13
14@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { //default name
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setConcurrency("3-10"); //pool size 3~10 threads
return factory;
}
}
注册监听端点
1 | @Configuration |
端点方法 签名
1 | @Component |
响应管理
@SendTo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
//or
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
运行时响应destination1
2
3
4
5
6
7
8
9@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}