Spring boot Rabbitmq消息防丢失实践

SpringbootRabbitmq消息防丢失实践

本文主要介绍在SpringbootRabbitmq中防止消息丢失的实践。文章围绕主题,详细介绍了内容,有一定的参考价值,有需要的朋友可以参考一下。

目录

前言消息丢失的原因环境准备使用确认机制模拟场景实现RabbitTemplate的效果。ConfirmCallback接口发送方代码。使用返回机制模拟场景,实现兔子模板的效果。返回回拨发送方代码。rabbitmq服务挂起,导致内存中的消息丢失。发送给消费者消费失败。修改application.yml配置文件很消耗,但是忘记做手动确认ack的操作代码了。效果消费过程中,触发未知异常,代码没有try catch效果1和效果2的总结。

前言

之前在网上看了很多关于防损的文章,文章里理论知识太多,就想着自己实践一下,在实践的过程中踩了一些坑,所以写了这篇文章。如果文章有问题,请在评论区指出。

导致消息出现丢失的原因

发送时失败

表示当发送方发送消息准备到达消息队列时,由于网络波动、消息队列服务停机等原因,消息队列服务无法接收消息。从而导致了亏损。

到达时宕机

,消息队列服务收到消息后,如果没有开启持久化,那么消息会被存储在内存中(当然如果内存紧张,也会转移到磁盘上缓解内存)。如果此时服务挂断,内存中的消息将会丢失。

发送到消费端失败

,消费者收到消息,消费者服务挂机,rabbitmq默认自动ack,也就是说rabbitmq发送给消费者。一旦确认消费者已经收到,无论消费成功与否,rabbitmq都会认为是成功发送。

就这三种情况来练习吧。

环境

jdk1.8

弹簧靴2.3.7 .释放

弹簧-启动-启动器-amqp 2.3.7 .版本

兔子q 3.7.7

准备工作

我事先准备了好了交换机以及队列:

开关:message.log.test.exchange和message.log.test2.exchange队列:message.loss.test.queue

Message.loss.test.queue和message.log.test.exchange是绑定关系,而message.log.test2.exchange没有绑定队列。

1.发送时失败

发送失败,rabbitmq有两种发送失败的情况。

消息没有到达rabbitmq的交换机(交换)。消息到达rabbitmq的交换机(exchange ),但未到达队列。

第一种解决方案是使用

confirm机制

。第二种解决方案是使用

return机制

使用confirm机制

模拟场景

确认机制是当来自发送方的消息没有到达rabbitmq的交换时,会触发确认方法,告诉发送方消息没有到达rabbitmq,需要处理。

我们可以在这里从010到59000模拟上面的场景。

实现RabbitTemplate.ConfirmCallback接口

/**

*该方法在消息没有到达Rabbitmq的交换机时触发(当然到达时也会触发,)

*/

@组件

公共类ConfirmCallBack实现RabbitTemplate。确认回调{

@资源

私人兔模板兔模板;

@PostConstruct

public void init(){

rabbit template . setconfirmcallback(this);

}

/**

*

* @param correlationData消息属性正文

* @param ack成功,成功到达为真,未到达为假

* @param cause rabbitmq自带信息

*/

@覆盖

public void confirm(correlation data correlation data,boolean ack,String cause) {

//第一个坑,如果发送方在发送消息时不处理correlationData,那么conirm方法接收到的对象将为null。

//当接收失败,correlationData对象为空时,证明目前无法追溯到该业务,可以进行业务日志处理。

如果(!ackcorrelationData==null){

System.out.println(原因);

//日志处理。

返回;

}

//如果接收失败

如果(!确认){

system . out . println( message Id: correlation data . getid());

message message=correlation data . getreturnedmessage();

System.out.println(消息体:新字符串(消息。getbody()));

//这里可以持久化业务消息体到数据库,然后定时去进行补偿处理或者重试等等

返回;

}

//处理完成

}

}

发送端代码

/**

* 消息的推送

* @返回

*/

@PostMapping(push )

公共布尔push(){

TestMessage TestMessage=new TestMessage();

testMessage.setName(mq名称);

testMessage.setBusinessId(业务id’);

//定义相关数据对象以及消息属性。不然确认方法无论失败还是成功,相关数据参数永远是空

相关数据相关数据=新的相关数据(uuid。随机uuid().toString());

//传递业务数据

相关数据。setreturnedmessage(新消息(JSONObject.toJSON(testMessage)).toString().getBytes(标准字符集.UTF_8),新消息属性()));

//发送消息(这里发送给了message.log.test.exchange11交换机,但实际兔子q并不存在)模板。convertandsend(消息。日志。测试。交换11 ,消息_丢失_测试,测试消息,相关数据);

返回真实的

}

这里是我踩的第一个坑,如果发送端不定义correlationData,那么确认接收到的相关数据对象参数都会是空

实现效果

使用return机制

模拟场景

当消息到达了兔子q的交换机的时候,但是又没有到达队列,那么就会触发返回方法。

下面我们定义一个

发送消息到rabbitmq不存在的交换机上

,就可以模拟上述场景

实现RabbitTemplate.ReturnCallback

/**

* 当消息没有到达兔子q的队列时就会触发该方法

*/

@组件

公共类return回调实现兔子模板.返回回调{

@资源

私人兔模板兔模板;

@PostConstruct

public void init() {

兔子模板。setreturncallback(this);

}

/**

* @param消息消息体

* @param replyCode返回代码

* @param replyText返回文本

* @param exchange交换机

* @param routingKey发送方定义的路由键

*/

@覆盖

public void返回的消息(Message Message,int replyCode,String replyText,String exchange,String routingKey) {

System.out.println(消息标识: message.getMessageProperties().getDeliveryTag());

字符串messageBody=null

尝试{

邮件正文=新字符串(message.getBody(), UTF-8 );

} catch(UnsupportedEncodingException e){

e。printstacktrace();

}

System.out.println(消息:消息体);

系统。出去。println(回复码);

系统。出去。println(回复文本);

系统。出去。println(交易所);

系统。出去。println(路由关键字);

}

}

发送端代码

/**

* 消息的推送

* @返回

*/

@PostMapping(push2 )

公共布尔push2(){

TestMessage TestMessage=new TestMessage();

testMessage.setName(mq名称2);

testMessage.setBusinessId(业务id’);

模板。convertandsend(消息。日志。测试2。exchange , message_loss_test ,JSONObject.toJSON(testMessage).toString());

返回真实的

}

这里需注意消息体需要JSON序列化,不然返回的消息方法接收的消息身体会是乱码

实现效果rabbitmq服务挂了,造成内存的消息丢失。

这个开启兔子q的持久化机制就好了,开启之后消息到达兔子q服务,会实时转入磁盘。这里怎么设置就不多说了,网上挺多文章可以解答。

不过即使开启了还是会有一种情况会造成消息丢失,那就是消息即将要持久化到磁盘的那一刻,服务挂了,就会造成丢失,不过这种情况我也不知道怎么模拟,所以就暂不实践了。

发送到消费端消费失败

上面提到默认情况下兔子q使用的是自动确认字符(确认字符)的方式,我们将它改成手动确认字符(确认字符)的方式,就可以解决这个问题。

修改application.yml配置文件

兔子问:

听众:

简单:

#开启手动确认

确认模式:手动

#开启失败后的重试机制

重试:

启用:真

#最多重试3次

最大尝试次数:3

下面我们试一下几种消费端消费不成功的场景

消费了,但是忘记做手动确认ack的操作代码。

@组件

公共类TestConsumer {

/**

*消费

* @param testmessage消息正文

* @param message消息属性

* @param channel mq通道对象

*/

@ rabbit listener(queues={ message . loss . test . queue })

公共void测试(测试消息测试消息,消息消息,通道通道)引发IOException {

system . out . println( Consume testmessage: testmessage . getname());

//channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);

}

}

效果

没有绑定队列的交换机,然后发送消息到交换机

在用Postman第一次请求后,控制台显示消息被消费的信号。然后查看rabbitmq后台管理刚刚消费的消息,在改成

效果流程:

停止程序(关闭消费者)后,过了一会儿后台管理显示消息又改回

Unacked

,表示已经回到队列。重启程序(打开消费段),消息被重新消费。

总而言之,如果消费者不做人工确认,在消费者被关闭之前,消息就会变成

Ready

,不会再被消费。但是一旦消费者被关闭,消息将返回队列并被消费者消费。

消费过程中,触发了未知异常,代码没有try catch

/**

*消费

* @param testmessage消息正文

* @param message消息属性

* @param channel mq通道对象

*/

@ rabbit listener(queues={ message . loss . test . queue })

公共void测试(测试消息测试消息,消息消息,通道通道)引发IOException {

system . out . println( Consume testmessage: testmessage . getname());

//故意触发异常

如果(!string utils . isempty(testmessage . getname()){

抛出新的runtime exception(“11211”);

}

channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);

}

效果1

上面的渲染图显示,在我触发异常后,消息重试了三次,也就是我在application.yml中配置的三次重试

如果我删除重试机制会发生什么?

效果2

效果和忘记做ack操作是一样的。报文没有ack后,报文会变成

Unacked

。消费者关闭后,消息将返回队列,然后当它被重新链接时,它将被再次消费。

总结

这就是这篇关于防止Spring boot Rabbitmq消息丢失的实践的文章。更多关于Spring boot Rabbitmq的信息,请搜索我们之前的文章或者继续浏览下面的相关文章。希望你以后能支持我们!

Spring boot Rabbitmq消息防丢失实践