项目
博客
文档
归档
资源链接
关于我
项目
博客
文档
归档
资源链接
关于我
33| 高并发下商品库存扣减和释放解决方案设计
2024-08-19
·
·
原创
·
·
本文共 1,179个字,预计阅读需要 4分钟。
### 商品服务 -(下单)商品库存锁定模块设计和开发 - 商品锁定对象 ```java @ApiModel(value = "商品锁定对象",description = "商品锁定对象协议") @Data public class LockProductRequest { @ApiModelProperty(value = "订单id",example = "12312312312") @JsonProperty("order_out_trade_no") private String orderOutTradeNo; @ApiModelProperty(value = "订单项") @JsonProperty("order_item_list") private List
orderItemList; } @ApiModel(value = "商品子项") @Data public class OrderItemRequest { @ApiModelProperty(value = "商品id",example = "1") @JsonProperty("product_id") private long productId; @ApiModelProperty(value = "购买数量",example = "2") @JsonProperty("buy_num") private int buyNum; } ``` - controlle层接口开发 ```java @ApiOperation("rpc-锁定,商品库存锁定") @PostMapping("lock_products") public JsonData lockProducts(@ApiParam("商品库存锁定") @RequestBody LockProductRequest lockProductRequest){ return productService.lockProductStock(lockProductRequest); } ``` * service层开发 ```java /** * 锁定商品库存
* 1)遍历商品,锁定每个商品购买数量
* 2)每一次锁定的时候,都要发送延迟消息
**/ JsonData lockProductStock(LockProductRequest lockProductRequest); /** * 实现 **/ @Override public JsonData lockProductStock(LockProductRequest lockProductRequest) { String outTradeNo = lockProductRequest.getOrderOutTradeNo(); List
itemList = lockProductRequest.getOrderItemList(); //一行代码,提取对象里面的id并加入到集合里面 List
productIdList = itemList.stream().map(OrderItemRequest::getProductId).collect(Collectors.toList()); //批量查询 List
productVOList = this.findProductsByIdBatch(productIdList); //分组 Map
productMap = productVOList.stream().collect(Collectors.toMap(ProductVO::getId, Function.identity())); for(OrderItemRequest item:itemList){ //锁定商品记录 int rows = baseMapper.lockProductStock(item.getProductId(),item.getBuyNum()); if(rows != 1){ throw new BizException(BizCodeEnum.ORDER_CONFIRM_LOCK_PRODUCT_FAIL); }else { //插入商品product_task ProductVO productVO = productMap.get(item.getProductId()); ProductTaskEntity productTask = new ProductTaskEntity(); productTask.setBuyNum(item.getBuyNum()); productTask.setLockState(StockTaskStateEnum.LOCK.name()); productTask.setProductId(item.getProductId()); productTask.setProductName(productVO.getTitle()); productTask.setOutTradeNo(outTradeNo); productTaskMapper.insert(productTask); log.info("商品库存锁定-插入商品product_task成功:{}",productTask); // 发送MQ延迟消息,todo log.info("商品库存锁定信息延迟消息发送成功:{}",productMessage); } } return JsonData.buildSuccess(); } ``` 批量查询商品 ```java @Override public List
findProductsByIdBatch(List
productIdList) { return baseMapper.selectList(new QueryWrapper
().in("id", productIdList)) .stream().map(this::beanProcess).collect(Collectors.toList()); } ``` 锁定商品库存 ```java /** * 锁定商品库存 */ int lockProductStock(@Param("productId") long productId, @Param("buyNum") int buyNum); ``` ```xml
update product set lock_stock = lock_stock + #{buyNum} where id = #{productId} and stock - lock_stock>=#{buyNum}
``` ### 商品微服务- RabbitMQ延迟消息交换机和队列配置 ```yaml #自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue mq: config: #延迟队列,不能被监听消费 stock_release_delay_queue: stock.release.delay.queue #延迟队列的消息过期后转发的队列 stock_release_queue: stock.release.queue #交换机 stock_event_exchange: stock.event.exchange #进入延迟队列的路由key stock_release_delay_routing_key: stock.release.delay.routing.key #消息过期,进入释放队列的key stock_release_routing_key: stock.release.routing.key #消息过期时间,毫秒,测试改为15秒 ttl: 15000 ``` * 配置类 ```java @Configuration @Data public class RabbitMQConfig { /** * 交换机 */ @Value("${mq.config.stock_event_exchange}") private String eventExchange; /** * 第一个队列延迟队列, */ @Value("${mq.config.stock_release_delay_queue}") private String stockReleaseDelayQueue; /** * 第一个队列的路由key * 进入队列的路由key */ @Value("${mq.config.stock_release_delay_routing_key}") private String stockReleaseDelayRoutingKey; /** * 第二个队列,被监听恢复库存的队列 */ @Value("${mq.config.stock_release_queue}") private String stockReleaseQueue; /** * 第二个队列的路由key * * 即进入死信队列的路由key */ @Value("${mq.config.stock_release_routing_key}") private String stockReleaseRoutingKey; /** * 过期时间 */ @Value("${mq.config.ttl}") private Integer ttl; /** * 消息转换器 */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 创建交换机 Topic类型,也可以用dirct路由 * 一般一个微服务一个交换机 */ @Bean public Exchange stockEventExchange(){ return new TopicExchange(eventExchange,true,false); } /** * 延迟队列 */ @Bean public Queue stockReleaseDelayQueue(){ Map
args = new HashMap<>(3); args.put("x-message-ttl",ttl); args.put("x-dead-letter-exchange",eventExchange); args.put("x-dead-letter-routing-key",stockReleaseRoutingKey); return new Queue(stockReleaseDelayQueue,true,false,false,args); } /** * 死信队列,普通队列,用于被监听 */ @Bean public Queue stockReleaseQueue(){ return new Queue(stockReleaseQueue,true,false,false); } /** * 第一个队列,即延迟队列的绑定关系建立 */ @Bean public Binding stockReleaseDelayBinding(){ return new Binding(stockReleaseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,stockReleaseDelayRoutingKey,null); } /** * 死信队列绑定关系建立 */ @Bean public Binding stockReleaseBinding(){ return new Binding(stockReleaseQueue,Binding.DestinationType.QUEUE,eventExchange,stockReleaseRoutingKey,null); } } ``` ### 商品锁定-延迟消息功能测试开发和注意事项 **简介:延迟消息测试和注意事项** * 测试代码编写测试 ```java @RunWith(SpringRunner.class) @SpringBootTest(classes = ProductApplication.class) @Slf4j public class DemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void send(){ ProductMessage message = new ProductMessage(); message.setOutTradeNo("123456abc"); message.setTaskId(1L); rabbitTemplate.convertAndSend("stock.event.exchange", "stock.release.delay.routing.key", message); } } ``` 注意 * 只有微服务监听mq,才会创建对应的队列和交换机,所以编写测试方法或者写监听器就行 * IOC容器存在不行,RabbitMQ默认是懒加载模式 * 如果MQ已经存在对应的队列,则不会重新创建 * 修改配置后,需要删除队列重新建立生效 * 如果队列和交换机已经存在,重新启动项目会有错误警告,可以忽略 ### 商品库存锁定-发送延迟消息功能开发 * 消息协议介绍 ```java @Data public class ProductMessage { /** * 消息队列id */ private Long messageId; /** * 订单号 */ private String outTradeNo; /** * 库存锁定工作单id */ private Long taskId; } ``` 发送mq消息功能开发 ```java //发送MQ延迟消息 ,解锁商品库存 taskId TODO ProductMessage productMessage = new ProductMessage(); productMessage.setTaskId(productTaskDO.getId()); productMessage.setOutTradeNo(orderOutTradeNo); rabbitTemplate.convertAndSend(rabbitConfig.getEventExchange(),rabbitConfig.getStockReleaseDelayRoutingKey(),productMessage); log.info("商品库存锁定信息发送成功:{}",productMessage); ``` ### 商品微服务延迟消息消费-释放商品库存功能开发 * 商品微服务消费MQ监听器开发 ```java @Slf4j @Component @RabbitListener(queues = "${mq.config.stock_release_queue}") public class ProductStockMQListener { @Resource private ProductService productService; /** * * 重复消费-幂等性 * * 消费失败,重新入队后最大重试次数: * 如果消费失败,不重新入队,可以记录日志,然后插到数据库人工排查 * * 消费者这块还有啥问题,大家可以先想下,然后给出解决方案 */ @RabbitHandler public void releaseProductStock(ProductMessage productMessage, Message message, Channel channel) throws IOException { log.info("监听到消息:releaseProductStock消息内容:{}", productMessage); long msgTag = message.getMessageProperties().getDeliveryTag(); boolean flag = productService.releaseProductStock(productMessage); try { if (flag) { //确认消息消费成功 channel.basicAck(msgTag, false); }else { channel.basicReject(msgTag,true); log.error("释放商品库存失败 flag=false,{}",productMessage); } } catch (IOException e) { log.error("释放商品库存异常:{},msg:{}",e,productMessage); channel.basicReject(msgTag,true); } } } ``` - 释放商品库存 ```java boolean releaseProductStock(ProductMessage productMessage); ``` ```java @Override public boolean releaseProductStock(ProductMessage productMessage) { //查询工作单状态 ProductTaskEntity taskDO = productTaskMapper.selectOne(new QueryWrapper
().eq("id", productMessage.getTaskId())); if (taskDO == null) { log.warn("工作单不存在,消息体为:{}", productMessage); } //lock状态才处理 assert taskDO != null; if (taskDO.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())) { //查询订单状态 JsonData jsonData = orderFeignService.queryProductOrderState(productMessage.getOutTradeNo()); if (jsonData.getCode() == 0) { String state = jsonData.getData().toString(); if (ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)) { //状态是NEW新建状态,则返回给消息队,列重新投递 log.warn("订单状态是NEW,返回给消息队列,重新投递:{}", productMessage); return false; } //如果是已经支付 if (ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)) { //如果已经支付,修改task状态为finish taskDO.setLockState(StockTaskStateEnum.FINISH.name()); productTaskMapper.update(taskDO, new QueryWrapper
().eq("id", productMessage.getTaskId())); log.info("订单已经支付,修改库存锁定工作单FINISH状态:{}", productMessage); return true; } } //订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW log.warn("订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复商品库存,message:{}", productMessage); taskDO.setLockState(StockTaskStateEnum.CANCEL.name()); productTaskMapper.update(taskDO, new QueryWrapper
().eq("id", productMessage.getTaskId())); //恢复商品库存,集锁定库存的值减去当前购买的值 baseMapper.unlockProductStock(taskDO.getProductId(), taskDO.getBuyNum()); } else { log.warn("工作单状态不是LOCK,state={},消息体={}", taskDO.getLockState(), productMessage); } return true; } ``` 跟锁定优惠券一样,Feign调用订单服务接口,查询订单状态 ```java @FeignClient(name = "nla-order-service") public interface OrderFeignService { /** * 查询订单状态 */ @GetMapping("/odr/product/v1/query_state") JsonData queryProductOrderState(@RequestParam("out_trade_no") String outTradeNo); } ``` 解锁商品存储 ```java void unlockProductStock(@Param("productId") Long productId, @Param("buyNum") Integer buyNum); ``` ```xml
update product set lock_stock = lock_stock-#{buyNum} where id = #{productId}
``` * 流程梳理 ```java 商品库存解锁记录场景 1、超时未支付,比如30分钟则订单失效关闭 2、下单成功,创建订单业务失败,订单回滚 库存解锁防止继续支付: 1、30分支付超时则无法支付订单 2、订单31分延迟消息(比订单超时大几分钟) ->查询订单状态-向第三方支付查询订单状态,只有未支付状态,且本地订单状态是NEW,才修改本地订单状态为取消CANCEL,其他业务才可以解锁对应的库存库存 3、商品、优惠券库存32分延迟消息(比订单超时大几分钟) ->查询订单状态-订单不存在,解锁库存 ->查询订单状态 1)订单状态为取消CANCEL的情况,才可以解锁库存,确认消息接收; 2)订单状态为未支付NEW的情况,则不解锁库存,不修改订单状态,重新投递消息或者拒收; (避免网络延迟到 导致订单关单消息,比库存解锁还慢,没更新订单状态) 3)如果是订单已经支付则修改库存task工作单状态,确认消息接收; 注意:延迟队列一定要开启手动的ack机制,防止解锁失败,消息丢失,也要防止多次解锁 解锁库存的时候:修改状态和修改对应库存task工作单状态应该是同个事务,防止其中一个失败 ``` ### 数据一致性多场景用例测试-延迟消息消费释放商品库存开发 流程测试 * 写单元测试,发送消息 * 订单支付完成用例测试 * 订单超时未支付用例测试 * 订单异常不存在用例测试 * 消息延迟,监听消息处理 * 记录更新情况 * 数据准备 ```sql INSERT INTO `product_order` (`id`, `out_trade_no`, `state`, `create_time`, `total_amount`, `pay_amount`, `pay_type`, `nickname`, `head_img`, `user_id`, `del`, `update_time`, `order_type`, `receiver_address`) VALUES (1, '123456abc', 'PAY', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0, NULL, NULL, NULL); INSERT INTO `product_task` (`id`, `product_id`, `buy_num`, `product_name`, `lock_state`, `out_trade_no`, `create_time`) VALUES (1, 1, 2, '杯子-小滴课堂', 'LOCK', '123456abc', '2021-02-28 16:01:52'); ``` * 功能流程完善 * 业务逻辑优化 * 如何保证消息不会重复消费-幂等处理 * 多个消息并发情况下是否需要加锁 * 是否还有其他问题,大家可以思考下,然后给出解决方案