Point-to-point, publish/subscribe, and Camel with ActiveMQ: Spring integration
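Point-to-point
Point-to-point (P2P) messaging is also called the queue model: a message sent by one sender is consumed by exactly one receiver. The sender sends the message to a target queue, and a receiver can consume messages from that queue asynchronously.
If no receiver is available to take a message for the time being, the message stays on the queue and is not lost; the queue ensures that every message can eventually be received by a receiver. Queue messages are persisted to files by default. ActiveMQ typically stores them under $AMO_HOME\data\kr-store\data, although the exact directory depends on the ActiveMQ version and the configured persistence adapter.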
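Publish/subscribe
Publish/subscribe is also called pub/sub. Every subscriber listening on the same topic receives the messages sent by the publisher. In general, when a publisher publishes a message to a topic, only the subscribers currently listening on that topic receive it; if no subscriber is listening, the message is lost. There is therefore no guarantee that a subscriber receives every message the publisher sends.
By default topic messages are not persisted to disk; a topic is stateless.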
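Differences
The biggest difference between a topic and a queue is that a topic broadcasts each new message to all clients that are currently online and listening, and clients that are not listening never receive it. A queue instead delivers each message point-to-point to just one of the clients that are currently listening.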
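To make the difference concrete, here is a small in-memory model of the two delivery modes. It is only a sketch and not ActiveMQ code: the class `ToyBroker` and all of its methods are hypothetical, and choosing the queue receiver by round-robin is just a simple assumption for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy in-memory model of queue vs. topic delivery; not ActiveMQ code. */
class ToyBroker {
    private final Queue<String> pending = new ArrayDeque<>();            // queue backlog
    private final List<Consumer<String>> queueReceivers = new ArrayList<>();
    private final List<Consumer<String>> topicSubscribers = new ArrayList<>();
    private int nextReceiver = 0;

    /** Registers a queue receiver and hands it any messages that were waiting. */
    void addQueueReceiver(Consumer<String> receiver) {
        queueReceivers.add(receiver);
        while (!pending.isEmpty()) {
            dispatchToOne(pending.poll());
        }
    }

    /** Registers a topic subscriber; it only sees messages published from now on. */
    void addTopicSubscriber(Consumer<String> subscriber) {
        topicSubscribers.add(subscriber);
    }

    /** Queue: keep the message if nobody is listening, else give it to exactly one receiver. */
    void sendToQueue(String message) {
        if (queueReceivers.isEmpty()) {
            pending.add(message);
        } else {
            dispatchToOne(message);
        }
    }

    /** Topic: copy the message to every current subscriber; 0 means it was dropped. */
    int publishToTopic(String message) {
        for (Consumer<String> subscriber : topicSubscribers) {
            subscriber.accept(message);
        }
        return topicSubscribers.size();
    }

    // Round-robin choice of a single receiver (an assumption of this sketch).
    private void dispatchToOne(String message) {
        Consumer<String> receiver = queueReceivers.get(nextReceiver % queueReceivers.size());
        nextReceiver++;
        receiver.accept(message);
    }
}
```

In this model a message sent to the queue before any receiver exists is handed over once a receiver registers, whereas a message published to the topic with no subscribers is simply gone.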
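Point-to-point integration with Spring
The Maven dependencies are omitted here.
First write a bean that sends point-to-point messages, then declare it in Spring. The code is as follows: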
package com.kongzhong.gw2.ccc.message;
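The producer listing above shows only its package declaration, so its real body is unknown. The sketch below is a guess at the general shape of such a producer and is not the author's code. The class name `QueueMessageProducerSketch`, the `buildPayload` helper, and the `BiConsumer` stand-in for the send call are all hypothetical; the keys `sendId`, `name` and `address` are taken from the consumers shown later in this article. In a Spring application the stand-in could be a lambda that calls `JmsTemplate.convertAndSend(destinationName, payload)`, which with the default message converter should turn a `Map` into a JMS `MapMessage`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical sketch of a queue producer; the original class body is not shown. */
class QueueMessageProducerSketch {
    // Stand-in for the real send call (assumption): destination name + map payload.
    private final BiConsumer<String, Map<String, Object>> sender;
    private final String queueName;

    QueueMessageProducerSketch(BiConsumer<String, Map<String, Object>> sender, String queueName) {
        this.sender = sender;
        this.queueName = queueName;
    }

    /** Builds the map payload whose keys the consumers read later. */
    static Map<String, Object> buildPayload(String sendId, String name, String address) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("sendId", sendId);
        payload.put("name", name);
        payload.put("address", address);
        return payload;
    }

    /** Sends one message to the configured queue through the stand-in sender. */
    void send(String sendId, String name, String address) {
        sender.accept(queueName, buildPayload(sendId, name, address));
    }
}
```

Keeping the payload construction in one place makes it easier to keep the producer's keys in step with the keys the listeners look up.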
Next, write the Spring integration configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/tx
                           http://www.springframework.org/schema/tx/spring-tx.xsd">
    <description>ActiveMQ message-sending configuration</description>
    <!-- Pooled connectionFactory wrapping the ActiveMQ connection factory -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
                <property name="dispatchAsync" value="true"/>
                <property name="producerWindowSize" value="10240000"/>
                <property name="useAsyncSend" value="true"/>
                <property name="alwaysSessionAsync" value="true"/>
                <property name="sendTimeout" value="3000"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <!-- Spring JMS Template -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Selects the messaging model: false is point-to-point, true is publish/subscribe -->
        <property name="pubSubDomain" value="false"/>
        <property name="receiveTimeout" value="20000"/>
    </bean>
    <!-- Bean that sends point-to-point messages -->
    <bean id="queueMessageProducer" class="com.kongzhong.gw2.ccc.message.QueueMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <!-- Queue name -->
        <property name="queueName" value="queue_test"/>
    </bean>
</beans>
Next is the consumer: a point-to-point queue listener that starts listening as soon as the Spring container starts. The code is as follows:
package com.kongzhong.gw2.ccc.message;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Point-to-point queue message listener.
 */
public class QueueMessageListener implements MessageListener {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Message message) {
        // Only MapMessage payloads are expected; skip anything else instead of failing on the cast.
        if (!(message instanceof MapMessage)) {
            logger.warn("Ignoring unexpected message type: {}", message);
            return;
        }
        MapMessage mapMessage = (MapMessage) message;
        try {
            System.out.println("Received sendId: " + mapMessage.getString("sendId"));
            System.out.println("Received name: " + mapMessage.getString("name"));
        } catch (JMSException e) {
            logger.error("Failed to read the message", e);
        }
    }
}
The consumer's Spring integration configuration file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-4.0.xsd
                           http://www.springframework.org/schema/jms
                           http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    <!-- Pooled connectionFactory wrapping the ActiveMQ connection factory -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
                <property name="producerWindowSize" value="10240000"/>
                <property name="useAsyncSend" value="true"/>
                <property name="dispatchAsync" value="true"/>
                <property name="alwaysSessionAsync" value="true"/>
                <property name="sendTimeout" value="3000"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <!-- Message listener bean -->
    <bean id="queueReceiver1" class="com.kongzhong.gw2.ccc.message.QueueMessageListener"/>
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="jmsFactory" acknowledge="auto">
        <!-- Queue name -->
        <jms:listener destination="queue_test" ref="queueReceiver1"/>
    </jms:listener-container>
</beans>
Publish/subscribe integration with Spring
The publisher bean code is as follows:
package com.kongzhong.gw2.ccc.message;
The publisher's Spring integration is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/tx
                           http://www.springframework.org/schema/tx/spring-tx.xsd">
    <!-- Pooled connectionFactory wrapping the ActiveMQ connection factory -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
                <property name="producerWindowSize" value="10240000"/>
                <property name="useAsyncSend" value="true"/>
                <property name="dispatchAsync" value="true"/>
                <property name="alwaysSessionAsync" value="true"/>
                <property name="sendTimeout" value="3000"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <!-- Spring JMS Template -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Selects the messaging model: false is point-to-point, true is publish/subscribe -->
        <property name="pubSubDomain" value="true"/>
        <property name="receiveTimeout" value="20000"/>
    </bean>
    <!-- Publisher bean -->
    <bean id="topicMessageProducer" class="com.kongzhong.gw2.ccc.message.TopicMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <!-- Topic name -->
        <property name="queueName" value="top_test"/>
    </bean>
</beans>
The subscriber's listener class is as follows:
package com.kongzhong.gw2.ccc.message;

import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Topic subscription message listener.
 */
public class TopicMessageListener implements MessageListener {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Message message) {
        System.out.println("Subscriber received message: " + message);
        // Receiving a message is normal operation, so log it at info level rather than error.
        logger.info("TopicMessageListener recv: {}", message);
    }
}
The subscriber's Spring integration configuration is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-4.0.xsd
                           http://www.springframework.org/schema/jms
                           http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    <bean id="connectionfactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="idleTimeout" value="600000"/><!-- 600 seconds -->
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
                <property name="producerWindowSize" value="10240000"/>
                <property name="useAsyncSend" value="true"/>
                <property name="dispatchAsync" value="true"/>
                <property name="alwaysSessionAsync" value="true"/>
                <property name="sendTimeout" value="3000"/>
            </bean>
        </property>
    </bean>
    <!-- Message listener bean -->
    <bean id="topicReceiver1" class="com.kongzhong.gw2.ccc.message.TopicMessageListener"/>
    <jms:listener-container destination-type="topic" container-type="default"
                            connection-factory="connectionfactory" acknowledge="auto">
        <!-- Topic name -->
        <jms:listener destination="top_test" ref="topicReceiver1"/>
    </jms:listener-container>
</beans>
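Using Camel with ActiveMQ
After the consumer receives a message, it routes the message onward. This section covers one such case: routing the message to a bean declared in Spring, using the Simple language to extract values from the message body.
The producer is the same as above. The consumer's Spring configuration file is as follows: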
<beans xmlns="http://www.springframework.org/schema/beans"
The bean is as follows:
package com.kongzhong.gw2.ccc.message;

import org.apache.camel.language.Simple;
import org.springframework.stereotype.Component;

@Component
public class CamelExample {

    // Each @Simple expression pulls one entry out of the map-typed message body.
    public void consumerMsg(@Simple("${body[sendId]}") String sendId,
                            @Simple("${body[name]}") String name,
                            @Simple("${body[address]}") String address) {
        System.out.println("Message received.");
        System.out.println("Received sendId: " + sendId);
        System.out.println("Received name: " + name);
        System.out.println("Received address: " + address);
    }
}
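As a rough illustration of what expressions such as `${body[sendId]}` do in the bean above, the sketch below treats the message body as a `Map` and looks up the named key. It is a toy evaluator written for this article, not Camel's implementation: the class `BodyLookupSketch` and its `evaluate` method are hypothetical, and it handles only this one expression form.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy evaluator for expressions of the form ${body[key]}; not Camel's implementation. */
class BodyLookupSketch {
    // Matches ${body[key]} and captures the key.
    private static final Pattern BODY_KEY = Pattern.compile("\\$\\{body\\[(\\w+)\\]\\}");

    /** Returns the map entry named by the expression as a String, or null if it is absent. */
    static String evaluate(String expression, Map<String, Object> body) {
        Matcher matcher = BODY_KEY.matcher(expression);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported expression: " + expression);
        }
        Object value = body.get(matcher.group(1));
        return value == null ? null : value.toString();
    }
}
```

Under this model, a body of `{sendId=42, name=alice}` yields "42" for `${body[sendId]}` and null for `${body[address]}`, so the producer has to send every key the bean method expects.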