概念: 半消息: 在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知rocketmq发送操作步骤 :1.(生产者)发送-【半消息】2.(生产者)本地监听-【半消息】处理结果3.(消费者)处理-【半消息】

1.(生产者)发送-【半消息】

// 消息体@Data@Builder@ToStringpublic class UserMoneyParams { int userId; String act; double money; String info; String infoParams;}// 发送消息// 发送-队列半消息: rocketMQ@RequestMapping("rocketMQHalf")public ApiResult rocketMQHalf() { int orderId = 2; double money = 10; // 用户余额变更-参数体 UserMoneyParams userMoneyParams = UserMoneyParams.builder() .act("pay-order") .userId(orderId) .money(money) .build(); // 用户数据变更-参数 UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1); log.info("发送前参数: "+userMoneyParams.toString()); rocketMQTemplate.sendMessageInTransaction( // 半消息-分组 "tsca-group-half", // 半消息-topic "member-change-money-half-topic", // 半消息-数据体 MessageBuilder .withPayload(userMoneyParams) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID()) .build(), userOrder ); return ApiResult.success("发送队列-半消息");}

2.(生产者)本地监听-【半消息】处理结果

@RocketMQTransactionListener(txProducerGroup = "tsca-group-half")@RequiredArgsConstructor@Slf4jpublic class UserMoneyHalfListener implements RocketMQLocalTransactionListener { @Autowired RedisUtil redisUtil; @Autowired UserOrderService userOrderService; // 生产者-消息处理完毕,继续执行本地方法(含事务) @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { try { Object userMoneyParams=message.getPayload(); log.info("消息-args:"+arg); // 消息主体加密无法获取 log.info("消息-主体:"+ JSON.toJSONString(userMoneyParams)); log.info("消息-主体-头部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID)); log.info("半消息-本地-处理完成"); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.warn("半消息-本地-发生异常,回滚: "+e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } } // 生产者-消息处理超时 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { // 查询消息是否已经处理 String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id")); Object messageData = this.redisUtil.getValue(messageID, String.class); if (messageData != null && messageData.equals("ok")) { // 超时且消息已经处理完毕 log.info("半消息-本地消息超时-且已经处理完毕"); return RocketMQLocalTransactionState.COMMIT; } else { log.info("半消息-本地消息超时-且未处理完毕"); // 超时且消息未处理完毕 return RocketMQLocalTransactionState.ROLLBACK; } }}

3.(消费者)处理-【半消息】

@Service@RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic")@Slf4jpublic class UserMoneyHalfListener implements RocketMQListener<UserMoneyParams> {// @Autowired// UserMoneyService memberOrderService; @Override public void onMessage(UserMoneyParams memberMoneyMessage) { log.info("收到-用户余额变动-半消息"); try { } catch (Exception e) { log.info("更改余额错误: "+e.getMessage()); e.printStackTrace(); } log.info(JSON.toJSONString(memberMoneyMessage)); }}