本篇文章小编给大家分享一下redis实现队列的阻塞、延时、发布和订阅代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:
由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1)。
普通队列
可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。
lpush+rpop:左进右出的队列
rpush+lpop:左出右进的队列
下面使用redis的命令来模拟普通队列。
使用lpush命令生产消息:
>lpush queue:single 1 "1" >lpush queue:single 2 "2" >lpush queue:single 3 "3"
使用rpop命令消费消息:
>rpop queue:single "1" >rpop queue:single "2" >rpop queue:single "3"
下面使用Java代码来实现普通队列。
生产者SingleProducer
package com.morris.redis.demo.queue.single; import redis.clients.jedis.Jedis; /** * 生产者 */ public class SingleProducer { public static final String SINGLE_QUEUE_NAME = "queue:single"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i < 100; i++) { jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i); } jedis.close(); } }
消费者SingleConsumer:
package com.morris.redis.demo.queue.single; import redis.clients.jedis.Jedis; import java.util.Objects; import java.util.concurrent.TimeUnit; /** * 消费者 */ public class SingleConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME); if(Objects.nonNull(message)) { System.out.println(message); } else { TimeUnit.MILLISECONDS.sleep(500); } } } }
上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:
消费者需要不停的调用rpop方法查看redis的list中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能list中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用Thread.sleep()等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。
如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
阻塞队列
消费者可以使用brpop指令从redis的list中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,
使用redis的brpop命令来模拟阻塞队列。
>brpop queue:single 30
可以看到命令行阻塞在了brpop这里了,30s后没数据就返回。
Java代码实现如下:
生产者与普通队列的生产者一致。
消费者BlockConsumer:
package com.morris.redis.demo.queue.block; import redis.clients.jedis.Jedis; import java.util.List; /** * 消费者 */ public class BlockConsumer { public static void main(String[] args) { Jedis jedis = new Jedis(); while (true) { // 超时时间为1s ListmessageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME); if (null != messageList && !messageList.isEmpty()) { System.out.println(messageList); } } } }
缺点:无法实现一次生产多次消费。
发布订阅模式
Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。
发布:PUBLISH指令可用于发布一条消息,格式:
PUBLISH channel message
返回值表示订阅了该消息的数量。
订阅:SUBSCRIBE指令用于接收一条消息,格式:
SUBSCRIBE channel
使用SUBSCRIBE指令后进入了订阅模式,但是不会接收到订阅之前publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。
回复分为三种类型:
如果为subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量
如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。
下面使用redis的命令来模拟发布订阅模式。
生产者:
127.0.0.1:6379> publish queue hello (integer) 1 127.0.0.1:6379> publish queue hi (integer) 1
消费者:
127.0.0.1:6379> subscribe queue Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "queue" 3) (integer) 1 1) "message" 2) "queue" 3) "hello" 1) "message" 2) "queue" 3) "hi"
Java代码实现如下:
生产者PubsubProducer:
package com.morris.redis.demo.queue.pubsub; import redis.clients.jedis.Jedis; /** * 生产者 */ public class PubsubProducer { public static final String PUBSUB_QUEUE_NAME = "queue:pubsub"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i < 100; i++) { jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i); } jedis.close(); } }
消费者PubsubConsumer:
package com.morris.redis.demo.queue.pubsub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; /** * 消费者 */ public class PubsubConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onMessage(String channel, String message) { System.out.println("receive message: " + message); if(message.indexOf("99") > -1) { this.unsubscribe(); } } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("subscribe channel: " + channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("unsubscribe channel " + channel); } }; jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME); } }
消费者可以启动多个,每个消费者都能收到所有的消息。
可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:
psubscribe channel.*
用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。
同时PUNSUBSCRIBE指令通配符不会展开。例如:PUNSUBSCRIBE *不会匹配到channel.*,所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*。
Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。
延时队列和优先级队列
Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。
如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。
如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。
下面使用redis的zset来模拟延时队列。
生产者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3 (integer) 0
消费者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores 1) "order1" 2) "1" 127.0.0.1:6379> zrem queue:delay order1 (integer) 1
Java代码如下:
生产者DelayProducer:
package com.morris.redis.demo.queue.delay; import redis.clients.jedis.Jedis; import java.util.Date; import java.util.Random; /** * 生产者 */ public class DelayProducer { public static final String DELAY_QUEUE_NAME = "queue:delay"; public static void main(String[] args) { Jedis jedis = new Jedis(); long now = new Date().getTime(); Random random = new Random(); for (int i = 0; i < 10; i++) { int second = random.nextInt(30); // 随机订单失效时间 jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i); } jedis.close(); } }
消费者:
package com.morris.redis.demo.queue.delay; import redis.clients.jedis.Jedis; import redis.clients.jedis.Tuple; import java.util.Date; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; /** * 消费者 */ public class DelayConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { long now = new Date().getTime(); SettupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0); if(tupleSet.isEmpty()) { TimeUnit.MILLISECONDS.sleep(500); } else { for (Tuple tuple : tupleSet) { Double score = tuple.getScore(); long time = score.longValue(); if(time < now) { jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement()); System.out.println("order[" + tuple.getElement() +"] is timeout at " + time); } else { TimeUnit.MILLISECONDS.sleep(500); } break; } } } } }
应用场景
延时队列可用于订单超时失效的场景
二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。
忍者必须死34399账号登录版 最新版v1.0.138v2.0.72
下载勇者秘境oppo版 安卓版v1.0.5
下载忍者必须死3一加版 最新版v1.0.138v2.0.72
下载绝世仙王官方正版 最新安卓版v1.0.49
下载Goat Simulator 3手机版 安卓版v1.0.8.2
Goat Simulator 3手机版是一个非常有趣的模拟游
Goat Simulator 3国际服 安卓版v1.0.8.2
Goat Simulator 3国际版是一个非常有趣的山羊模
烟花燃放模拟器中文版 2025最新版v1.0
烟花燃放模拟器是款仿真的烟花绽放模拟器类型单机小游戏,全方位
我的世界动漫世界 手机版v友y整合
我的世界动漫世界模组整合包是一款加入了动漫元素的素材整合包,
我的世界贝爷生存整合包 最新版v隔壁老王
我的世界MITE贝爷生存整合包是一款根据原版MC制作的魔改整