args = new HashMap<>(3);
args.put("x-message-ttl",ttl);
args.put("x-dead-letter-routing-key",couponReleaseRoutingKey);
args.put("x-dead-letter-exchange",eventExchange);
return new Queue(couponReleaseDelayQueue,true,false,false,args);
}
/**
* 死信队列,普通队列,用于被监听
*/
@Bean
public Queue couponReleaseQueue(){
return new Queue(couponReleaseQueue,true,false,false);
}
/**
* 第一个队列,即延迟队列的绑定关系建立
*/
@Bean
public Binding couponReleaseDelayBinding(){
return new Binding(couponReleaseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,couponReleaseDelayRoutingKey,null);
}
/**
* 死信队列绑定关系建立
*/
@Bean
public Binding couponReleaseBinding(){
return new Binding(couponReleaseQueue,Binding.DestinationType.QUEUE,eventExchange,couponReleaseRoutingKey,null);
}
}
```
### 延迟消息功能测试开发和注意事项
* 测试代码编写测试
```java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CouponApplication.class)
@Slf4j
public class DemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send(){ rabbitTemplate.convertAndSend("coupon.event.exchange","coupon.release.delay.routing.key","5qeqweqw");
}
}
```
注意
* 有些版本会自动创建队列和交换机@bean,有些是要初次请求才会
* 如果MQ已经存在对应的队列,则不会重新创建
* 修改配置后,需要删除队列重新建立生效
* 如果队列和交换机已经存在,重新启动项目会有错误警告,可以忽略
### 优惠券锁定-发送延迟消息功能开发
消息协议介绍
```java
@Data
public class CouponRecordMessage {
/**
* 消息队列id
*/
private Long messageId;
/**
* 订单号
*/
private String outTradeNo;
/**
* 库存锁定工作单id
*/
private Long taskId;
}
```
发送功能开发
```java
for(CouponTaskDO couponTaskDO : couponTaskDOList){
CouponRecordMessage couponRecordMessage = new CouponRecordMessage();
couponRecordMessage.setOutTradeNo(orderOutTradeNo);
couponRecordMessage.setTaskId(couponTaskDO.getId()); rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(),rabbitMQConfig.getCouponReleaseDelayRoutingKey(),couponRecordMessage);
log.info("优惠券锁定消息发送成功:{}",couponRecordMessage.toString());
}
```
优惠券消费MQ监听器开发
* 流程梳理
```java
优惠券解锁记录场景
1、超时未支付,比如30分钟则订单失效关闭
2、下单成功,创建订单业务失败,订单回滚
库存解锁防止继续支付:
1、30分支付超时则无法支付订单
2、订单31分延迟消息(比订单超时大几分钟)
->查询订单状态-向第三方支付查询订单状态,只有未支付状态,且本地订单状态是NEW,才修改本地订单状态为取消CANCEL,其他业务才可以解锁对应的库存库存
3、商品、优惠券库存32分延迟消息(比订单超时大几分钟)
->查询订单状态-订单不存在,解锁库存
->查询订单状态
1)订单状态为取消CANCEL的情况,才可以解锁库存,确认消息接收;
2)订单状态为未支付NEW的情况,则不解锁库存,不修改订单状态,重新投递消息或者拒收;
(避免网络延迟到 导致订单关单消息,比库存解锁还慢,没更新订单状态)
3)如果是订单已经支付则修改库存task工作单状态,确认消息接收;
注意:延迟队列一定要开启手动的ack机制,防止解锁失败,消息丢失,也要防止多次解锁
解锁库存的时候:修改状态和修改对应库存task工作单状态应该是同个事务,防止其中一个失败
```
监听
```java
@Slf4j
@Component
@RabbitListener(queues = "${mq.config.coupon_release_queue}")
public class CouponMQListener {
@Resource
private CouponRecordService couponRecordService;
// @Resource
// private RedissonClient redissonClient;
/**
* 重复消费-幂等性
*
* 消费失败,重新入队后最大重试次数:
* 如果消费失败,不重新入队,可以记录日志,然后插到数据库人工排查
*
* 消费者这块还有啥问题,大家可以先想下,然后给出解决方案
*/
@RabbitHandler
public void releaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {
//防止同个解锁任务并发进入;如果是串行消费不用加锁;加锁有利也有弊,看项目业务逻辑而定
//Lock lock = redissonClient.getLock("lock:coupon_record_release:"+recordMessage.getTaskId());
//lock.lock();
log.info("监听到消息:releaseCouponRecord消息内容:{}", recordMessage);
long msgTag = message.getMessageProperties().getDeliveryTag();
boolean flag = couponRecordService.releaseCouponRecord(recordMessage);
try {
if (flag) {
//确认消息消费成功
channel.basicAck(msgTag, false);
} else {
log.error("释放优惠券失败 flag=false,{}", recordMessage);
channel.basicReject(msgTag, true);
}
} catch (IOException e) {
log.error("释放优惠券记录异常:{},msg:{}", e, recordMessage);
channel.basicReject(msgTag, true);
}
// finally {
// lock.unlock();
// }
}
// @RabbitHandler
// public void releaseCouponRecord2(String msg,Message message, Channel channel) throws IOException {
// log.info(msg);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
// }
}
```
解锁优惠券记录
```java
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public boolean releaseCouponRecord(CouponRecordMessage recordMessage) {
//查询下task是否存
CouponTaskEntity task = couponTaskMapper.selectOne(new QueryWrapper().eq("id", recordMessage.getTaskId()));
if (task == null) {
log.warn("工作单不存,消息:{}", recordMessage);
return true;
}
//lock状态才处理
if (task.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())) {
//查询订单状态
JsonData jsonData = orderFeignService.queryProductOrderState(recordMessage.getOutTradeNo());
if (jsonData.getCode() == 0) {
//正常响应,判断订单状态
String state = jsonData.getData().toString();
if (ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)) {
//状态是NEW新建状态,则返回给消息队,列重新投递
log.warn("订单状态是NEW,返回给消息队列,重新投递:{}", recordMessage);
return false;
}
//如果是已经支付
if (ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)) {
//如果已经支付,修改task状态为finish
task.setLockState(StockTaskStateEnum.FINISH.name());
couponTaskMapper.update(task, new QueryWrapper().eq("id", recordMessage.getTaskId()));
log.info("订单已经支付,修改库存锁定工作单FINISH状态:{}", recordMessage);
return true;
}
}
//订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW
log.warn("订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW,message:{}", recordMessage);
task.setLockState(StockTaskStateEnum.CANCEL.name());
couponTaskMapper.update(task, new QueryWrapper().eq("id", recordMessage.getTaskId()));
//恢复优惠券记录是NEW状态
baseMapper.updateState(task.getCouponRecordId(), CouponStateEnum.NEW.name());
} else {
log.warn("工作单状态不是LOCK,state={},消息体={}", task.getLockState(), recordMessage);
}
return true;
}
```
```java
/**
* 更新优惠券使用记录
*/
void updateState(@Param("couponRecordId") Long couponRecordId, @Param("useState") String useState);
```
更新优惠券使用记录
```xml
update coupon_record
set use_state = #{useState}
where id = #{couponRecordId}
```
### 订单微服务-查询订单支付状态接口开发
查询订单支付状态接口开发(MQ里面没token,不需要登录)
controller
```java
@RequestMapping("/odr/product")
public class ProductOrderController {
@Resource
private ProductOrderService orderService;
/**
* 查询订单状态
*
* 此接口没有登录拦截,可以增加一个秘钥进行rpc通信
*/
@ApiOperation("查询订单状态")
@GetMapping("query_state")
public JsonData queryProductOrderState(@ApiParam("订单号") @RequestParam("out_trade_no")String outTradeNo){
String state = orderService.queryProductOrderState(outTradeNo);
return StringUtils.isBlank(state)?JsonData.buildResult(BizCodeEnum.ORDER_CONFIRM_NOT_EXIST):JsonData.buildSuccess(state);
}
}
```
Service
```javascript
@Override
public String queryProductOrderState(String outTradeNo) {
ProductOrderEntity productOrderDO = baseMapper.selectOne(new QueryWrapper().eq("out_trade_no", outTradeNo));
if (productOrderDO == null) {
return "";
} else {
return productOrderDO.getState();
}
}
```
### 数据一致性多场景用例测试-延迟消息消费释放优惠券功能开发
流程测试
* 写单元测试,发送消息
* 订单支付完成用例测试
* 订单超时未支付用例测试
* 订单异常不存在用例测试
* 消息延迟,监听消息处理
* 记录更新情况
* bug修复:useState变量名称
数据准备
```sql
INSERT INTO `product_order` (`id`, `out_trade_no`, `state`, `create_time`, `total_amount`, `pay_amount`, `pay_type`, `nickname`, `head_img`, `user_id`, `del`, `update_time`, `order_type`, `receiver_address`)
VALUES
(1, '123456abc', 'PAY', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0, NULL, NULL, NULL);
INSERT INTO `coupon_task` (`id`, `coupon_record_id`, `create_time`, `out_trade_no`, `lock_state`)
VALUES
(1, 1, '2021-02-27 16:05:11', '123456abc', 'LOCK');
```
* 功能流程完善
* 业务逻辑优化
* 如何保证消息不会重复消费-幂等处理
* 多个消息并发情况下是否需要加锁
* 是否还有其他问题,大家可以思考下,然后给出解决方案
```java
@Test
public void send() {
CouponRecordMessage message = new CouponRecordMessage();
message.setOutTradeNo("123456abc");
message.setTaskId(1L);
rabbitTemplate.convertAndSend("coupon.event.exchange", "coupon.release.delay.routing.key", message);
}
```