点对点


p2p也可以称作queue,一个sender发送的消息,只能有一个receiver接受。sender发送消息到目标Queue,receiver可以异步接受这个queue上的消息。
queue上的消息如果暂时没有receiver来取,也不会遗失。queue保证每条数据都能被receiver接收。queue数据会默认以文件形式保存。activemq
一般会保存在$AMO_HOME\data\kr-store\data下面。

订阅/发布


订阅/发布 也称作pub/sub。监听同一个topic地址的多个sub都能收到publisher发送的消息。一般来说publisher发布消息到某一个topic时,只有
在监听该topic地址的sub能够接收到消息,如果没有sub监听,该topic会遗失。所以并不能保证publisher发布的每一条数据。subscriber都能接收到。
topic数据默认不落地,是无状态的。

区别


Topic和Queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端接收不到消息。而Queue则是以点对
点的形式通知多个处于监听状态的客户端中的一个。

点对点与spring整合


maven依赖在此省略。
首先编写一个点对点发送消息的bean,然后在spring中进行声明。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.kongzhong.gw2.ccc.message;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
* 点对点消息发送
*
*/
public class QueueMessageProducer {
protected Logger logger = LoggerFactory.getLogger(getClass());

private JmsTemplate jmsTemplate;
// 队列名称
private String queueName;

public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

public void setQueueName(String queueName) {
this.queueName = queueName;
}

public boolean sendMessage() {

try {
jmsTemplate.send(this.queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setObject("sendId", "123456");
message.setObject("name", "xxg");
message.setObject("address", "China of shanghai");
return message;
}
});
return true;
} catch (JmsException e) {
e.printStackTrace();
}

return false;
}
}

接下来编写spring整合的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">


<description>ActiveMq 发送消息配置</description>

<!-- 配置connectionFactory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
<property name="dispatchAsync" value="true" />
<property name="producerWindowSize" value="10240000" />
<property name="useAsyncSend" value="true" />
<property name="alwaysSessionAsync" value="true" />
<property name="sendTimeout" value="3000" />
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>

<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<!-- 区别它采用的模式为false是p2p为true是订阅 -->
<property name="pubSubDomain" value="false" />
<property name="receiveTimeout" value="20000" />
</bean>
<!--声明点对点发送消息的bean -->
<bean id="queueMessageProducer" class="com.kongzhong.gw2.ccc.message.QueueMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate"/>
<!--队列名称-->
<property name="queueName" value="queue_test" />
</bean>
</beans>

接下来是消费者:点对点消息队列监听器,随spring容器启动就开始监听。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.kongzhong.gw2.ccc.message;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 点对点消息队列监听器
*/
public class QueueMessageListener implements MessageListener {

protected Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("接受到的消息为:" + mapMessage.getString("sendId"));
System.out.println("接受到的消息为:" + mapMessage.getString("name"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}

消费者spring整合配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">


<!-- 配置connectionFactory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
<property name="producerWindowSize" value="10240000" />
<property name="useAsyncSend" value="true" />
<property name="dispatchAsync" value="true" />
<property name="alwaysSessionAsync" value="true" />
<property name="sendTimeout" value="3000" />
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>

<!-- 消息监听器bena声明-->
<bean id="queueReceiver1" class="com.kongzhong.gw2.ccc.message.QueueMessageListener"/>

<jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsFactory" acknowledge="auto">
<!--队列名称-->
<jms:listener destination="queue_test" ref="queueReceiver1"/>
</jms:listener-container>
</beans>

订阅/发布


发布实体bena代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.kongzhong.gw2.ccc.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;

/**
* 订阅消息发送
*
*/
public class TopicMessageProducer {

protected Logger logger = LoggerFactory.getLogger(getClass());

private JmsTemplate jmsTemplate;
// topic
private String queueName;

public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

public void setQueueName(String queueName) {
this.queueName = queueName;
}

public boolean publishMessage(Object message) {

try {
jmsTemplate.convertAndSend(queueName, message);
return true;
} catch (JmsException e) {
e.printStackTrace();
logger.error("TopicMessageProducer send exception," + message, e);
}
return false;
}
}

发布spring整合如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">


<!-- 配置connectionFactory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
<property name="producerWindowSize" value="10240000" />
<property name="useAsyncSend" value="true" />
<property name="dispatchAsync" value="true" />
<property name="alwaysSessionAsync" value="true" />
<property name="sendTimeout" value="3000" />
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>

<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<!-- 区别它采用的模式为false是p2p为true是订阅 -->
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="20000" />
</bean>

<!--发布实体bena声明-->
<bean id="topicMessageProducer" class="com.kongzhong.gw2.ccc.message.TopicMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate"/>
<!--topic->
<property name="queueName" value="top_test" />
</bean>
</beans>

订阅者监听器实体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.kongzhong.gw2.ccc.message;

import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 订阅消息队列监听器
*/
public class TopicMessageListener implements MessageListener {

protected Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void onMessage(Message message) {
System.out.println("消费者接受到消息:"+message);
logger.error("TopicMessageListener recv: {}", message);
}

}

订阅者spring整合配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">


<bean id="connectionfactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="idleTimeout" value="600000" /><!-- 600 seconds -->
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
<property name="producerWindowSize" value="10240000" />
<property name="useAsyncSend" value="true" />
<property name="dispatchAsync" value="true" />
<property name="alwaysSessionAsync" value="true" />
<property name="sendTimeout" value="3000" />
</bean>
</property>
</bean>
<!-- 消息监听器实体bena声明 -->
<bean id="topicReceiver1" class="com.kongzhong.gw2.ccc.message.TopicMessageListener"/>

<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionfactory" acknowledge="auto">
<!--topic-->
<jms:listener destination="top_test" ref="topicReceiver1"/>
</jms:listener-container>
</beans>

Camel在activemq中的使用


消费者接受到消息之后,会进行路由分发。这里介绍其中的一种情况,路由分发到spring声明的实体bean中。用Simple language来接受消息体。
生产者和上面是一样的。消费者的spring配置文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">


<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<!--activemq:queue:queue_test "activemq代表activemq消息系统,queue表示点对点队列 queue_test表示队列名称"-->
<from uri="activemq:queue:queue_test" />
<!--multicast表示这个消息可以分发到不同的bean中去-->
<multicast>
<!--bean代表实体,camelExample代表bena的ID->
<to uri="bean:camelExample"/>
<to uri="bean:camelExampleTwo"/>
</multicast>
</route>
</camelContext>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!-- 文档地址:http://camel.apache.org/activemq.html http://camel.apache.org/uris.html -->
</beans>

实体bean如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.kongzhong.gw2.ccc.message;

import org.apache.camel.language.Simple;
import org.springframework.stereotype.Component;

@Component
public class CamelExample {

public void consumerMsg(@Simple("${body[sendId]}") String sendId,
@Simple("${body[name]}") String name,
@Simple("${body[address]}")String address){
System.out.println("接受到消息了。。。。。");
System.out.println("收到的消息是:"+sendId);
System.out.println("收到的消息是:"+name);
System.out.println(address);
}
}