背景

交易系统的特点,除了高吞吐量、高并发之外,往往对延迟有着非常敏感的要求,业界话术:高抖动敏感。

但是时延并不是针对所有业务,比如交易系统涉及的大宗交易、综合业务、盘后定价等业务,由于发生频次非常的低,一般一个交易日才几十笔,所以这类业务并没有很敏感的时延需求。

时延需求仅仅存在于日间竞价撮合场景,比如普通交易、融资融券等发生频次非常频繁的业务。

时延的一个重要影响因素,就是系统间的数据交互,一般称之为 RPC,一笔交易委托的下达,往往伴随着生成订单、申报至交易所、交易所撮合后进行回报等操作。

而在委托下达的过程中,往往又会涉及多个子系统的交互,比如融资融券业务,需要在委托前 RPC 至合约系统生成一笔新的合约,然后需要 RPC 至额度系统扣减券商授权额度等一系列操作,如何用简洁的消息体去传达一笔委托中的所有跨系统指令,是一个低时延交易系统需要考虑的一个必要大前提。

消息体的设计

传统的 json 格式,显然不满足这种低时延的要求,原因是 json 的序列化和反序列化操作存在较高的耗时,同时 json 格式虽然可读性很高,但排列不够紧凑,本身占据的空间往往较大,在低延时的场景会产生较大的带宽占用,一旦因为带宽不足(虽然可能性很小)而发生网络抖动,那对于日间交易将是灾难性的影响,会导致系统时延增大、处理速度过慢导致订单积压,系统tps 上不去。

因此,需要用一种更简洁、更紧凑的方式去设计交易系统敏感场景的数据传输格式。

一般采用直接拼接字节的方式进行传输,这种方式对消息体设计人员的要求较高,但好处就是能够保证高效的系统间传输,但坏处也存在,那就是必须严格约定每一个字节代表的意义,并且上下游各系统必须严格遵守这个约定。某种意义上,牺牲了 json 带来的可读性和便捷性,保证了系统的时延和高吞吐量。

一般用域的概念来表示字节组成的消息的各个子部分,可参考如下设计:

  • 1 号域,只有一个字节,代表了本次传输消息的所有参数个数,由于是一个字节,所以一次 RPC 最多支持 256 个参数。

  • 2 号域,如果 1 号域得出消息中一共有 n 个参数,0< n <=256,那么 2 号域的下标范围为:1 至 n ,也就是从第二个字节开始,往后的 n 个字节(n 由 1 号域得出)的范围里,每一个字节代表了一种第 n 个参数的类型,注意,必须严格约定每种类型代表的字节值,比如下面的表格,就可以作为约定基本数据类型对应的字节值的规范:

    字节值 基本类型
    98 byte
    115 short
    99 char
    102 float
    100 double
    105 int
    108 long
    66 boolean
    83 string
  • 3 号域,如果 1 号域得出消息中一共有 n 个参数,那么 3 号域的范围为 n+1 至 n+1+2*n - 1,从 n+1个字节处往后,每 2 个字节代表了第 n 个参数所占用的字节个数,之所以设计了两个字节来表示字节个数,是为了考虑字符串的长度是可变的,它不像基本类型(比如 int 固定用 4 个字节即可表示)。从 3 号域可以获得每一个参数的占用字节数,为 4 号域获取值做准备。

  • 4 号域,如果 1 号域得出消息中一共有 n 个参数,从 3 号域推断出每个参数的字节数(假设 n 个参数总共占用m 个字节),那么 4 号域的范围为 n+1+2*n 至 n+1+2*n+m,需要维护一个 valueIndex 下标,当第 n 个参数对应的 3 号域中占用字节数为 k 时,valueIndex 至 valueIndex + k -1 范围内的字节即当前参数对应的值。

  • 5 号域,组成较为简单,通过参数名字符串直接拼接|字符构成,比如:orderId|businessAmount|reportDate,可直接一次性读取完毕后通过|字符分割,获取 n 个参数对应的参数名。

代码示例

可以指定传入一个 map,来序列化成字节消息:

		/**
     * @Author huang.zh
     * @Description 将入参的个数填充成字节数组
     * @Date 7:30 PM 2024/10/2
     * @Param [bytes, parameters]
     * @return
     **/
		public static byte[] generateParameterLength(Map<String,Object> parameters){
        // 计算长度
        // 参数个数 1 个字节:parameterNumber
        // 参数类型 parameterNumber 个字节
        int parameterNumber = parameters.size();
        int length = 1 + parameterNumber + 2 * parameterNumber;
        Collection<Object> values = parameters.values();
        for (Object value : values) {
            if (value instanceof Integer || value instanceof Float ) {
                length += 4;
            } else if (value instanceof Long || value instanceof Double){
                length += 8;
            } else if (value instanceof Character || value instanceof Byte || value instanceof Boolean){
                length += 1;
            } else if (value instanceof Short){
                length += 2;
            }
        }
        byte[] bytes = new byte[length];
        bytes[0] = (byte) parameterNumber;
        int typeIndex = 1,lengthIndex = 1 + parameterNumber + 1,valueIndex = 1 + parameterNumber + 2 * parameterNumber;
        for (Object value : values) {
            if (value instanceof Integer) {
                bytes[typeIndex] = Type.INTEGER.type();
                bytes[lengthIndex] = Integer.valueOf(4).byteValue();
                lengthIndex += 2;
                Unpooled.copyInt((Integer) value).readBytes(bytes,valueIndex,4);
                valueIndex += 4;
            } else if (value instanceof Long){
                bytes[typeIndex] = Type.LONG.type();
                bytes[lengthIndex] = Integer.valueOf(8).byteValue();
                lengthIndex += 2;
                Unpooled.copyLong((Long) value).readBytes(bytes,valueIndex,8);
                valueIndex += 8;
            } else if(value instanceof Double){
                bytes[typeIndex] = Type.DOUBLE.type();
                bytes[lengthIndex] = Integer.valueOf(8).byteValue();
                lengthIndex += 2;
                Unpooled.copyDouble((Double) value).readBytes(bytes,valueIndex,8);
                valueIndex += 8;
            } else if (value instanceof Character){
                bytes[typeIndex] = Type.CHAR.type();
                bytes[lengthIndex] = Integer.valueOf(1).byteValue();
                lengthIndex += 2;
                bytes[valueIndex] = (byte) value;
                valueIndex += 1;
            } else if(value instanceof Byte) {
                bytes[typeIndex] = Type.BYTE.type();
                bytes[lengthIndex] = Integer.valueOf(1).byteValue();
                lengthIndex += 2;
                bytes[valueIndex] = (byte) value;
                valueIndex += 1;
            } else if (value instanceof Short){
                bytes[typeIndex] = Type.SHORT.type();
                bytes[lengthIndex] = Integer.valueOf(2).byteValue();
                lengthIndex += 2;
                Unpooled.copyShort((Short) value).readBytes(bytes,valueIndex,2);
                valueIndex += 2;
            } else if (value instanceof Float){
                bytes[typeIndex] = Type.FLOAT.type();
                bytes[lengthIndex] = Integer.valueOf(4).byteValue();
                lengthIndex += 2;
                Unpooled.copyFloat((Float) value).readBytes(bytes,valueIndex,4);
                valueIndex += 4;
            } else if (value instanceof Boolean){
                bytes[typeIndex] = Type.BOOLEAN.type();
                bytes[lengthIndex] = Integer.valueOf(1).byteValue();
                lengthIndex += 2;
                Unpooled.copyBoolean((Boolean) value).readBytes(bytes,valueIndex,1);
                valueIndex += 1;
            } else if (value instanceof String){
								// 隐藏 String类型参数的实现,有兴趣的同学可以自己实现一下,比较简单~
            }
            typeIndex ++;
        }
        return bytes;
    }