redis实现延时消息队列,redisson延迟队列

redis实现延时消息队列,redisson延迟队列,redis实现延时队列的两种方式(小结)

摘要:本文主要介绍了redis实现延迟队列的两种方法(总结),并通过示例代码进行了详细介绍,对大家的学习或工作有一定的参考价值。下面让我们跟随边肖一起学习。

背景

项目的过程监控有几种节点,需要监控每个节点是否超时。按照传统的做法,肯定是通过定时任务,扫描然后判断,但是定时任务有缺点:1、数据量会比较慢;2,时间不好控制,太短了,怕一次完不了,太长了会有延迟。所以我想到了用延迟队列来实现。

一是redis的过期重点监控

1.打开过期密钥监控。

从redis配置中删除此注释。

通知-密钥空间-事件Ex

然后重启redis。

2.利用redis过期监听实现延迟队列

继承keyepirationeventmessagelistener类并实现父类的方法,可以监视密钥过期时间。当一个密钥过期时,它将在这里执行。在这里,过滤出需要的键,发送到kafka队列。

@组件

@Slf4j

公共类RedisKeyExpirationListener扩展keyepirationeventmessagelistener {

@自动连线

私立KafkaProducerService;

public rediskeexpirationlistener(RedisMessageListenerContainer listenerContainer){

super(listener container);

}

/**

*对redis数据失败事件的数据处理。

* @param消息

* @param模式

*/

@覆盖

public void on Message(Message Message,byte[] pattern){

if(message==null | | string utils . isempty(message . tostring())){

返回;

}

字符串内容=message . tostring();

//key的格式为flag: aging type:运单号示例如下

尝试{

if(content . starts with(ABN constant。EMS)){

kafkaproducerservice . sendmessage sync(topic constant。EMS _运单_ ABN _队列,内容);

} else if(content . starts with(ABN constant。韵达)){

kafkaproducerservice . sendmessage sync(topic constant。韵达_运单_ ABN _队列,内容);

}

} catch(异常e) {

Log.error(监控过期密钥,发送kafka异常,,e);

}

}

}

可以看到,这个方法其实很简单,但是有几个问题需要注意。首先,尽量在单台机器上运行这个,因为多台机器会执行它,浪费cpu,增加数据库的负担。第二,机器频繁部署时,如果有时间间隔,会有数据泄露处理。

其次,redis zset实现了延迟队列

1、生产者实现

可以看出制作人很简单,其实就是利用zset的特性给一个zset添加元素,时间就是它的分数。

public void produce(Integer taskId,long exeTime) {

system . out . println( Join the task,taskid: taskid ,exercise: exercise ,当前时间: local datetime . now());

RedisOps.getJedis()。zadd(RedisOps.key,exeTime,string . value of(taskId));

}

2、消费者变现

消费者代码并不难,就是删除过期的zset中的元素,然后处理数据。

公共void消费者(){

executors . newsinglethreadexecutor()。提交(新的Runnable() {

@覆盖

公共无效运行(){

while (true) {

SetString taskIdSet=redis ops . get jedis()。zrangeByScore(RedisOps.key,0,System.currentTimeMillis(),0,1);

if(taskIdSet==null | | taskIdSet . isempty()){

System.out.println(“无任务”);

}否则{

taskIdSet.forEach(id - {

long result=RedisOps.getJedis()。zrem(RedisOps.key,id);

if(结果==1L) {

System.out.println(从延迟队列中获取任务,taskId: id ,当前时间: local datetime . now());

}

});

}

尝试{

时间单位。毫秒睡眠(100);

} catch (InterruptedException e) {

e . printstacktrace();

}

}

}

});

}

可以看出,这种方法其实比上一种更好。因为他的两个缺点都克服了。多机也行,不用担心部署时间间隔长。

摘要

两种方法都不错,都能解决问题。遇到问题,多思考,多总结。

这就是本文关于实现redis延迟队列的两种方法(总结)。更多相关redis延迟队列内容,请搜索我们之前的文章或者继续浏览下面的相关文章。希望大家以后能多多支持我们!

redis实现延时消息队列,redisson延迟队列