mafka
Mafka是美团自研的一个高可用、可拓展、高性能的分布式消息队列服务,底层基于Apache Kafka,广泛用于消息异步处理、应用解耦、流量削峰、发布/订阅模型等场景。Mafka为研发工程师提供全托管的消息队列服务,用户只需专注于业务开发无需部署运维。
用户场景
异步处理模式
- 消息发送者 可以发送一个消息而无须等待消费者响应。消息发送者将消息发送到一条 虚拟的通道(主题 或 队列)上,消息接收者则订阅或是监听该通道。一条信息可能最终转发给 一个或多个 消息接收者,这些接收者都无需对消息发送者做出同步回应,整个过程都是异步的。
应用系统之间的解耦合
- 发送者和接受者不必了解对方、只需要确认消息,比如发送和接收者可以是不同的系统,不同的语言编写的,地理上可以不在同一个地域
- 发送者和接受者不必同时在线
流量削峰
- 当在线api接口在应对高峰流量时,比如“秒杀”,“流量激增”时,如果接口处理能力有限,可以先将无法及时处理的请求发送给消息队列,后台处理,防止流量过大将api接口服务打死。
- “秒杀”场景需要下游消费方的消费能力达到最大,业务方可以使用push类型消费组进行消费
发布/订阅模型(Pub/Sub)
- 一条消息,可以广播给任意多个收听方
将Mafka作为临时存储
特性 | 描述 |
---|---|
消息回放 | Mafka 支持最长 7 天的消息回溯,也就是说,可以实现多次重复消费最长 7 天以内的消息(默认 7 天,根据业务需求可以调节) |
中心化调度 | 基于机房粒度的生产者和消费者调度,让业务达到同机房优先生产或消费,同地域优先生产或消费。 |
延时消息 | Mafka 支持最短 5s,最长 30 天的延迟时间范围,生产者将延时消息先发送给 Mafka,当消息到期后,Mafka 再将消息发送给消费者。 |
Push消费 | 消费者通过 PushServer 来消费消息,对 Mafka 的 Partition 透明,消费者数量不再受制于 Partition 数量。 |
消息回放原理介绍
broker中的消息在磁盘中是一个appendLog文件,顺序追加到日志文件尾部,每条消息都有一个唯一标识——offset。消费者通过offset位置决定消费消息的位置。mafka提供通过重新设置offset来重新消费消息或者跳过某些消息的功能。
- 数据修复和恢复:当生产环境出现数据错误或数据丢失时,可以使用mafka消息回放工具,将历史数据重新发送到生产环境中,以修复和恢复数据。
- 测试和调试:在开发和测试过程中,可以使用mafka消息回放工具,将历史数据重新发送到测试环境中,以测试和调试应用程序的正确性和性能。
- 数据分析和统计:在数据分析和统计过程中,可以使用mafka消息回放工具,将历史数据重新发送到数据分析和统计系统中,以进行数据分析和统计。
- 数据备份和迁移:在数据备份和迁移过程中,可以使用mafka消息回放工具,将历史数据重新发送到备份和迁移系统中,以保证数据的完整性和一致性。
Push消费原理介绍
(1)由PushServer内部的消费者客户端(Mafka客户端)从Broker集群拉取消息,并投递到内存队列中(内存队列所有分区共享,队列大小为100)。
(2)PushTask任务从内存队列中获取消息,并逐一推送给链接到当前节点的所有客户端。
(3)TimeOutCheckTask负责检查所有已经推送给客户端的消息是否响应超时(默认响应时间为100s),如果客户端响应超时,则将超时消息投递到超时队列中。
(4)PushTask获取新的消息进行推送时,优先从超时队列中拉取消息进行投递。
延时消息原理介绍
消费组粒度延迟
说明:在消费组配置项中修改,是消费组粒度的延迟,每个消费组单独做设置即可(这里需要注意,具体泳道,SET和主干环境,需要具体到各个环境的消费组下修改配置才能生效,不能只修改主干消费组配置),使用这个消费组的所有消息都延迟的时间相同,该延迟时间没有限制,可以自定义设置,以秒为单位。
消费组延迟生效在消费端,大概原理可以理解为客户端收到消息之后延迟一定时间之后再调用业务消费逻辑进行消费。
代码不需要做任何变更,只需要在消费组配置页面修改如下标红参数即可,业务侧在有消费组写权限的情况下可以自己修改该配置,申请人都有该项修改权限,修改完成后,接入就会生效。
- 代码接入参考普通的mafka生产端和消费端接入即可,可以参考管理平台的Demo代码示例。
- 该延时方式适合短时间的延时,服务端保存消息单分区最多10G,如果消息流量大,延时时间长可能导致尚未被消费的消息被服务端清理掉造成消息丢失。
消息粒度延迟
延时消息接入分两步:
- MQ平台主题配置页面修改delay.enable=true
- 代码中创建producer使用 MafkaClient.buildDelayProduceFactory(或者通过bean方式创建两种方式都要!!!!!必须要改代码)
- 发送消息使用带delay的发送方法,比如sendDelayMessage、sendAsyncDelayMessage
消息粒度延迟的原理大概是客户端在发送消息之后,不会马上发送到broker,而是先把消息暂存到cellar中,等延时时间到达才正式发送消息到broker。每条消息的延迟时间可以不同,通过代码控制。
消息积压
所有积压问题都可以理解成是消费能力不足的问题,导致消费能力不足原因有很多,
1.生产量突增
联系管理员扩容partition
2.消费组停用
可以在mafka的管理平台的消费组页面,查看有没有启动消费者或者启动消费者失败,请查看日志中是否有异常:
3.消费慢了,增加消费能力
优化消费逻辑——降低消费逻辑处理耗时(最主要的方式!!!)有的业务场景消费逻辑比较耗时,消费耗时就导致消费消息的qps不高,消费能力不高。这就需要业务方优化消费逻辑代码,降低处理耗时,从而提升处理能力。
增加单机消费线程数(推荐!!!)——提升单机消费能力,增加单partiton并行消费线程数目(这种方式不保证消费顺序!!!)
调整分配模式——让消费者负载更均衡(推荐!!!)查看消费组配置修改页面,如果partition.assign.mode=默认分配模式,并且有较多的消费者没有抢占到partition,可以将该模式设置为消费者均衡模式,该模式能够最大化利用消费者来提升消费能力。
调整消费策略——减少跨地域调用的网络时延在消费组页面,查看消费状态信息,是否出现跨地域消费情况,即是否出现上海机器在消费北京集群的情况(或者北京机器在消费上海集群的情况),跨地域消费会增加网络耗时,导致消费时延较高,消费速率慢,建议调整成同机房优先或者同地域优先。
单机消费能力达到瓶颈(可以通过机器的CPU、内存、网卡的变化来判断)后,可以增加消费者机器数目;优先提高单机的处理能力
丢消息问题
主题ack配置确认,首先确定主题的ack配置,如果request.required.acks 不是 默认值 -1,消息可能在发送过程中丢失。 request.required.acks=-1,消息会被同步到多个副本成功后再给生产者返回成功,只要多个副本不同时故障,就不会有消息丢失。
通过消息轨迹进行消息的查询,按照对应的时间可以查询到对应的消息。如果此处有,那么消息就已经保存到了broker中,不存在丢消息的情况,请仔细检查消费者端的逻辑。
如果消息丢失,那么建议看一下代码中,是否在别的应用中用同一个消费组内启用了多个消费者实例,导致消息被另外一个消费组消费。
业务发现丢消息并且可能存在当前topic consumer无法消费到消息的情况。这个可能是开启了环境隔离,导致消息被生产到环境隔离的topic中,所以导致当前的topic没有消息(疑似丢消息)。可以上管理平台看一下是否开启了环境隔离,并且可以检查一下对应的消费日志。
积压过多导致被系统删除,Mafka系统中保存的消息是有限的,一旦超过限制就会把最老的消息清理掉,维持保存的消息总量不超限。一旦消费积压过多,可能出现还未消费的消息被服务端清理的情况,此时消费者一般会选择从该分区的最新位置开始消费消息,确定是否是该场景:MQ管理平台的消息积压分布有一个突降
消息重复消费
1.业务方重复生产
由于业务方的原因,可能对同一份消息内容发送了多次,造成该消息在Mafka系统内重复。在该场景下,重复消息的msgId、offset(cluster、partition)信息是不同的。
2.Mafka重复生产
mafka客户端在发送消息时,会有内部重试,默认一条消息发送失败会自动重试3次,如果所有重试都失败,会给用户返回【发送失败】的结果,但是该消息仍然可能存在于mafka系统中。在该场景下,重复消息的生产方都是同一台机器,并且发送间隔时间较短(30s内),重复消息的msgId相同、offset(cluster、partition)信息是不同的。
3.消费失败重复消费
消费状态码在RECONSUME_LATER(消费重试)、CONSUME_FAILURE(消费失败)两种情况下,会在客户端进程内进行多次消费重试。再该场景下,重复消息的msgId、offset(cluster、partition)信息是相同的,消费的机器是相同的机器
4.分区变动重复消费
在客户端实例变化、分区扩容、集群切换等场景会涉及到分区重分配,由于新老客户端感知分配结果不及时,可能出现两台机器短时间消费同一个分区的场景,造成重复消费。再该场景下,重复消息的msgId、offset(cluster、partition)信息是相同的,消费的机器是两台不同的机器。
顺序消费
- 发送消息可以通过携带key决定partition(相同的可以落到同一个partition上),
- 不能开并发消费:不能通过调整单partition并行消费线程个数(consumer.parallel.num)来增加消费能力,多线程并发无法保证顺序
- 不能使用push消费:Push消费类型无法保证消费顺序