随着业务规模的不时扩张和技术架构的演进,散布式系统曾经成为撑持高并发、海量数据处置的关键基础设备。在散布式环境中,各个节点相对独立且或许并发地口头义务,这极大地优化了系统的全体性能和可用性。当触及到对共享资源的访问和修正时,为了确保数据的分歧性和正确性,咱们须要一种能在多节点间协调并发操作的技术手腕,也就是散布式锁。
传统的单机环境下,进程内可以经过本地锁轻松成功对临界区资源的互斥访问。但是,这一方法在散布式系统中不再实用,由于单机锁不可超过网络边界,不可保障不同节点间的并发控制。散布式锁正是在这种背景下发生,它是一种能够实如今散布式系统中多个节点之间协同上班的锁机制,旨在包全共享资源不受并发抵触的影响,确保在复杂的散布式场景下数据操作的有序性和分歧性。
咱们以WMS系统中,订单出入库操作库存为例。
CREATE TABLE `tb_inventory`(`id`BIGINTNOT NULL AUTO_INCREMENT,`account_id`BIGINTNOT NULL DEFAULT 0 COMMENT '帐套ID',`sku`VARCHAR(128)NOT NULL DEFAULT '' COMMENT '商品sku编码',`warehouse_code`VARCHAR(16)NOT NULL DEFAULT '' COMMENT '库存编码',`available_inventory` INT UNSIGNEDNOT NULL DEFAULT 0 COMMENT '可用库存',`create_time`DATETIMENOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创立期间',`update_time`DATETIMENOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修正期间',`deleted`TINYINT UNSIGNED NULLDEFAULT 0 COMMENT '0-未删除 1/null-已删除',PRIMARY KEY (`id`) USING BTREE,UNIQUE KEY uk_warehouse_code (customer_no, warehouse_code, sku, deleted)) ENGINE = InnoDBAUTO_INCREMENT = 1CHARACTER SET = utf8mb4 COMMENT = '库存表';
关于操作库存,经常出现有以下一些失误做法:
间接在内存中判别能否有库存,计算扣减之后的值更新数据库,并发的状况下会造成库存相互笼罩发。
/*** 确认订单出库** @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrder(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 疏忽 订单消息校验等,,,// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 残余库存Integer remainInventory = availableInventory - qty;// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);updateInventory.setAvailableInventory(remainInventory);tbInventoryMapper.updateInventory(updateInventory);}
sql中间接口头更新库存
<update>UPDATE tb_inventorySET available_inventory = #{availableInventory}WHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND deleted = 0</update>
库存SKU的库存曾经变成了正数:
在InnoDB存储引擎下,UPDATE通常会运行行锁,所以在SQL中参与运算防止值的相互笼罩,但是库存的数量还是或许变为正数。由于校验库存能否短缺在内存中口头,并发状况下都会读到有库存。
/*** 确认订单出库** @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrder(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 疏忽 订单消息校验等,,,// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}
库存扣减在sql中启动
<update>UPDATE tb_inventorySET available_inventory = available_inventory - #{diffInventory}WHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND deleted = 0</update>
库存SKU的库存曾经变成了正数:
只管synchronized可以防止在多并发环境下,多个线程并发访问这个库存操作方法,但是synchronized的作用在方法完结之后就失效了,或许此时势务并没有提交,造成或许其余的线程会在拿到锁之后读取到旧库存数据,在口头扣除时,依然或许会形成库存扣减不对。
/*** 确认订单出库** @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic synchronized void confirmOrder(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 疏忽 订单消息校验等,,,// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}
库存SKU的库存曾经变成了正数:
从下面的失误案例来看,在操作库存时,不是原子性的,造成库存操作失败。以下咱们从单体以及散布式系统两个方向讨论如何保障数据的分歧性和正确性。
在单机系统中,数据和业务逻辑都集中在一个进程中,面对并发访问共享资源的状况,须要依托锁机制和数据库的事务治理(行锁)来保养数据的正确性和分歧性。
关于锁机制,咱们不论是驳回synchronized还是Lock等,咱们要保障的一个条件就是:要让数据库的事务在锁的控制范畴之内。
针对上述失误案例,咱们可以将锁作用于事务之外,行将锁放在库存操作方法的上一层(例如service层)。
@Servicepublic class OrderServiceImpl implements IOrderService {private IOrderManager orderManager;/*** 确认订单出库** @param customerNo* @param orderNo*/@Overridepublic synchronized void confirmOrder(String customerNo, String orderNo) {orderManager.confirmOrder(customerNo, orderNo);}@Autowiredpublic void setOrderManager(IOrderManager orderManager) {this.orderManager = orderManager;}}
此时咱们在操作库存,会由于库存不够,造成库存操作失败:
这种模式只管可以成功数据分歧性和正确性,但是并不是很介绍,由于咱们的事务要控制的粒度尽或许的小。
介绍的模式,是咱们再锁的控制范畴去提交事务。即手动提交事务。经常使用TransactionTemplate或间接在代码中调用PlatformTransactionManager的getTransaction和commit方法来手动治理事务。
@Autowiredprivate PlatformTransactionManager transactionManager;/*** 确认订单出库** @param customerNo* @param orderNo*/@Overridepublic synchronized void confirmOrder(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());// 疏忽 订单消息校验等,,,// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);// 提交事务transactionManager.commit(status);}
此时咱们再去执库存操作,会由于库存不够,造成库存操作失败:
关于上述同步锁的成功,咱们最好经常使用Lock得模式去成功,可以更精细控制同步逻辑。
@Autowiredprivate PlatformTransactionManager transactionManager;private final Lock orderLock = new ReentrantLock();/** * 确认订单出库 * * @param customerNo * @param orderNo */@Overridepublic void confirmOrder(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();try {// 尝试失掉锁,最多期待timeout期间if (orderLock.tryLock(1, TimeUnit.SECONDS)) {// 成功失掉到锁,口头确认订单的逻辑TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 疏忽 订单消息校验等,,,// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);// 提交事务transactionManager.commit(status);}catch (Exception e){// 回滚事务transactionManager.rollback(status);// 处置异常e.printStackTrace();}finally {// 监禁锁orderLock.unlock();}} else {// 失掉锁超时System.out.println("Failed to confirm order within the timeout period: " +orderNo);// 处置超时状况,比如记载日志、通知用户等}} catch (InterruptedException e) {// 假设在期待锁的环节中线程被终止,处置终止异常Thread.currentThread().interrupt();// ... 处置终止逻辑 ...}}
在单机系统中,上述方法可以保障数据分歧性以及正确性,但是实践业务中,咱们运行通常都部署在多个主机中,此时上述打算就不能保障了,就须要散布式锁来处置了。
在单机系统中,锁是一种基本的同步机制,用于控制多个线程对共享资源的并发访问。当咱们更新到散布式系统时,由于服务扩散在多个节点之上,原本在单机环境下经常使用的锁机制不可间接超过多个节点来协调资源访问。所以此时,散布式锁作为一种裁减的锁概念应运而生。散布式锁是一种跨多个节点、进程或服务的同步原语,它准许在散布式系统中协调对共享资源的访问,确保在任何时刻只要一个节点能够独占地口头操作,即使这些节点散布在不同的物理或虚构机器上。
1.互斥性: 这是散布式锁最基本的要求,象征着在恣意时辰,只要一个客户端(无论是进程、线程还是服务虚例)能够持有并经常使用锁,从而确保共享资源不会同时被多个客户端修正。
2.耐久性: 散布式锁必定具有必定的耐久化才干,即使服务重启或网络持久断开,锁的形态依然能够失掉坚持。
3.可重入性:相似于单机环境下的可重入锁,散布式锁也应该支持同一客户端在持有锁的同时再次恳求锁而不被阻塞,这关于递归调用或触及多个资源访问的操作至关关键。
4.偏心性(Fairness): 在某些场景下,要求锁调配遵照必定的偏心准则,即期待最久的客户端在锁监禁时优先取得锁。只管不是一切散布式锁成功都须要思考偏心性,但在某些高性能或高并发的系统中,偏心性是十分关键的。
5.容错性:散布式锁服务应当具有必定的容错才干,即即使一局部服务节点出现缺点,仍能保障锁配置的正确运转,防止死锁和数据不分歧。这通常经过服务冗余和复制机制来成功,如经常使用Raft、Paxos等分歧性协定或基于ZooKeeper、etcd等散布式协调服务。
失望锁以预防性战略处置并发抵触,它假定并发访问造成的数据抵触是常态。因此,在访问数据之前,它会踊跃地失掉并持有锁,确保在锁未监禁时,其余事务不可对同一数据启动访问。经过运用SELECT ... FOR UPDATESQL语句,能够在查问阶段即锁定关系行,成功数据的独占访问。但是,关键的是要留意,此操作应仅针对惟一键口头,否则或许会大幅参与锁定范畴和潜在的锁表危险,从而影响系统的并发性能与效率。
最经常出现的做法是间接在业务数据上经常使用SELECT ... FOR UPDATE,例如:
<select resultType="com.springboot.mybatis.entity.TbInventoryDO">SELECT *FROM tb_inventoryWHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND deleted = 0FOR UPDATE</select>
在一个事务中,先经常使用SELECT ... FOR UPDATE后,在口头更新。
/*** 经常使用SELECT... FOR UPDATE 成功散布式锁,扣减库存* @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithLock(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}
但是,这种成功模式,很容易形成业务表的锁压力,特地是数据量大,并发量高的时刻。所以,还有一种做法是,专门保养一张锁的表,而不是间接在业务数据表上经常使用SELECT FOR UPDATE。这种模式在某些场景下可以协助简化锁的治理,并且可以在必定水平上减轻对业务数据表的锁定压力。(其实成功模式,相似Redis成功的散布式锁,只是用数据库成功了而已)。其成功流程,如下:
数据库成功失望锁流程
1.创立锁表:首先,创立一张锁表,例如lock_table,蕴含lock_key(用于标识须要锁定的业务资源)、lock_holder(持有锁的客户端标识,如用户ID或事务ID)、acquire_time(失掉锁的期间)等字段。
CREATE TABLE `tb_lock`(idBIGINT AUTO_INCREMENTPRIMARY KEY,lock_keyVARCHAR(255)NOT NULL DEFAULT '' COMMENT '锁的业务编码。对应业务表的惟一键',lock_holderVARCHAR(32)NOT NULL DEFAULT '' COMMENT '持有锁的客户端标识',acquire_time DATETIMENOT NULL COMMENT '失掉锁的期间',create_timeDATETIMEDEFAULT CURRENT_TIMESTAMP NOT NULL COMMENT '创立期间',update_timeDATETIMEDEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修正期间',deletedTINYINT UNSIGNED DEFAULT '0'NULL COMMENT '0-未删除 1/null-已删除',UNIQUE KEY uk_lock (lock_key, deleted)) ENGINE = InnoDBAUTO_INCREMENT = 1CHARACTER SET = utf8mb4 COMMENT = 'Lock表';
<insert>INSERT INTO tb_lock(lock_key,lock_holder,acquire_time)VALUES(#{lockKey},#{lockHolder},#{acquireTime})</insert>
<select resultType="com.springboot.mybatis.entity.TbLockDO">SELECT *FROM tb_lockWHERE lock_key = #{lockKey} AND deleted = 0FOR UPDATE</select>
4. 审核锁形态:在失掉锁时,可以审核锁能否已被持有,比如审核lock_holder字段,假设已有其余事务持有锁,则失掉锁失败,须要期待或重试。
// 尝试失掉锁tryLock(lockKey, lockHolder);// 经常使用SELECT FOR UPDATE锁定锁表记载TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey);if (!tbLockDO.getLockHolder().equals(lockHolder)) {// 锁已被其余客户端持有,失掉锁失败,须要处置此异常状况throw new IllegalStateException("Lock is held by another client.");}
<delete parameterType="java.lang.String">DELETE FROM tb_lockWHERE lock_key = #{lockKey}AND lock_holder = #{lockHolder}AND deleted = 0</delete>
基于数据库失望锁成功,代码如下:
@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithLock(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);String lockHolder = Thread.currentThread().getName();try {// 尝试失掉锁tryLock(lockKey, lockHolder);// 经常使用SELECT FOR UPDATE锁定锁表记载TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey);if (!tbLockDO.getLockHolder().equals(lockHolder)) {// 锁已被其余客户端持有,失掉锁失败,须要处置此异常状况throw new IllegalStateException("Lock is held by another client.");}// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}finally {unlock(lockKey, lockHolder);}}/*** 尝试失掉锁* @param lockKey 锁的key 业务编码* @param lockHolder 锁的持有者* @return 能否失掉成功*/private void tryLock(String lockKey, String lockHolder) {TbLockDO tbLockDO = new TbLockDO();tbLockDO.setLockKey(lockKey);tbLockDO.setLockHolder(lockHolder);tbLockDO.setAcquireTime(LocalDateTime.now());//拔出一条数据insert intotbLockMapper.insertLock(tbLockDO);}/*** 锁监禁* @param lockKey 锁的key 业务编码*/private void unlock(String lockKey, String lockHolder){tbLockMapper.deleteLockByLockKey(lockKey, lockHolder);}
数据库失望锁成功散布式锁可以防止并发抵触,确保在事务完结前,这些记载不会被其余并发事务修正。它还可以控制锁的粒度,提供行级别的锁定,缩小锁定范畴,提高并发性能。这种模式十分适宜于处置须要更新的事务场景,特地是银行转账、库存扣减等须要保障数据完整性和分歧性的操作。
但是,须要留意的是,适度或不当经常使用SELECT FOR UPDATE会造成更多的行被锁定,在高并发场景下,假设少量事务都在期待失掉锁,或许会造成锁期待和死锁疑问,并且当事务持有SELECT FOR UPDATE的锁时,其余事务尝试修正这些锁定的行会堕入期待形态,直至锁监禁。这或许造成其余事务的提早和系统吞吐量降低,长期间持有锁会造成数据库资源(如内存、衔接数等)消耗增大,特地是长事务中持有锁期间较长,会影响系统的总体性能。所以咱们在经常使用时要特地留意不要再长事务中经常使用失望锁。
失望锁假定并发抵触不太或许出现,因此在读取数据时不锁定资源,而是在更新数据时验证数据能否被其余事务修正过。
在数据库表中参与一个version字段。
ALTER TABLE `tb_inventory` ADD COLUMN `version` INT NOT NULL DEFAULT 0 COMMENT '失望锁版本' AFTER available_inventory;
每次更新时将version字段加1。在更新数据时,经过UPDATE语句附带WHERE version = oldVersion条件,只要当version值不变时更新操作才会成功。若version已变,则示意数据已被其余事务修正,此次更新失败。
<update>UPDATE tb_inventorySET available_inventory = available_inventory - #{diffInventory},version = #{version} + 1WHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND version = #{version}AND deleted = 0</update>
基于失望锁成功的打算:
@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithVersion(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();Integer curVersion = inventoryDO.getVersion();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 设置数据版本号updateInventory.setVersion(curVersion);// 库存差值updateInventory.setDiffInventory(qty);updateInventory.setVersion(inventoryDO.getVersion());int updateRows = tbInventoryMapper.updateInventorWithVersion(updateInventory);if (updateRows != 1){System.err.println("更新库存时出现并发抵触,请重试");throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时出现并发抵触,请重试");}}
失望锁假定大少数状况下不会有并发抵触,所以在读取数据时不立刻加锁,而是等到更新数据时才去审核能否有其余事务启动了改变,这样可以缩小锁的持有期间,提高了系统的并发性能。并且,失望锁在数据更新时才审核抵触,而不是在失掉数据时就加锁,所以大大降低了死锁的危险。并且由于不常加锁,所以缩小了数据库级别的锁治理开支,十分适宜关于读多写少的场景。
但是,当并发写入较多时,或许出现少量更新抵触,须要不时地重试事务以取得成功的更新。过多的重试或许造成性能降低,特地是在并发度极高时,或许会构成“ABA”疑问。并且 在极其并发条件下,假设没有正确的重试机制或超机遇制,失望锁或许不可保障强分歧性。尤其是在触及多个表的复杂事务中,单个失望锁或许无余以处置一切并发疑问。
Redis的setNX(set if not exists)命令是原子操作,当键不存在时才设置值,设置成功则前往true,否则前往false。经过这个命令可以极速地在Redis中争夺一把锁。
应用Redis,咱们可以生成一个惟一的锁ID作为key的一局部。而后经常使用setNX尝试设置key-value对,value可以是过时期间戳。若设置成功,则以为失掉锁成功,口头业务逻辑。在业务逻辑成功后,删除对应key监禁锁,或设置过时期间智能监禁。
@Slf4jpublic class RedisDistributedLock implements AutoCloseable{private final StringRedisTemplate stringRedisTemplate;private final DefaultRedisScript<Boolean> unlockScript;/**锁的key*/private final String lockKey;/**锁过时期间*/private final Integer expireTime;private static final String UNLOCK_LUA_SCRIPT = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +"return redis.call(\"del\", KEYS[1])\n" +"else\n" +"return 0\n" +"end";public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockKey, Integer expireTime) {this.stringRedisTemplate = stringRedisTemplate;this.lockKey = lockKey;this.expireTime = expireTime;// 初始化Lua解锁脚本this.unlockScript = new DefaultRedisScript<>();unlockScript.setScriptText(UNLOCK_LUA_SCRIPT);unlockScript.setResultType(Boolean.class);}/*** 失掉锁* @return 能否失掉成功*/public Boolean getLock() {String value = UUID.randomUUID().toString();try {return stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);} catch (Exception e) {log.error("失掉散布式锁失败: {}", e.getMessage());return false;}}/*** 监禁锁* @return 能否监禁成功*/public Boolean unLock() {// 经常使用Lua脚本启动解锁操作List<String> keys = Collections.singletonList(lockKey);Object result = stringRedisTemplate.execute(unlockScript, keys, stringRedisTemplate.opsForValue().get(lockKey));boolean unlocked = (Boolean) result;log.info("监禁锁的结果: {}", unlocked);return unlocked;}@Overridepublic void close() throws Exception {unLock();}}
而后,咱们在处置库存时,先尝试失掉锁,假设失掉到锁,则就可以更新库存。
@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithRedisNx(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);// 30秒过时try (RedisDistributedLock lock = new RedisDistributedLock(stringRedisTemplate, lockKey, 30)) {if (lock.getLock()) {// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);} else {log.error("更新库存时出现并发抵触,请重试");throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时出现并发抵触,请重试");}} catch (Exception e) {log.error("处置散布式锁时出现失误: {}", e.getMessage());}}
Redis作为内存数据库,其操作速度快,setNX的口头期间简直可以疏忽不计,尤其适宜高并发场景下的锁恳求。Redis作为一个可以独立的服务,可以轻松成功不同进程或主机之间的互斥锁。而setNX命令是原子操作,能够在Redis这一复线程环境下以原子性的模式成功锁的失掉,方便一行命令即可成功锁的争抢。同时可以经过EX或PX参数,可以在设置锁时一并设定过时期间,防止因异常状况造成的死锁。
但是单纯经常使用setNX并不能智能续期,一旦锁过时而又未被动监禁,或许出现锁被其余客户端误失掉的状况,须要额外成功锁的智能续期机制,例如经常使用WATCH和MULTI命令组合,或许SET命令的新参数如SET key value PX milliseconds NX XX。而setNX在失掉不到锁时会立刻前往失败,所以咱们必定轮询或经常使用某种延时重试战略来不时尝试失掉锁。并且假设多个客户端同时恳求锁,Redis并不会保障特定的排队顺序,或许造成“饥饿”现象(即某些客户端一直不可失掉锁)。
只管Redis的setNX命令在成功散布式锁方面提供了方便性和高性能,但要构建强健、牢靠的散布式锁处置打算,往往还须要联合其余命令(如expire、watch、multi/exec等)以及思考到各种边缘状况和容错机制。一些成熟的Redis客户端库(如Redisson、Jedis)提供了封装好的散布式锁成功,处置了上述许多疑问。
Redisson是一个高性能、开源的Java驻内存数据网格,它基于Redis,并提供了泛滥散布式数据结构和一套散布式服务,例如散布式锁、信号量、闭锁、队列、映射等。Redisson使得开发者能够更容易地在Java运行程序中经常使用Redis,特地是对散布式环境下的同步原语提供了丰盛的API支持。
Redisson的散布式锁外围原理基于Redis命令,但启动了增强和封装,提供了一种愈加牢靠和易于经常使用的散布式锁成功。他成功散布式锁的思绪与Redis的setNx成功相似。但是,相比拟与Redis的setNx成功散布式锁,Redisson还支持可重入锁,即同一个线程在曾经取得锁的状况下可以再次失掉锁而不被阻塞。外部经过计数器记载持有锁的次数,每次成功失掉锁时计数器递增,监禁锁时递减,只要当计数器归零时才真正监禁锁。Redisson经常使用了看门狗(Watchdog)机制来监控锁的形态,活期智能延伸锁的有效期,这样即使持有锁的客户端暂时解冻或网络颤抖,锁也不会由于超时而被提早监禁。并且,关于Redis集群,Redisson还可以成功RedLock算法,经过在多个Redis节点上区分失掉锁,参与散布式锁的可用性和容错才干。
咱们经常使用Redisson成功散布式锁,成功库存扣减。
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.7</version></dependency>
spring:redisson:address: "redis://127.0.0.1:6379"password:
@Overridepublic void confirmOrderWithRedisson(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);// 30秒过时RLock lock = redissonClient.getLock(lockKey);try {if (lock.tryLock(30, TimeUnit.SECONDS)) {// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);} else {log.error("更新库存时出现并发抵触,请重试");throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时出现并发抵触,请重试");}}catch (Exception e){throw new ServiceException(StatusEnum.SERVICE_ERROR, "失掉散布式锁时被终止");}finally {// 无论成功与否,都要监禁锁if (lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}}
Redisson支持多种类型的散布式锁,包括可重入锁(RLock)、读写锁(RReadWriteLock)、偏心锁(RFairLock)等,满足不同业务场景的需求。Redisson支持锁的智能续期配置,可以防止由于锁持有者在业务处置环节中长期间未成功而造成锁过时被其余客户端失掉。关于Redisson RedLock算法(多节点部署时),即使局部Redis节点失效,也能在大少数Redis节点存活的状况下维持锁的稳固性,增强了系统的容错性和高可用性。
相较于方便的数据库失望锁,Redisson的散布式锁成功更为复杂。只管Redisson提供了智能续期机制,但假设客户端在失掉锁后突然解体且没有反常监禁锁,通常上依然有或许造成锁走漏。只管Redisson也提供了超时设置,但极其状况下仍需结人工清算机制或许其余的打算来预防此类疑问。
在Zookeeper中成功散布式锁的基本原理是应用Zookeeper的暂季节点和Watcher监听机制。
客户端在Zookeeper中指定的某个门路下创立暂时有序节点,每个节点称号后都会附加一个惟一的递增数字,示意节点的顺序。当多个客户端同时恳求锁时,它们都会创立各自的暂时有序节点。
客户端依照节点顺序判别自己能否可以取得锁。节点顺序最小的客户端被以为是锁的持有者,它观察到的序号比自己大的一切节点都是待解锁的队列。锁的持有者继续口头业务逻辑,其它客户端则会注册Watcher监听比自己序号小的那个节点。
当锁持有者成功业务处置后,会删除它创立的暂季节点,Zookeeper会触发Watcher通知期待队列中的下一个节点。接纳到通知的下一个节点发现其观察的节点已删除,于是从新审核门路下残余节点的顺序,假设自己是如今最小的节点,则以为取得了锁。
Watcher机制准许客户端监听Zookeeper上的节点变动事情,当节点被创立、删除、更新时,Zookeeper会向注册了相应事情的客户端发送通知。在散布式锁场景中,客户端经过注册Watcher来监听锁持有者的节点形态,以便在锁监禁时及时失掉锁。
而咱们经常使用Apache Curator框架作为Zookeeper客户端成功散布式锁。Curator领有良好的架构设计,提供了丰盛的recipes(即预制模板)来成功经常出现的散布式协调义务,包括共享锁、互斥锁、屏障、Leader选举等。Curator的散布式锁成功如InterProcessMutex和InterProcessSemaphoreMutex,间接提供了易于经常使用的API来失掉和监禁锁。
Curator在成功散布式锁时,充沛思考了ZooKeeper的个性,比如暂季节点的生命周期关联会话、有序节点的排序机制以及Watcher事情的通知机制等,确保在各种异常状况下,锁的行为合乎预期,例如客户端断线后锁能被正确监禁。
Curator外部集成了重试战略和背压控制,当ZooKeeper操作遇到网络提早或持久的ZooKeeper集群不稳固时,Curator能够智能启动重试,而不是立刻抛出异常。
@Componentpublic class ZkLock {private final CuratorFramework client;private final InterProcessMutex lock;@Value("${curator.zookeeper.connect-string}")private String zookeeperConnectString;public ZkLock() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.newClient(zookeeperConnectString, retryPolicy);client.start();// 散布式锁门路String lockPath = "/locks/product_stock";lock = new InterProcessMutex(client, lockPath);}public void acquireLock(Runnable task) throws Exception {// 尝试失掉锁,超时期间为30秒if (lock.acquire(30, TimeUnit.SECONDS)) {try {task.run();// 在持有锁的状况下口头义务} finally {lock.release();// 无论能否出现异常,都要确保监禁锁}} else {throw new Exception("失掉散布是锁失败");}}}
经常使用ZkLock:
@Overridepublic void confirmOrderWithZk(String customerNo, String orderNo) {// 查问订单消息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查问订单明细假定咱们的出库订单是一繁多件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);// 30秒过时zkLock.acquireLock(() -> {// 查问库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判别库存能否足够if (qty > availableInventory){System.err.println("库存无余,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存无余,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);});}
Apache Curator成功的散布式锁实用于须要在散布式环境中成功强分歧性和高牢靠性的并发控制场景,但是它对ZooKeeper的依赖就触及到了一些网络开支以及运维复杂性等方面的缺陷。
散布式锁是一种在散布式系统中成功互斥控制的机制,确保在多台机器间,某一资源在同一时辰只被一个服务或许一个恳求所访问或修正。它的外围应战在于如何保障在无中心化环境下的全局惟一性和分歧性。
其成功关键依赖散布式存储系统或协调服务。经常出现的成功模式有如下几种模式:
而实践业务开发中,咱们须要依据详细的业务以及系统资源等思考,选用适宜的散布式锁成功模式。
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载联系作者并注明出处:https://duobeib.com/diannaowangluoweixiu/8612.html