【赵强老师】Redis的消息发布与订阅
Redis 作为一个publish/subscribe server,起到了消息路由的功能。订阅者可以通过subscribe和psubscribe命令向Redis server订阅自己感兴趣的消息类型,当发布者通过publish命令向Redis server发送特定类型的消息时。订阅该消息类型的全部client都会收到此消息。这里消息的传递是多对多的。一个client可以订阅多个channel,也可以向多个channel发送消息。
下图为大家展示了Redis消息机制的体系架构。
发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
下面列出来了Redis发布消息、订阅消息的相关命令。
publish:发送消息:Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。subscribe:订阅某个频道:Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。psubscribe:模式匹配:模式匹配功能允许客户端订阅符合某个模式的频道,Redis采用PSUBSCRIBE订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。
案例一:一个消息生产者,两个消息消费者
案例二:两个消息生产者,一个消息消费者
案例三:Redis消息机制的Java API
添加依赖:
消息监听器类:
importredis.clients.jedis.JedisPubSub;publicclassRedisMsgPubSubListenerextendsJedisPubSub{@Overridepublicvoidunsubscribe(){super.unsubscribe();}@Overridepublicvoidunsubscribe(String...channels){super.unsubscribe(channels);}@Overridepublicvoidsubscribe(String...channels){super.subscribe(channels);}@Overridepublicvoidpsubscribe(String...patterns){super.psubscribe(patterns);}@Overridepublicvoidpunsubscribe(){super.punsubscribe();}@Overridepublicvoidpunsubscribe(String...patterns){super.punsubscribe(patterns);}@OverridepublicvoidonMessage(Stringchannel,Stringmessage){System.out.println("channel:"+channel+"receivesmessage:"+message);this.unsubscribe();}@OverridepublicvoidonPMessage(Stringpattern,Stringchannel,Stringmessage){}@OverridepublicvoidonSubscribe(Stringchannel,intsubscribedChannels){System.out.println("channel:"+channel+"isbeensubscribed:"+subscribedChannels);}@OverridepublicvoidonPUnsubscribe(Stringpattern,intsubscribedChannels){}@OverridepublicvoidonPSubscribe(Stringpattern,intsubscribedChannels){}@OverridepublicvoidonUnsubscribe(Stringchannel,intsubscribedChannels){System.out.println("channel:"+channel+"isbeenunsubscribed:"+subscribedChannels);}}
测试程序:
packagedemo.redis;importredis.clients.jedis.Jedis;publicclassTestMain{@TestpublicvoidtestSubscribe()throwsException{Jedisjedis=newJedis("localhost");RedisMsgPubSubListenerlistener=newRedisMsgPubSubListener();jedis.subscribe(listener,"redisChatTest");//othercode}@TestpublicvoidtestPublish()throwsException{Jedisjedis=newJedis("localhost");jedis.publish("redisChatTest","HelloWorld");Thread.sleep(5000);jedis.publish("redisChatTest","HelloRedis");}}
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。