前言
MQ的主要特点为解耦、异步、削峰,该文章主要记录与分享个人在实际项目中的RocketMQ削峰用法,用于减少数据库压力的业务场景,其中RocketMQ的核心组件概念如下:
Producer:生产发送消息Broker:存储Producer发送过来的消息Consumer:从Broker拉取消息并进行消费NameServer:为Producer或Consumer路由到Broker其中消费流程有以下几点是必须注意的:
RocketMQ的Consumer获取消息是通过向Broker发送拉取请求获取的,而不是由Broker发送Consumer接收的方式。Consumer每次拉取消息时消息都会被均匀分发到消息队列再进行传输,所以RocketMQ中的很多参数都是针对队列而不是Topic的(这个是重点,顺便吐槽下源码的文档讲的真不清晰,很多都需要自己试错,但Dashboard做得很好),其中每个Broker消息队列(ConsumeQueue)的数量都可以通过RocketMQDashBoard实时更改调整。rocketmq-spring-boot-starter用法简介
当开发中需要快速集成RocketMQ时可以考虑使用rocketmq-spring-boot-starter搭建RocketMQ的集成环境,但该框架并不完全具备RocketMQ所有的配置简化,如需批量消费消息便需要自定义一个DefaultMQPushConsumerbean去消费了。个人在开发中常用的rocketmq-spring-boot-starter相关类:
RocketMQListener接口:消费者都需实现该接口的消费方法onMessage(msg)。RocketMQPushConsumerLifecycleListener接口:当
RocketMQMessageListener中的配置不足以满足我们的需求时,可以实现该接口直接更改消费者类DefaultMQPushConsumer配置RocketMQMessageListener:被该注解标注并实现了接口RocketMQListener的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些常用配置(大部分按默认即可),一般只需更改consumerGroup(消费组)与topic。RocketMQMessageListener中的属性配置是可以使用Placeholder(占位符)从配置文件或配置中心获取的,如下图:业务案例有一个点赞业务,不限制用户的点赞数只需进行记录(产品需求,开发提议无效),当每个用户都进行x连击享受数量猛增的快感时如果数据库都需要进行x个点赞数据的插入,数据库毫无疑问会塞死导致崩溃。于是想到可以尝试下MQ削峰,比如每秒来了消息但数据库只能承受,那我消费时每次只拉取消费就好了,剩下的放在Broker堆积慢慢消费就好。由于之前的消息中心也在用RocketMQ,于是确认使用RocketMQ来进行削峰。
环境配置
文章例子环境:1NameServer+2Broker+1Consumer
添加maven依赖
dependency
groupIdorg.apache.rocketmq/groupId
artifactIdrocketmq-spring-boot-starter/artifactId
/dependency
application.yml配置
rocketmq:
name-server:.0.0.1:
producer:
group:praise-group
server:
port:
spring:
datasource:
driver-class-name: