RabbitMq的一些使用
介绍
消息队列各种各样,从基础的Redis、到RabbitMq,到阿里的RocketMq,Go原生的Nsq以及Kafka等。 消息队列的主要用途是跨线程/进程处理业务。
举例
比如,有两个服务,订单服务,积分服务。 由于积分的强一致性并不是特别高,可以在用户下完单后,发送一个消息到消息队列中。而在积分服务中,可以消费这 条下单消息,从而执行增加用户积分逻辑。整个过程是异步且解耦的。后续如果还有新扩展的业务如订单统计服务等, 也可以消费此条消息,从而统计订单数据。
说明
由于我们的系统已经选用了RabbitMq,所以做下常用的了解有助于日常工作。
1. RabbitMq的组件
- Exchange 交换机
- Queue 队列
- Binding 绑定键
消息生产者发送消息时需要指定交换机以及路由键,而交换机会根据交换机的类型确定下发到的队列上。消费者指定队列进行消费消息。
1.1 常用Exchange类型
Direct
直连类型,根据指定的绑定键发送到指定的队列。在mq后台,需要配置绑定键以及队列,交换机。
Fanout
广播模式下,不会使用到绑定键,而是使用匿名队 列,所有绑定到该交换机下的队列都会收到消息。
Topic
主题模式下,会根据具体的路由键配置,发送到不同 的队列中,可以根据通配符进行配置。
1.2 配置队列
队列的配置需要在后台进行操作,默认用户guest/guest, 在部署的服务器15672接口下可以访问该后台。 假设已经配置好vHost和用户的前提下,需要继续进行以下的 配置。
Exchange
新建exchange,指定名称和类型。
Queue
新建队列,指定名称。
Binding
回到Exchange,点进刚创建的Exchange,添加Bind,To queue为刚新建的queue,routingkey自己指定。
1.3 伪编码
生产者(订单服务)
public void addOrder(Order order) { orderDao.insert(order); rabbitMqTemplate.convertAndSend( // 交换机 Exchange "sinfor.directExchange", // 路由键 Binding "sinfor.order.addOrder", // 封装订单消息数据 new MsgData(order)) }
消费者(积分服务)
@RabbitListener(queues = "sinfor.order.queue") public void handlerAddOrder(String message) { log.info("接受到下单数据{}", message); MsgData msg = JSONUtil.toBean(message, MsgData.class); String messageId = msg.getMessageId; // 根据msgId做幂等 // 积分落库 Order order = (Order) msg.getData(); insertCredit(order); }
2. 交换机的选择
2.1 上述问题
假如我还有订单统计服务也想接收下单消息,那么上面的交换机配置就不能满足了。因为RabbitMq队列默认是轮询模式,假如订单统计服务也是消费sinfor.order.queue这个队列,那么积分&统计服务中只能有一个服务能正常的接受到数据了。
2.2 广播模式
假如我选择了fanout模式的交换机,是否可以解决这个问题。答案是可以,但是会带来另外的问题。 假设现在有两个队列,一个给积分服务使用,另一个给统计服务使用。可以解决掉上面使用同一个队列带来的问题。但是,后续假如用户取消了订单,订单服务中又新增了取消订单消息。 现在就有两种消息了,下单&取消订单消息,而这两种消息都走的同一个队列,那就只能在封装的MsgData中设置字段来区分开具体的消息。而且还会带来如果某一个消息数据非常大/出问题了,阻塞了这个队列上所有的消息。
2.3 Topic模式
topic类型的exchange允许用户根据具体的路由键通配符进行匹配,会将消息发送到匹配上的队列上。比如,有消息sinfor.order.add(路由键)消息和sinfor.order.del消息,对积分服务来说,我想监听两个消息,而对统计服务来说,我暂时只统计每天的下单数量,不考虑用户取消掉的数量(模拟场景)。那么我只需要监听sinfor.order.add这个消息即可。此时,我可以创建两个队列,一个给积分服务,另一个给统计服务。而积分服务需要匹配两个消息,统计服务只需要匹配新增订单的消息。
2.3.1 队列配置如下
- queue.sinfor.credit 队列名称,积分队列
- sinfor.order.# 队列绑定的路由键 #代表匹配所有
- queue.sinfor.statistics
- sinfor.order.add
同时,如果queue.sinfor.credit压力过大,消费起来速度不够及时的话,也可以将其做进一步拆分,如下
- queue.sinfor.credit.orderAdd
- sinfor.order.add
- queue.sinfor.credit.orderDel
- sinfor.order.del
这种情况下,对积分服务而言,新增订单走新增订单的队列,删除订单走删除订单的队列,消息积压的情况会进一步减小。
2.3.2 伪代码
配置类
// 使用topic类型交换机
@Bean("sinforTopicExchange")
public Exchange topicExchange() {
return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
}
// 使用匿名队列
@Bean("topicQueue")
public Queue topicQueue() {
return new AnonymousQueue();
}
// 所有匹配路由的消息都发送到此交换机上
@Bean("topicBinding")
public Binding topicBinding(@Qualifier("sinforTopicExchange") Exchange exchange, @Qualifier("topicQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("sinfor.order.#").noargs();
}
生产者 订单服务
@PostMapping("addOrder")
public void addOrder() {
// 订单入库
Order order = new Order();
order.setOrderNo(OrderNoGenUtil.gen());
order.setData(...);
orderDao.insert(order);
// 订单消息
MsgData msg = MsgData.getInstance.setData(order);
rabbitTemplate.convertAndSend("exchange.sinfor.topic", "sinfor.order.add", msg);
}
@PostMapping("delOrder/{orderId}")
public Boolean delOrder(@PathVariable Long orderId) {
// 删除订单
orderDao.delByOrderId(orderId);
MsgData msg = MsgData.getInstance.setData(orderId);
// 删除订单消息
rabbitTemplate.convertAndSend("exchange.sinfor.topic", "sinfor.order.del", msg);
return true;
}
积分服务
// 此队列路由键为sinfor.order.add
@RabbitListener(queues = "queue.sinfor.credit.orderAdd")
public void addOrderListener(String message) {
log.info("-------积分服务收到新增订单消息------:{}", message);
MsgData msgData = JSONUtil.toBean(message, MsgData.class);
// 过滤重复消息
Boolean flag = redisTemplate.opsForValue().setIfAbsent(msgData.getMsgId(), "1", 10, TimeUnit.SECONDS);
if (flag != null && flag) {
Order order = (Order) msgData.getData();
// 积分落库
...
} else {
log.warn("重复的消息,丢弃掉这条数据");
}
}
// 此队列路由键为sinfor.order.del
@RabbitListener(queues = "queue.sinfor.credit.orderDel")
public void addOrderListener(String message) {
log.info("-------积分服务收到删除订单消息------:{}", message);
MsgData msgData = JSONUtil.toBean(message, MsgData.class);
// 过滤重复消息(可以抽取为组件/切面)
Boolean flag = redisTemplate.opsForValue().setIfAbsent(msgData.getMsgId(), "1", 10, TimeUnit.SECONDS);
if (flag != null && flag) {
Long orderId = (Long) msgData.getData();
// 积分删减
...
} else {
log.warn("重复的消息,丢弃掉这条数据");
}
}
统计服务
// 此队列路由键为sinfor.order.add
@RabbitListener(queues = "queue.sinfor.statistics")
public void addOrderListener(String message) {
log.info("-------统计服务收到新增订单消息------:{}", message);
MsgData msgData = JSONUtil.toBean(message, MsgData.class);
// 过滤重复消息
Boolean flag = redisTemplate.opsForValue().setIfAbsent(msgData.getMsgId(), "1", 10, TimeUnit.SECONDS);
if (flag != null && flag) {
Order order = (Order) msgData.getData();
// 统计落库
...
} else {
log.warn("重复的消息,丢弃掉这条数据");
}
}
此后比如业务扩展,新增的某个服务想监听我们的订单数据变化,只需要新增相对队列(queue.sinfor.xxx),并做好路由键的映射(sinfor.order.add)后由对应服务编写代码即可,订单服务(生产者)无需再做变更。