在上一节中,我们讲了三种方式来实现延时任务,其实,将三种方式结合起来用,对于一些中小型公司已经足够了,但是在中大型互联网公司还是远远不够的。
想必大家对Redis起码有一个初步的概念:基于内存的非关系型数据库。在平时的业务开发中,Redis经常会被用做缓存,来提高网站的性能,减少数据库的访问,所以一想到Redis,脑海中第一个浮现出来的就是缓存。
没办法,对于搬砖业务开发,所使用到的Redis基本也仅仅局限于缓存/代替Session了。但是Redis的应用场景很多很多。下面我就用Redis来实现延时任务。
我在这里假设大家已经对Redis有了一个基本的了解,并且使用过Redis,所以我不再过多的讲述Redis的基本知识了。
##开门见山,我们会使用到Redis的ZSet数据结构。
ZSet可以理解为Set的升级版本,Set是无序的,ZSet是有序的,而且会实时排序,所以ZSet也被称为 Sort Set,我是比较倾向于后面的称呼的,因为从字面上的意思就可以知道这个Set拥有一个特点:排序。
ZSet有两个元素:member ,score,ZSet会根据score进行排序,member就是成员的意思。
要想完成这个需求,我们第一个想到的应该就是怎么把数据推送给Redis,第二个想到的是怎么把Redis数据给读取出来。
我所用的是Jedis
通过IDEA的自动提示功能,我们很容易找到zadd这方法,看起来推送数据给Redis应该是这个方法:
让我们试一下吧。private static JedisPoolConfig jedisPoolConfig = null; private static JedisPool jedisPool; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); static { jedisPoolConfig = new JedisPoolConfig(); jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0); } private static void zadd(String key, String member, double score) { Jedis jedis = jedisPool.getResource(); jedis.zadd(key, score, member); jedis.close(); } private static SetzrangeWithScores(String key, int start, int end) { Jedis jedis = jedisPool.getResource(); Set set = jedis.zrangeWithScores(key, start, end); jedis.close(); return set; } private static void zrem(String key, String member) { Jedis jedis = jedisPool.getResource(); jedis.zrem(key, member); jedis.close(); }复制代码
在这里我把ip和端口号,密码,DBNum都给隐藏了 ,大家应该可以看懂。
我并没有用Spring或者是Spring Boot来管理Redis,因为如果加上这些东西,还需要有Spring或者Spring Boot的知识,而且这里我仅仅是想演示下用Redis实现延迟任务的一个思路而已,基于此原因,大家也别纠结 没有分不同的类,没有异常处理 等等。
我们在main方法调用下zadd方法:
zadd("haha","order1",2018005130); zadd("haha","order2",2017005130); zadd("haha","order3",2016005130); zadd("haha","order4",2021115130);复制代码
执行完成后,打开Redis管理工具,找到这个key,看一下结果:
score小的排前面,这也符合orderTime小的,越先过期。但是 score参数类型是double,所以我们不能把Date型的orderTime直接放进去了,但是我们可以把Date经过转换,再放进去。数据已经放进去了,怎么拿出来呢?这个方法比较难找,只能借助于搜索引擎了,最后确定了方法:
不管是参数,还是返回类型,都有点奇怪,难怪找不到。 我们调用下这个方法试一下。 根据参数名称,猜测start是开始,end是结束,我们要读取第一个数据,应该是传0,1,或者是1,2。但是 其实应该是传0,0。。。真让人沮丧。。返回的数据也很奇怪,看不懂啊:这个先放一下,我们发现 这个数据虽然已经被读取出来了,但是Redis并么有把这个数据删掉。
让我们想想,这个ZSet并不像DelayQueue一样,这个没有延迟的功能。我们把数据推到Redis,下一秒就可以读出来。所以我们需要先把数据读取出来,然后判断这个订单有没有超时,如果没有超时的话,Sleep一段时间,再次判断是否超时。超时了,则修改数据库状态,如果还是没超时,继续Sleep,再判断。所以上面读取方法没有删除数据,是符合我们要求的。
我们需要找到删除的方法,最后确定了方法:
这个方法比较简单,就是一个key,一个可变的member。 我们调用下这个方法:zrem("haha", "order3");复制代码
再看下Redis的管理工具:
order3 被删除了。接下来的问题就是读取数据方法那个奇怪的返回值怎么使用了,也没有什么好的办法,就是在运行的时候,ALT+F8 调出窗口,各种尝试呗。
下面,直接把最终代码贴出来吧:
private static final int expireTime = 15000; //其他略,上面有 public static void main(String[] args) { Thread productThread = new Thread(() -> { for (int i = 0; i < 15; i++) { try { Thread.sleep(1200); } catch (InterruptedException e) { e.printStackTrace(); } produce(i); } }); productThread.start(); Thread consumThread = new Thread(() -> { consum(); }); consumThread.start(); } private static void produce(int orderId) { Date date = new Date(); String dateStr = simpleDateFormat.format(date); System.out.printf("现在时间是%s,订单%s加入队列%n", dateStr, orderId); zadd("order", String.valueOf(orderId), date.getTime()); } private static void consum() { while (true) { Setset = zrangeWithScores("order", 0, 0); for (Tuple item : set) { Date date = new Date(); if (date.getTime() - item.getScore() > expireTime) { String dateStr = simpleDateFormat.format(date); System.out.printf("现在时间是%s,订单%s已过期%n", dateStr, item.getElement()); zrem("order", item.getElement()); } else { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } } }复制代码
最后让我们执行一下:
没有问题。在这里有两个细节需要说明下:
-
如果取出来的订单没有超时,会睡300毫秒,然后继续判断。这个数值直接影响着订单过期的延迟情况,如果把数值设置的很大,比如20秒,显而易见,订单真正过期的时间远远不止15秒了。如果把数据设置的很小,那么服务器压力也会增大。
-
取出数据——》删除数据 会有一定的时间差,如果开几个线程同时消费,或者部署几个应用同时消费,会有重复执行的情况,如果仅仅是修改下订单的状态,没什么问题,因为这是一个幂等性操作,不管执行几次,都是同样的结果,但是如何还有其他的非幂等性操作,比如 用户信誉-10,就要用其他的手段来避免 重复执行了,这里就不展开了。
这种实现方式其实也比较简单,因为大家都或多或少的使用过Redis,只要知道这几个方法,很容易就可以实现。 但是相比上一节的三个方法来说,这个方法就高端很多了,支持多线程消费,支持多应用(部署)消费,应用服务器宕机也没事。
好了,到这里,延时任务的第四种方式——使用Redis 就讲完了。