redis作为轻量级MQ来使用

发布/订阅模式


redis提供了rabitmq类似的发布订阅模式,通过生产者使用下面的命令来发布消息,
1
PUBLISH CHANNEL MESSAGE

消费者通过下面的消息来订阅消息,

1
SUBSCRIBE CHANNEL MESSAGE

发布者


1
2
3
4
5
#向channel.test发布消息
127.0.0.1:6379> publish channel.test hello
(integer) 0 #返回0表明订阅者为0,没有发布消息
127.0.0.1:6379> publish channel.test hello
(integer) 1 #返回n表明订阅者为n,成功发布给1个消费者

订阅者


1
2
3
4
5
6
7
8
9
10
11
#订阅channel.test消息
127.0.0.1:6379> subscribe channel.test
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel.test"
3) (integer) 1

#接收到来自channel.test的消息
1) "message"
2) "channel.test"
3) "hello"

Java实现

消息发布者


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import redis.clients.jedis.Jedis;
/**
* 消息发布者
* @author Mobim-Client
*
*/
public class RedisMqProductor {

public static void main(String[] args) {
Jedis jedis=new Jedis("192.168.189.128", 6379); //连接redis
try {

jedis.publish("channel.test", "你好,中国");
} catch (Exception e) {
e.printStackTrace();
}finally{
jedis.close();
}
}
}

消息订阅者


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class RedisMqCustomer {

public static void main(String[] args) {
Jedis jedis=new Jedis("192.168.189.128", 6379); //连接redis
try {

Lister lister=new Lister();

jedis.subscribe(lister, "channel.test");


} catch (Exception e) {
e.printStackTrace();
}finally{
jedis.close();
}
}
}
class Lister extends JedisPubSub{
//取得消息时候的处理
@Override
public void onMessage(String channel, String message) {
System.out.println("频道:"+channel+",消息内容:"+message);
}

// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
// System.out.println(channel + "=" + subscribedChannels);
}

// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
// System.out.println(channel + "=" + subscribedChannels);
}

// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}

// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}

// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "=" + channel + "=" + message);
}
}

先启动消息订阅者。然后再发布消息。