4分钟
交易系统系统间数据交互格式设计
背景
交易系统的特点,除了高吞吐量、高并发之外,往往对延迟有着非常敏感的要求,业界话术:高抖动敏感。
但是时延并不是针对所有业务,比如交易系统涉及的大宗交易、综合业务、盘后定价等业务,由于发生频次非常的低,一般一个交易日才几十笔,所以这类业务并没有很敏感的时延需求。
时延需求仅仅存在于日间竞价撮合场景,比如普通交易、融资融券等发生频次非常频繁的业务。
时延的一个重要影响因素,就是系统间的数据交互,一般称之为 RPC,一笔交易委托的下达,往往伴随着生成订单、申报至交易所、交易所撮合后进行回报等操作。
而在委托下达的过程中,往往又会涉及多个子系统的交互,比如融资融券业务,需要在委托前 RPC 至合约系统生成一笔新的合约,然后需要 RPC 至额度系统扣减券商授权额度等一系列操作,如何用简洁的消息体去传达一笔委托中的所有跨系统指令,是一个低时延交易系统需要考虑的一个必要大前提。
消息体的设计
传统的 json 格式,显然不满足这种低时延的要求,原因是 json 的序列化和反序列化操作存在较高的耗时,同时 json 格式虽然可读性很高,但排列不够紧凑,本身占据的空间往往较大,在低延时的场景会产生较大的带宽占用,一旦因为带宽不足(虽然可能性很小)而发生网络抖动,那对于日间交易将是灾难性的影响,会导致系统时延增大、处理速度过慢导致订单积压,系统tps 上不去。
因此,需要用一种更简洁、更紧凑的方式去设计交易系统敏感场景的数据传输格式。
一般采用直接拼接字节的方式进行传输,这种方式对消息体设计人员的要求较高,但好处就是能够保证高效的系统间传输,但坏处也存在,那就是必须严格约定每一个字节代表的意义,并且上下游各系统必须严格遵守这个约定。某种意义上,牺牲了 json 带来的可读性和便捷性,保证了系统的时延和高吞吐量。
一般用域的概念来表示字节组成的消息的各个子部分,可参考如下设计:
-
1 号域,只有一个字节,代表了本次传输消息的所有参数个数,由于是一个字节,所以一次 RPC 最多支持 256 个参数。
-
2 号域,如果 1 号域得出消息中一共有 n 个参数,0< n <=256,那么 2 号域的下标范围为:1 至 n ,也就是从第二个字节开始,往后的 n 个字节(n 由 1 号域得出)的范围里,每一个字节代表了一种第 n 个参数的类型,注意,必须严格约定每种类型代表的字节值,比如下面的表格,就可以作为约定基本数据类型对应的字节值的规范:
字节值 基本类型 98 byte 115 short 99 char 102 float 100 double 105 int 108 long 66 boolean 83 string -
3 号域,如果 1 号域得出消息中一共有 n 个参数,那么 3 号域的范围为 n+1 至 n+1+2*n - 1,从 n+1个字节处往后,每 2 个字节代表了第 n 个参数所占用的字节个数,之所以设计了两个字节来表示字节个数,是为了考虑字符串的长度是可变的,它不像基本类型(比如 int 固定用 4 个字节即可表示)。从 3 号域可以获得每一个参数的占用字节数,为 4 号域获取值做准备。
-
4 号域,如果 1 号域得出消息中一共有 n 个参数,从 3 号域推断出每个参数的字节数(假设 n 个参数总共占用m 个字节),那么 4 号域的范围为 n+1+2*n 至 n+1+2*n+m,需要维护一个 valueIndex 下标,当第 n 个参数对应的 3 号域中占用字节数为 k 时,valueIndex 至 valueIndex + k -1 范围内的字节即当前参数对应的值。
-
5 号域,组成较为简单,通过参数名字符串直接拼接|字符构成,比如:orderId|businessAmount|reportDate,可直接一次性读取完毕后通过|字符分割,获取 n 个参数对应的参数名。
代码示例
可以指定传入一个 map,来序列化成字节消息:
/**
* @Author huang.zh
* @Description 将入参的个数填充成字节数组
* @Date 7:30 PM 2024/10/2
* @Param [bytes, parameters]
* @return
**/
public static byte[] generateParameterLength(Map<String,Object> parameters){
// 计算长度
// 参数个数 1 个字节:parameterNumber
// 参数类型 parameterNumber 个字节
int parameterNumber = parameters.size();
int length = 1 + parameterNumber + 2 * parameterNumber;
Collection<Object> values = parameters.values();
for (Object value : values) {
if (value instanceof Integer || value instanceof Float ) {
length += 4;
} else if (value instanceof Long || value instanceof Double){
length += 8;
} else if (value instanceof Character || value instanceof Byte || value instanceof Boolean){
length += 1;
} else if (value instanceof Short){
length += 2;
}
}
byte[] bytes = new byte[length];
bytes[0] = (byte) parameterNumber;
int typeIndex = 1,lengthIndex = 1 + parameterNumber + 1,valueIndex = 1 + parameterNumber + 2 * parameterNumber;
for (Object value : values) {
if (value instanceof Integer) {
bytes[typeIndex] = Type.INTEGER.type();
bytes[lengthIndex] = Integer.valueOf(4).byteValue();
lengthIndex += 2;
Unpooled.copyInt((Integer) value).readBytes(bytes,valueIndex,4);
valueIndex += 4;
} else if (value instanceof Long){
bytes[typeIndex] = Type.LONG.type();
bytes[lengthIndex] = Integer.valueOf(8).byteValue();
lengthIndex += 2;
Unpooled.copyLong((Long) value).readBytes(bytes,valueIndex,8);
valueIndex += 8;
} else if(value instanceof Double){
bytes[typeIndex] = Type.DOUBLE.type();
bytes[lengthIndex] = Integer.valueOf(8).byteValue();
lengthIndex += 2;
Unpooled.copyDouble((Double) value).readBytes(bytes,valueIndex,8);
valueIndex += 8;
} else if (value instanceof Character){
bytes[typeIndex] = Type.CHAR.type();
bytes[lengthIndex] = Integer.valueOf(1).byteValue();
lengthIndex += 2;
bytes[valueIndex] = (byte) value;
valueIndex += 1;
} else if(value instanceof Byte) {
bytes[typeIndex] = Type.BYTE.type();
bytes[lengthIndex] = Integer.valueOf(1).byteValue();
lengthIndex += 2;
bytes[valueIndex] = (byte) value;
valueIndex += 1;
} else if (value instanceof Short){
bytes[typeIndex] = Type.SHORT.type();
bytes[lengthIndex] = Integer.valueOf(2).byteValue();
lengthIndex += 2;
Unpooled.copyShort((Short) value).readBytes(bytes,valueIndex,2);
valueIndex += 2;
} else if (value instanceof Float){
bytes[typeIndex] = Type.FLOAT.type();
bytes[lengthIndex] = Integer.valueOf(4).byteValue();
lengthIndex += 2;
Unpooled.copyFloat((Float) value).readBytes(bytes,valueIndex,4);
valueIndex += 4;
} else if (value instanceof Boolean){
bytes[typeIndex] = Type.BOOLEAN.type();
bytes[lengthIndex] = Integer.valueOf(1).byteValue();
lengthIndex += 2;
Unpooled.copyBoolean((Boolean) value).readBytes(bytes,valueIndex,1);
valueIndex += 1;
} else if (value instanceof String){
// 隐藏 String类型参数的实现,有兴趣的同学可以自己实现一下,比较简单~
}
typeIndex ++;
}
return bytes;
}