Sam's Notes | Sam Blog

梦想还是要有的,万一实现了呢

0%

Spring JMS (二)

发送 JMS 消息

简单的使用默认destination

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;

public class JmsQueueSender {

private JmsTemplate jmsTemplate;
private Queue queue;

public void setConnectionFactory(ConnectionFactory cf) {
this.jmsTemplate = new JmsTemplate(cf);
}

public void setQueue(Queue queue) {
this.queue = queue;
}

public void simpleSend() {
this.jmsTemplate.send(this.queue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello queue world");
}
});
}
}

Message Converters

Message Converters 提供Java 对象 message‘s 数据间的转换。Spring的默认实现 SimpleMessageConverter 可以支持String 和 TextMessage, byte[] 和 BytesMesssage, java.util.Map 和 MapMessage 之间的转换。
下面是个Map的发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void sendWithConversion() {
Map map = new HashMap();
map.put("Name", "Mark");
map.put("Age", new Integer(47));
jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws JMSException {
message.setIntProperty("AccountID", 1234);
message.setJMSCorrelationID("123-00001");
return message;
}
});
}

//This results in a message of the form:
//MapMessage={
// Header={
// ... standard headers ...
// CorrelationID={123-00001}
// }
// Properties={
// AccountID={Integer:1234}
// }
// Fields={
// Name={String:Mark}
// Age={Integer:47}
// }
//}

SessionCallback 回调和 ProducerCallback 回调

接收 JMS 消息

同步接收 Synchronous

同步接收 JMS 消息会堵塞, 可设置 receiveTimeout

异步接收 Asynchronous - Message-Driven POJOs

类似于 EJB 里的 Message-Driven Bean (MDB) ,Spring 定义了 Message-Driven POJO (MDP) 来作为 JMS 的接收者。
一个 Message-Driven POJO (MDP) 必须实现 javax.jms.MessageListener (或者 MessageListenerAdapter, SessionAwareMessageListener),而且必须是线程安全,它会被多线程调用。
MDP的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}

}

对应的配置

1
2
3
4
5
6
7
8
9
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
---<property name="messageListener" ref="messageListener" />---
</bean>

事务 transaction

本地事务只需要简单配置 sessionTransacted 就可以激活。发送响应是该本地事务的一部分,但其他所有资源(如数据库操作)的操作都是独立的。

1
2
3
4
5
6
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>

分布式事务

1
2
3
4
5
6
7
8
9
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>

注解驱动的监听端点

@JmsListener

开启监听端点 注解

@Configuration类中加入@EnableJms来使@JmsListener生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableJms
public class AppConfig {

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { //default name
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setConcurrency("3-10"); //pool size 3~10 threads
return factory;
}
}

注册监听端点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}

端点方法 签名

1
2
3
4
5
6
7
8
@Component
public class MyService {

@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}

响应管理

@SendTo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}


//or

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}

运行时响应destination

1
2
3
4
5
6
7
8
9
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}

JMS 命名空间

Spring 集成 ActiveMq