你使用过的MQ技术有哪些
RabbitMQ、RocketMQ、Kafka
如何保证消息不丢失
如果在生产者中丢失,可以使用事物机制,但是他是同步的很消耗性能,也可以开启确认机制,生产者在将消息发送到MQ后,等待MQ的确认响应。只有在收到MQ的确认后,生产者才认为消息已安全发送。如果没有收到确认,生产者可以重试发送消息。
如果在MQ中丢失,可以开启持久化将消息存储到硬盘,可以保证持久性。
如果在消费者中丢失,消费者在成功处理消息后发送ACK给MQ,表示消息已被处理。如果没有收到ACK,MQ可以将消息重新投递给其他消费者。
RocketMQ如何保证消息的可用性 / 可靠性 / 不丢失呢?
在生产者是通过请求确认机制,同步发送的时候没有发生议程就会响应OK,发生异常就会响应异常,异步发送的时候,要在回调方法中检查,如果发送失败应该去重试
在存储是配置可靠优先的Broker参数来避免宕机消息丢失,消息会持久化到CommitLog日志中,即使宕机也会重新消费,他Broker的刷盘机制不管是同步或者异步都可以保证消息一定存储在内存中,它是生产者发送消息后等数据持久化到磁盘之后再返回响应给生产者。
在消费者是消费时的确认机制,要等都执行完了后再发送消费确认。
RocketMQ如何处理消息重复的问题呢?(如何保证消息幂等性)
可以在业务消费逻辑上保证幂等性,就是多次调用和一次调用都是一样的
也可以消息去重,就是保证每条消息都有一个唯一id,可以简历一个消费记录表,拿到消息后会去数据插入这个唯一id,如果出现重复消费就会出现主键冲突,那就不再处理这条消息就可以了。
如何处理消息积压
可以扩充消费者,来提高消费能力
也可以把消息迁移到新的Topic,临时的Topic可以扩容消费者这样就可以处理消息积压
RocketMQ中如何实现顺序消息
他分为部分顺序和全局顺序
全局顺序要把Topic 的读写队列数设置为 一,然后 Producer Consumer 的并发设置,也要是一,这样牺牲了高并发变成单线程他就可以实现顺序消息了。
部分顺序的话要在Producer把同 ID 的消息发送到同一个 Message Queue里面,消费的时候要从同一个Message Queue里面顺序处理
RocketMQ中如何实现消息过滤
在 Broker 端按照 Consumer 的去重逻辑进行过滤,可以避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
在 Consumer 端过滤,比如按照消息设置的 tag 去重,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。
RocketMQ中如何实现延迟消息
在发送消息时,可以设置消息的延迟级别。RocketMQ 提供了几种预定义的延迟级别,分别表示不同的延迟时间。
实际上他就是一个临时存储+定时轮询的方式实现了延迟消息,当Topic收到延迟消息后他不会直接把消息放到队列中去,会创建一个定时的队列,通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,从而实现了延迟消息的效果
RocketMQ中的死信
死信队列就是用于处理无法被正常消费的消息的,当消息消费失败超过最大重试次数的时候就会把消息放入死信队列,我们应该监控死信队列去把无法消费的消息手动消费
RocketMQ 中 Broker 是怎么保存数据的呢?
主要的存储文件包括 CommitLog 文件、ConsumeQueue 文件、Indexfile 文件。
CommitLog他默认大小1G,消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
ConsumeQueue是基于 Topic 的 CommitLog的索引文件
IndexFile提供了一种可以通过 key 或时间区间来查询消息的方法。
RocketMQ是怎么会消息文件进行读写的
PageCache
、顺序读写
、零拷贝
- PageCache: 利用操作系统的PageCache来缓存文件内容,提高写入和读取的效率,避免频繁的磁盘IO操作。
- 顺序读写: 消息的写入和读取都是基于顺序的方式进行,减少了磁盘寻道操作,提高了读写效率。
- 零拷贝技术: 在数据传输过程中避免数据从一个缓冲区拷贝到另一个缓冲区,减少了数据拷贝的开销,提高了数据传输效率。