Seata 框架自身并没有内置针对牢靠事情形式的处置打算,但咱们可以经常使用另一款曾经引见过的框架来成功这一指标,就是 RocketMQ。
RocketMQ 为开发人员提供了事务信息这一信息类型,专门用来应答散布式环境下的数据分歧性疑问。
事务信息是RocketMQ提供的一种初级信息类型,允许在散布式场景下信息消费和本地事务的最终分歧性。咱们可以区分从消费者和消费者维度登程来剖析牢靠事情成功上的需求。
事务信息的发生完美处置了牢靠事情形式口头环节中或许发生的疑问。事务信息提供了相似X/Open XA的散布事务配置,经过事务信息能到达散布式事务的最终分歧性。
那么,RocketMQ 是如何做到这一点的呢?关键就在于它所提供的半信息机制。
所谓半信息(Half Message),是指暂不能投递的信息。发送方曾经将信息成功发送到了服务端,但是服务端未收到消费者对该信息的二次确认,此时该信息被标志成暂不能投递形态,处于该种形态下的信息就是半信息。
引见完半信息的概念,咱们再来明白什么是半信息回查。
咱们知道由于网络闪断、消费者运行重启等要素,或许会造成某条事务信息的二次确认失落。RocketMQ 服务端经过扫描发现某条信息常年处于半信息形态时,就会主意向信息消费者征询该信息的最终形态(Commit 或 Rollback),这一环节就是半信息回查。图 1 展现了 RocketMQ 中事务信息的全体架构。
进一步,咱们梳理 RocketMQ 事务信息的口头环节,如图2所示。
可以看到,图 2 存在服务A和服务B这两个微服务。其中服务 A 是信息颁布者,而服务 B 是信息消费者,咱们须要确保两者之间数据的分歧性。这里有 7 个步骤。
图 2 更多是站在信息颁布者的角度看待事务信息的颁布流程。而针对信息消费而言,假设消费者处置事务信息时发生意外,RocketMQ 会启动重试操作,直到信息消费和本地事务处置都成功。这是一种回调机制,会被 RocketMQ 智能调用。
引见完 RocketMQ 事务信息的基本概念和口头流程之后,咱们接着引见它的开发形式。
当咱们在微服务架构中引入事务信息之前,须要创立一张事务口头记载表。事务口头记载表的作用有两个:一个是成功事务回查,另一个则是成功业务层幂等控制。
事务口头记载表的创立脚本如以下代码所示。
代码清单1 事务口头记载表 SQL 定义代码
CREATE TABLE `tx_record` (`tx_no` varchar(64) NOT NULL COMMENT '事务Id',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创立期间',PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事务记载表'
接上去咱们要引入 RocketMQ 内置的TransactionListener接口。
为了成功事务信息,开发人员的关键开发上班量就体如今对这个接口的成功环节中。TransactionListener接口的定义如下所示。
代码清单2 TransactionListener接口定义代码
public interface TransactionListener {//当发送事务信息成功之后,该方法会被触发,本地事务将被口头LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);//当没有收到事务信息的照应时,主机会发送确认信息来审核事务形态,该方法会被触发并失掉本地事务形态LocalTransactionState checkLocalTransaction(final MessageExt msg);}
可以看到,TransactionListener接口的两个方法区分成功了本地事务口头和本地事务回查这两个外围操作。那么咱们应该如何成功这两个方法呢?这里给出这两个方法的口头伪代码。
代码清单3 TransactionListener接口两个方法成功伪代码
executeLocalTransaction { 口头本地事务 假设失败就选用回滚事务,反之提交事务}checkLocalTransaction { 成功事务回查 依据事务口头记载判别,已口头则提交事务}
留意:这两个方法须要信息的颁布者来成功,但调用方是 RocketMQ 自身,而且这个调用环节是智能触发的,不须要开发做任何干预。
图 3 围绕信息颁布者展现了其所须要成功的各个外围步骤。
图3 事务信息中信息颁布者成功环节
假设咱们经常使用 Spring 框架来集成 RocketMQ,那么图 3 中的业务服务虚现类的成功环节可以参考如下代码示例。
代码清单3 信息颁布端业务服务虚现类示例代码。
@Servicepublic class CustomerTicketServiceImpl implements ICustomerTicketService {@AutowiredTxRecordMapper txRecordMapper;@AutowiredRocketMQTemplate rocketMQTemplate;@Overridepublic void generateTicket(AddCustomerTicketReqVO addCustomerTicketReqVO) {//从VO中创立TicketGeneratedEventTicketGeneratedEvent ticketGeneratedEvent = createTicketGeneratedEvent(addCustomerTicketReqVO);//将Event转化为JSON对象JSONObject jsonObject =new JSONObject();jsonObject.put("ticketGeneratedEvent",ticketGeneratedEvent);String jsonString = jsonObject.toJSONString();//生成信息对象Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送事务信息rocketMQTemplate.sendMessageInTransaction("producer_group_ticket","topic_ticket",message,null);}@Override@Transactionalpublic void doGenerateTicket(TicketGeneratedEvent ticketGeneratedEvent) {//幂等判别if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){return ;}//拔出工单CustomerTicket customerTicket = CustomerTicketConverter.INSTANCE.convertEvent(ticketGeneratedEvent);customerTicket.setStatus(1);save(customerTicket);//参与事务日志txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());}...}
上述代码展现的是一个拔出客服工单(CustomerTicket)的环节,generateTicket和doGenerateTicket方法区分对应图 3 中的发送信息和口头本地事务这两个环节。
留意:这里经常使用了RocketMQTemplate的sendMessageInTransaction方法来发送事务信息。同时,咱们也看到了事务口头记载表的一种运行场景,即成功业务层幂等控制。
接上去继续成功图3所示的TransactionListener接口,示例代码如下:
代码清单4 TransactionListener接口成功类示例代码。
@Component@RocketMQTransactionListener(txProducerGroup = "producer_group_ticket")public class ProducerListener implements RocketMQLocalTransactionListener {@AutowiredICustomerTicketService customerTicketService;@AutowiredTxRecordMapper txRecordMapper;//事务信息发送后的回调方法,当信息发送给MQ成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析信息,转成Event对象TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);//口头本地事务customerTicketService.doGenerateTicket(ticketGeneratedEvent);//往RocketMQLocalTransactionState.COMMIT,智能向MQ发送commit信息,MQ将信息的形态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();//假设本地事务口头失败,就将信息设置为回滚形态return RocketMQLocalTransactionState.ROLLBACK;}}//事务形态回查@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析信息,转成Event对象TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);//依据事务Id判别能否存在已口头的事务Boolean isTxNoExisted = Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()));//假设事务已口头则前往COMMIT,反之前往UNKNOWN形态if(isTxNoExisted){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}...}
这段代码明晰地展现了TransactionListener接口中两个外围方法的成功环节。在executeLocalTransaction方法中,咱们经过调用CustomerTicketService业务服务类的doGenerateTicket方法成功了本地事务;而在checkLocalTransaction方法中,咱们则成功了事务回查机制。这里雷同展现了事务口头记载表的另一种运行场景,即成功事务回查。
相似,当经常使用事务信息时,信息消费者的成功环节雷同遵照必定的开发规范,如图 4 所示。
图4 事务信息中信息消费者成功环节
可以看到,相比于信息颁布者,信息消费者的成功环节要便捷很多。
代码清单5 信息消费成功类示例代码。
@Component@RocketMQMessageListener(consumerGroup = "consumer_group_ticket",topic = "topic_ticket")public class Consumer implements RocketMQListener<String> {@AutowiredIChatRecordService chatRecordService;//接纳信息@Overridepublic void onMessage(String message) {log.info("开局消费信息:{}",message);//解析信息JSONObject jsonObject = JSONObject.parseObject(message);String ticketGeneratedEventString = jsonObject.getString("ticketGeneratedEvent");//转成TicketGeneratedEventTicketGeneratedEvent ticketGeneratedEvent = JSONObject.parseObject(ticketGeneratedEventString, TicketGeneratedEvent.class);//参与本地聊天记载chatRecordService.generateChatRecord(ticketGeneratedEvent);}}
可以看到,这个信息消费者的成功环节没有任何不凡之处,咱们只要要成功RocketMQListener接口的onMessage方法,并在该方法中调用业务服务虚现类中的业务方法即可。
消费者端的业务服务虚现类的成功环节如下。
代码清单6 信息消费端业务服务虚现类示例代码:
@Servicepublic class ChatRecordServiceImpl implements IChatRecordService {@AutowiredTxRecordMapper txRecordMapper;@Override@Transactionalpublic void generateChatRecord(TicketGeneratedEvent ticketGeneratedEvent) {//幂等判别if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){return ;}//拔出聊天记载ChatRecord chatRecord = ChatRecordConverter.INSTANCE.convertEvent(ticketGeneratedEvent);save(chatRecord);//参与事务日志txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());}}
这里雷同经过事务口头记载表成功了业务层幂等控制,并最终成功本地事务的提交。
作为总结,咱们经常使用时序图来具体展现事务信息发送和消费环节,如图 5 所示。
图5 事务信息发送和消费时序图
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载联系作者并注明出处:https://duobeib.com/diannaowangluoweixiu/6403.html