概念

在交易系统中,通常存在流水号的概念,在生成交易订单时,会通过分片+流水号的形式记录一个全局唯一的委托号。

对于流水号的获取,通常有如下要求:

  • 流水号本身需要以数字形式进行递增
    • 防止作为主键或者索引造成页分裂,同时在排查问题时,在使用数字流水号的场景里追溯问题通常易于使用其他流水号,如UUID、雪花算法等。
  • 流水号由计数器统一分配
    • 流水号通常从一个地方获取,即交易系统通常会实现提供统一流水号的计数器,确保流水号的全局唯一,同时获取流水号的出口也唯一,易于维护。
  • 流水号的设计需要按业务维度划分
    • 日间交易存在多类业务,但每一类业务的频率往往不同,比如日间普通交易、信用交易的业务非常频繁,而大宗交易等综合业务往往发生频次非常低(比如一个交易日内发生几十次),这就使得不同业务流水号的增长速率不同,如果都使用同一类计数器进行维护,那么流水号的资源很快就会被频繁发生交易业务耗尽。
  • 获取流水号需要尽可能的做到开销小
    • 即流水号需要以最直接简单的方式获取,来确保交易功能的高性能要求。

交易系统中的分布式事务

通常采用TCC(try-confirm-cancel)的模式实现跨数据库事务调度,拿生成委托的过程来举个例子,一个普通委托记录生成的流程通常如下:

  • try阶段(一阶段)
    • 获取当前业务全局唯一序列号
    • 生成当前业务委托订单(此时委托订单的状态字段可能为未生效,即视作预委托)
  • confirm阶段(二阶段提交),一阶段所有操作正确按照预期执行后,执行二阶段的操作并进行事务提交
    • 更新委托状态为正常状态
  • cancel阶段(二阶段回滚),一阶段某一操作未按照预期执行且抛出异常,执行二阶段的操作并进行事务回滚
    • 更新委托状态为作废状态

前面说到,获取流水号需要尽可能的做到开销小,所以在日间交易时,通常会批量一次性申请一个号段(比如一次性申请100000-110000,一共一万个序列号),然后在交易系统中缓存这一批次的序列号,每当一个委托请求下达至交易系统,则从缓存中获取当前最新的序列号,同时更新缓存中的最新序列号为下一个可用序列号

引入了缓存,必然需要对缓存的数据进行维护,防止系统挂掉之后缓存中的序列号丢失,那么,通常需要针对维护的序列号设计一个存储最新可用的缓存序列号表。那么,引入了缓存序列号表,那么获取流水号之后就存在双写的义务:即需要保证缓存和数据库表的数据一致性(二者维护的最新可用序列号必须一致。)

金融交易系统中,通常采用先写数据库表,再写缓存的形式保证数据一致性。这通常会牺牲掉一些性能,原因是并发情况下更新同一行需要获取行级锁。保证数据一致性是交易系统的first rule,一旦缓存和数据库的数据不一致,那将引发巨大的问题。

最终,在分布式事务中获取序列号的步骤大体如下:

public Long getSerialCounterNoForUpdate(Integer serialCounterNo) {
        log.info("额度流水号生成阶段:一阶段RPC,对应业务种类:{}。",serialCounterNo);
        Long serialNo;
        int count;
        if (serialNoCacheManager.containsValue(serialCounterNo,currentStartSerialNo) &&
                serialNoCacheManager.getCache(serialCounterNo,currentStartSerialNo) < currentStartSerialNo + COUNTER_NUMBER ){
            //当前序列号缓存命中
            serialNo = serialNoCacheManager.getCache(serialCounterNo, currentStartSerialNo);
            // 取得的流水号放入事务上下文,供后续使用
            BusinessActionContextUtil.addContext("serialNo",serialNo);
            //更新信用流水计数器记录表
            count = creditSerialCounterRecordDomain.updateCreditSerialCounterRecord(serialCounterNo,currentStartSerialNo,
                    currentStartSerialNo + COUNTER_NUMBER - 1L,serialNo,serialNo-1,CommonConstant.INTEGER_ONE);
        } else {
            serialNo = quotaClient.getSerialNo(serialCounterNo);
            // 取得的流水号放入事务上下文,供后续使用
            BusinessActionContextUtil.addContext("serialNo",serialNo);
            //当前可用序列号号段起始值
            BusinessActionContextUtil.addContext("currentStartSerialNo",serialNo);
            // 写入信用流水计数器记录表
            count = creditSerialCounterRecordDomain.insertCreditSerialCounterRecord(serialNo,serialNo+COUNTER_NUMBER-1,
                    serialNo,CommonConstant.INTEGER_ONE,serialCounterNo);

        }
        // 必须更新成功,否则直接抛出异常,当做委托失败处理,进行分布式事务回滚
        if (count != CommonConstant.INTEGER_ONE.intValue()){
            Map<String,Object> parameters = new HashMap<>();
            parameters.put(CommonConstant.SERIAL_COUNTER_NO_KEY,serialCounterNo);
            parameters.put(CommonConstant.SERIAL_NO_KEY,serialNo);
            throw new BusinessExecuteException(ExceptionEnum.ENTRUST_FAILED,parameters);
        }
        return serialNo;
    }

    @Override
    public Boolean generateSerialCounterNo(BusinessActionContext businessActionContext) {
        Integer serialCounterNo = businessActionContext.getActionContext(CommonConstant.SERIAL_COUNTER_NO_KEY, Integer.class);
        Long serialNo = businessActionContext.getActionContext("serialNo", Long.class);
        Long currentStartSerialNo = businessActionContext.getActionContext("currentStartSerialNo", Long.class);
        this.currentStartSerialNo = Optional.ofNullable(currentStartSerialNo).orElse(this.currentStartSerialNo);
        // 二阶段提交时,写入缓存
        serialNoCacheManager.createCache(serialCounterNo,this.currentStartSerialNo,serialNo+1);
        log.info("额度流水号生成阶段:二阶段提交,取得的流水订单号:{},对应业务种类{}。",serialNo,serialCounterNo);
        return Boolean.TRUE;
    }

    @Override
    public Boolean redoSerialCounterNo(BusinessActionContext businessActionContext) {
        Integer serialCounterNo = businessActionContext.getActionContext(CommonConstant.SERIAL_COUNTER_NO_KEY, Integer.class);
        log.info("额度流水号生成阶段:二阶段回滚,对应业务种类{}。",serialCounterNo);
        Long currentStartSerialNo = businessActionContext.getActionContext("currentStartSerialNo", Long.class);
        Long serialNo = businessActionContext.getActionContext("serialNo", Long.class);
        if (currentStartSerialNo == null){
            if (serialNo != null){
                // 未发起RPC申请序列号段,说明当前线程上下文中的序列号未被使用,更新缓存将计数器重置为serialNo即可
                return serialNoCacheManager.createCache(serialCounterNo,this.currentStartSerialNo,serialNo) != null;
            } else {
                return Boolean.TRUE;
            }
        } else {
            // 发起RPC申请了新号段,则需要回冲已申请的号段
            // 1. 发起RPC回冲额度管理系统的序列号段申请记录
            // 2. 本地序列号段表删除新申请的记录
            return quotaClient.redoSerialCounterNo(serialCounterNo) &&
                    creditSerialCounterRecordDomain.deleteCreditSerialCounterRecord(serialCounterNo,currentStartSerialNo,currentStartSerialNo + COUNTER_NUMBER-1,serialNo,CommonConstant.INTEGER_ONE) == CommonConstant.INTEGER_ONE;
        }
    }

大致分为如下步骤:

1、一阶段中,先查询缓存中是否维护最新可用序列号

  • 如果不存在,则发起RPC至计数器功能获取一批次的可用序列号段。 * 获取新的一批序列号后,马上插入表记录,然后更新缓存,确保数据一致性。
  • 如果存在,则直接获取当前可用序列号。 * 获取新的序列号后,马上更新表记录中可用的最新序列号为当前取得的序列号+1。
  • 上述两种情况,任意一次写表失败,说明存在并发,直接抛出异常来实现快速失败机制,确保性能。

2、使用获取的序列号成功后(可能存在RPC新申请和已维护缓存中直接获取两种情况),需要在二阶段提交时将缓存中的可用序列号更新为本次获取的序列号+1,以确保下一次委托命中缓存时可取出直接使用的最新序列号(此时,由于一阶段对于两种情况均已经写表,所以取出的序列号天然递增)。

3、使用获取的序列号失败,即一阶段发生任意报错,则针对两种情况进行回滚:

  • 一阶段缓存中已命中,直接取出序列号的,由于序列号缓存中递增操作在二阶段提交才会去执行,所以一阶段获取的序列号仍能使用,这里缓存可更新可不更新,因为实际上缓存中维护的最新可用序列号并无变化。
  • 一阶段缓存未命中,发起RPC申请新的流水号段,则需要发起一次回冲RPC,将申请的流水号段在计数器功能中进行回冲,避免造成流水号段资源的浪费。