本文主要介绍在SpringbootRabbitmq中防止消息丢失的实践。文章围绕主题,详细介绍了内容,有一定的参考价值,有需要的朋友可以参考一下。
目录
前言消息丢失的原因环境准备使用确认机制模拟场景实现RabbitTemplate的效果。ConfirmCallback接口发送方代码。使用返回机制模拟场景,实现兔子模板的效果。返回回拨发送方代码。rabbitmq服务挂起,导致内存中的消息丢失。发送给消费者消费失败。修改application.yml配置文件很消耗,但是忘记做手动确认ack的操作代码了。效果消费过程中,触发未知异常,代码没有try catch效果1和效果2的总结。
前言
之前在网上看了很多关于防损的文章,文章里理论知识太多,就想着自己实践一下,在实践的过程中踩了一些坑,所以写了这篇文章。如果文章有问题,请在评论区指出。
导致消息出现丢失的原因
发送时失败
表示当发送方发送消息准备到达消息队列时,由于网络波动、消息队列服务停机等原因,消息队列服务无法接收消息。从而导致了亏损。到达时宕机
,消息队列服务收到消息后,如果没有开启持久化,那么消息会被存储在内存中(当然如果内存紧张,也会转移到磁盘上缓解内存)。如果此时服务挂断,内存中的消息将会丢失。发送到消费端失败
,消费者收到消息,消费者服务挂机,rabbitmq默认自动ack,也就是说rabbitmq发送给消费者。一旦确认消费者已经收到,无论消费成功与否,rabbitmq都会认为是成功发送。就这三种情况来练习吧。
环境jdk1.8
弹簧靴2.3.7 .释放
弹簧-启动-启动器-amqp 2.3.7 .版本
兔子q 3.7.7
准备工作我事先准备了好了交换机以及队列:
开关:message.log.test.exchange和message.log.test2.exchange队列:message.loss.test.queue
Message.loss.test.queue和message.log.test.exchange是绑定关系,而message.log.test2.exchange没有绑定队列。
1.发送时失败
发送失败,rabbitmq有两种发送失败的情况。
消息没有到达rabbitmq的交换机(交换)。消息到达rabbitmq的交换机(exchange ),但未到达队列。
第一种解决方案是使用
confirm机制
。第二种解决方案是使用return机制
。使用confirm机制
模拟场景确认机制是当来自发送方的消息没有到达rabbitmq的交换时,会触发确认方法,告诉发送方消息没有到达rabbitmq,需要处理。
我们可以在这里从010到59000模拟上面的场景。
实现RabbitTemplate.ConfirmCallback接口/**
*该方法在消息没有到达Rabbitmq的交换机时触发(当然到达时也会触发,)
*/
@组件
公共类ConfirmCallBack实现RabbitTemplate。确认回调{
@资源
私人兔模板兔模板;
@PostConstruct
public void init(){
rabbit template . setconfirmcallback(this);
}
/**
*
* @param correlationData消息属性正文
* @param ack成功,成功到达为真,未到达为假
* @param cause rabbitmq自带信息
*/
@覆盖
public void confirm(correlation data correlation data,boolean ack,String cause) {
//第一个坑,如果发送方在发送消息时不处理correlationData,那么conirm方法接收到的对象将为null。
//当接收失败,correlationData对象为空时,证明目前无法追溯到该业务,可以进行业务日志处理。
如果(!ackcorrelationData==null){
System.out.println(原因);
//日志处理。
返回;
}
//如果接收失败
如果(!确认){
system . out . println( message Id: correlation data . getid());
message message=correlation data . getreturnedmessage();
System.out.println(消息体:新字符串(消息。getbody()));
//这里可以持久化业务消息体到数据库,然后定时去进行补偿处理或者重试等等
返回;
}
//处理完成
}
}
发送端代码/**
* 消息的推送
* @返回
*/
@PostMapping(push )
公共布尔push(){
TestMessage TestMessage=new TestMessage();
testMessage.setName(mq名称);
testMessage.setBusinessId(业务id’);
//定义相关数据对象以及消息属性。不然确认方法无论失败还是成功,相关数据参数永远是空
相关数据相关数据=新的相关数据(uuid。随机uuid().toString());
//传递业务数据
相关数据。setreturnedmessage(新消息(JSONObject.toJSON(testMessage)).toString().getBytes(标准字符集.UTF_8),新消息属性()));
//发送消息(这里发送给了message.log.test.exchange11交换机,但实际兔子q并不存在)模板。convertandsend(消息。日志。测试。交换11 ,消息_丢失_测试,测试消息,相关数据);
返回真实的
}
这里是我踩的第一个坑,如果发送端不定义correlationData,那么确认接收到的相关数据对象参数都会是空
实现效果使用return机制
模拟场景当消息到达了兔子q的交换机的时候,但是又没有到达队列,那么就会触发返回方法。
下面我们定义一个
发送消息到rabbitmq不存在的交换机上
,就可以模拟上述场景实现RabbitTemplate.ReturnCallback/**
* 当消息没有到达兔子q的队列时就会触发该方法
*/
@组件
公共类return回调实现兔子模板.返回回调{
@资源
私人兔模板兔模板;
@PostConstruct
public void init() {
兔子模板。setreturncallback(this);
}
/**
* @param消息消息体
* @param replyCode返回代码
* @param replyText返回文本
* @param exchange交换机
* @param routingKey发送方定义的路由键
*/
@覆盖
public void返回的消息(Message Message,int replyCode,String replyText,String exchange,String routingKey) {
System.out.println(消息标识: message.getMessageProperties().getDeliveryTag());
字符串messageBody=null
尝试{
邮件正文=新字符串(message.getBody(), UTF-8 );
} catch(UnsupportedEncodingException e){
e。printstacktrace();
}
System.out.println(消息:消息体);
系统。出去。println(回复码);
系统。出去。println(回复文本);
系统。出去。println(交易所);
系统。出去。println(路由关键字);
}
}
发送端代码/**
* 消息的推送
* @返回
*/
@PostMapping(push2 )
公共布尔push2(){
TestMessage TestMessage=new TestMessage();
testMessage.setName(mq名称2);
testMessage.setBusinessId(业务id’);
模板。convertandsend(消息。日志。测试2。exchange , message_loss_test ,JSONObject.toJSON(testMessage).toString());
返回真实的
}
这里需注意消息体需要JSON序列化,不然返回的消息方法接收的消息身体会是乱码
实现效果rabbitmq服务挂了,造成内存的消息丢失。这个开启兔子q的持久化机制就好了,开启之后消息到达兔子q服务,会实时转入磁盘。这里怎么设置就不多说了,网上挺多文章可以解答。
不过即使开启了还是会有一种情况会造成消息丢失,那就是消息即将要持久化到磁盘的那一刻,服务挂了,就会造成丢失,不过这种情况我也不知道怎么模拟,所以就暂不实践了。
发送到消费端消费失败上面提到默认情况下兔子q使用的是自动确认字符(确认字符)的方式,我们将它改成手动确认字符(确认字符)的方式,就可以解决这个问题。
修改application.yml配置文件兔子问:
听众:
简单:
#开启手动确认
确认模式:手动
#开启失败后的重试机制
重试:
启用:真
#最多重试3次
最大尝试次数:3
下面我们试一下几种消费端消费不成功的场景
消费了,但是忘记做手动确认ack的操作代码。@组件
公共类TestConsumer {
/**
*消费
* @param testmessage消息正文
* @param message消息属性
* @param channel mq通道对象
*/
@ rabbit listener(queues={ message . loss . test . queue })
公共void测试(测试消息测试消息,消息消息,通道通道)引发IOException {
system . out . println( Consume testmessage: testmessage . getname());
//channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);
}
}
效果
没有绑定队列的交换机,然后发送消息到交换机
在用Postman第一次请求后,控制台显示消息被消费的信号。然后查看rabbitmq后台管理刚刚消费的消息,在改成
效果流程:
停止程序(关闭消费者)后,过了一会儿后台管理显示消息又改回Unacked
,表示已经回到队列。重启程序(打开消费段),消息被重新消费。总而言之,如果消费者不做人工确认,在消费者被关闭之前,消息就会变成
Ready
,不会再被消费。但是一旦消费者被关闭,消息将返回队列并被消费者消费。消费过程中,触发了未知异常,代码没有try catch/**
*消费
* @param testmessage消息正文
* @param message消息属性
* @param channel mq通道对象
*/
@ rabbit listener(queues={ message . loss . test . queue })
公共void测试(测试消息测试消息,消息消息,通道通道)引发IOException {
system . out . println( Consume testmessage: testmessage . getname());
//故意触发异常
如果(!string utils . isempty(testmessage . getname()){
抛出新的runtime exception(“11211”);
}
channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);
}
效果1上面的渲染图显示,在我触发异常后,消息重试了三次,也就是我在application.yml中配置的三次重试
如果我删除重试机制会发生什么?
效果2效果和忘记做ack操作是一样的。报文没有ack后,报文会变成
Unacked
。消费者关闭后,消息将返回队列,然后当它被重新链接时,它将被再次消费。总结
这就是这篇关于防止Spring boot Rabbitmq消息丢失的实践的文章。更多关于Spring boot Rabbitmq的信息,请搜索我们之前的文章或者继续浏览下面的相关文章。希望你以后能支持我们!