Java中使用KCP协议
传统游戏项目一般使用TCP协议进行通信,得益于它的稳定和可靠,不过在网络不稳定的情况下,会出现丢包严重。
不过近期有不少基于UDP的应用层协议,声称对UDP的不可靠进行了改造,这意味着我们既可以享受网络层提供稳定可靠的服务,又可以享受它的速度。
KCP就是这样的一个协议
不过网上说的再天花乱坠,我们也得亲自调研,分析源码和它的机制,并测试它的性能,是否满足项目上线要求。本文从C版本的源码入手理解KCP的机制,再研究各种JAVA版本的实现
一、KCP协议
原版源码(C代码):
https://github.com/skywind3000/kcp
基于底层协议(一般是UDP)之上,完全在应用层实现类TCP的可靠机制(快速重传,拥塞控制等)
二、KCP特性
KCP实现以下特性,也可参考github中README中对KCP的定义
特性 |
说明 |
源码位置 |
RTO优化 |
超时时间计算优于TCP |
ikcp_update_ack |
选择性重传 |
KCP只重传真正丢失的数据包,TCP会全部重传丢失包之后的全部数据 |
ikcp_parse_fastack,ikcp_flush |
快速重传 |
根据配置,可以在丢失包被跳过一定次数后直接重传,不等RTO超时 |
ikcp_parse_fastack,ikcp_flush |
UNA + ACK |
ARQ模型响应有两种,UNA(此编号前所有包已收到,如TCP),ACK(该编号包已收到),光用UNA将导致全部重传,光用ACK则丢失成本太高,以往协议都是二选其一,而 KCP协议中,除去单独的 ACK包外,所有包都有UNA信息。 |
ikcp_flush(每次update,都发送ACK) |
非延迟ACK |
KCP可配置是否延迟发送ACK |
ikcp_update_ack |
流量控制 |
同TCP的公平退让原则,发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让及慢启动这四要素决定 |
ikcp_input, |
ikcp_flush |
|
|
三、KCP报文
1. 报文解析源码
源码中对报文解析部分代码如下
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
2. 报文定义
报文中标识的定义
名词 |
全称 |
备注 |
作用 |
conv |
conversation id |
会话ID |
每个连接的唯一标识 |
cmd |
command |
命令 |
每个数据包指定逻辑 |
frg |
fragment count |
数据分段序号 |
根据mtu(最大传输单元)和mss(最大报文长度)的数据分段 |
wnd |
window size |
接收窗口大小 |
流量控制 |
ts |
timestamp |
时间戳 |
数据包发送时间记录 |
sn |
serial number |
数据报的序号 |
确保包的有序 |
una |
un-acknowledged serial number |
对端下一个要接收的数据报序号 |
确保包的有序 |
3. 消息类型
KCP报文的四种消息类型
const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data: 推送数据
const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack: 对推送数据的确认
const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask): 询问窗口大小
const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell): 回复窗口大小
- 报文结构
四、源码解析
在网络四层模型中,KCP和TCP/UDP(传输层),IP(网络层)等协议有着本质上区别,理论上KCP是属于应用层协议。
KCP并不提供协议实际收发处理,它只是在传输层只上对消息和链接的一层中间管理。
在KCP的源码中,它仅仅包含ikcp.c和ikcp.h两个文件,仅提供KCP的数据管理和数据接口,而用户需要在应用层进行KCP的调度
1. 结构体定义
KCP分包结构KCP对象结构体定义
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; //用来标记这个seg属于哪个kcp
IUINT32 cmd;//这个包的指令是: // 数据 ack 询问/应答窗口大小
IUINT32 frg; //分包时,分包的序号,0为终结
IUINT32 wnd;//发送这个seg的这个端的 窗口大小--> 远端的接收窗口大小
IUINT32 ts; //我不知道为什么要用时间轴,这个都1秒,有什么用 ??
IUINT32 sn;//相当于tcp的ack
IUINT32 una;//una 远端等待接收的一个序号
IUINT32 len; //data的长度
IUINT32 resendts;//重发的时间轴
IUINT32 rto;//等于发送端kcp的 rx_rto->由 计算得来
IUINT32 fastack;//ack跳过的次数,用于快速重传
IUINT32 xmit;// fastack resend次数
char data[1];//当malloc时,只需要 malloc(sizeof(IKCPSEG)+datalen) 则,data长=数据长度+1 刚好用来放0
};
struct IKCPCB
{
//会话ID,最大传输单元,最大分片大小,状态 mss=mtu-sizeof(IKCPSEG)
IUINT32 conv, mtu, mss, state;
//第一个未接收到的包,待发送的包(可以认为是tcp的ack自增),接收消息的序号-> 用来赋seg的una值
IUINT32 snd_una, snd_nxt, rcv_nxt;
//前两个不知道干嘛 拥塞窗口的阈值 用来控制cwnd值变化的
IUINT32 ts_recent, ts_lastack, ssthresh;
//这几个变量是用来更新rto的
// rx_rttval 接收ack的浮动值
// rx_srtt 接收ack的平滑值
// rx_rto 计算出来的rto
// rx_minrto 最小rto
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
//发送队列的窗口大小
//接收队列的窗口大小
//远端的接收队列的窗口大小
//窗口大小
//probe 用来二进制标记
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
//时间轴 时间间隔 下一次flush的时间 xmit发射多少次? 看不到有什么地方用到
IUINT32 current, interval, ts_flush, xmit;
//接收到的数据seg个数
//需要发送的seg个数
IUINT32 nrcv_buf, nsnd_buf;
//接收队列的数据 seg个数
//发送队列的数据 seg个数
IUINT32 nrcv_que, nsnd_que;
//是否为nodelay模式:如果开启,rto计算范围更小
//updated 在调用flush时,有没有调用过update
IUINT32 nodelay, updated;
//请求访问窗口的时间相关 当远程端口大小为0时
IUINT32 ts_probe, probe_wait;
IUINT32 dead_link, incr;
//发送队列
struct IQUEUEHEAD snd_queue;
//接收队列
struct IQUEUEHEAD rcv_queue;
//待发送队列
struct IQUEUEHEAD snd_buf;
//待接收队列
struct IQUEUEHEAD rcv_buf;
//用来缓存自己接收到了多少个ack
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;
//用户信息
void *user;
//好像就用来操作数据的中转站
char *buffer;
//快速重传的阈值
int fastresend;
//快速重传的上限
int fastlimit;
//是否无视重传等其它设置窗口
//steam模式的话,会将几个小包合并成大包
int nocwnd, stream;
int logmask;
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};
2. 接口分析
分析C源码,KCP作为中间管理层,主要提供以下接口
//---------------------------------------------------------------------
// interface
//---------------------------------------------------------------------
// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection. 'user' will be passed to the output callback
// output callback can be setup like this: 'kcp->output = my_udp_output'
// 创建kcp对象,conv必须在两个端之间相同,user会被传递到output回调,
// output回调这样设置:kcp->output = my_udp_output
ikcpcb* ikcp_create(IUINT32 conv, void *user);
// release kcp control object
// 释放kcp对象
void ikcp_release(ikcpcb *kcp);
// set output callback, which will be invoked by kcp
// 设置kcp调用的output回调
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));
// user/upper level recv: returns size, returns below zero for EAGAIN
// 用户层/上层 接收消息:返回接收长度,数据读取错误返回值小于0
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
// user/upper level send, returns below zero for error
// 用户层/上层 发送消息,错误返回值小于0
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
// 更新状态(每10ms-100ms调用一次),或者你可以通过调用ikcp_check,
// 来得知什么时候再次调用(不调用ikcp_input/_send)
// current - 当前时间戳(毫秒)
void ikcp_update(ikcpcb *kcp, IUINT32 current);
// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
// 决定你什么时候调用ikcp_update
// 返回你多少毫秒后应该调用ikcp_update,如果没有ikcp_input/_send调用,你可以在那个时间
// 调用ikcp_updates来代替自己驱动update调用
// 用于减少不必要的ikcp_update调用。用这个来驱动ikcp_update(比如:实现类epoll的机制,
// 或者优化处理大量kcp连接时的ikcp_update调用)
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);
// when you received a low level packet (eg. UDP packet), call it
// 接收下层数据包(比如:UDP数据包)时调用
int ikcp_input(ikcpcb *kcp, const char *data, long size);
// flush pending data
// 刷新数据
void ikcp_flush(ikcpcb *kcp);
// check the size of next message in the recv queue
// 检测接收队列里下条消息的长度
int ikcp_peeksize(const ikcpcb *kcp);
// change MTU size, default is 1400
// 修改MTU长度,默认1400
int ikcp_setmtu(ikcpcb *kcp, int mtu);
// set maximum window size: sndwnd=32, rcvwnd=32 by default
// 设置最大窗口大小,默认值:sndwnd=32, rcvwnd=32
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
// get how many packet is waiting to be sent
// 获取准备发送的数据包
int ikcp_waitsnd(const ikcpcb *kcp);
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
// 快速设置:ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay:0:使用(默认),1:使用
// interval:update时间(毫秒),默认100ms
// resend:0:不适用快速重发(默认), 其他:自己设置值,若设置为2(则2次ACK跨越将会直接重传)
// nc:0:正常拥塞控制(默认), 1:不适用拥塞控制
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);
// setup allocator
// 设置kcp allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));
// read conv
// 获取conv
IUINT32 ikcp_getconv(const void *ptr);
3. 调度逻辑
KCP关键接口:
- 更新(上层驱动KCP状态更新)
ikcp_update:kcp状态更新接口,需要上层进行调度,判断flush时间,满足条件调用ikcp_flush刷新数据,同时也负责对收到数据的kcp端回复ACK消息 - 发送
ikcp_send -> ikcp_update -> ikcp_output ikcp_send:上层调用发送接口,把数据根据mss值进行分片,设置分包编号,放到snd_queue队尾 ikcp_flush:发送数据接口,根据对端窗口大小,拷贝snd_queue的数据到snd_buf,遍历snd_buf,满足条件则调用output回调(调用网络层的发送) - 接收
ikcp_input -> ikcp_update -> ikcp_recv ikcp_input:解析上层输入数据,拷贝rcv_buf到rcv_queue ikcp_recv:数据接收接口,上层从rcv_queue中复制数据到网络层buffer
五、Java版本
目前github上有几个高star的java版本实现,选取最高的三个进行分析
1.https://github.com/szhnet/kcp-netty.git(star:212)
实现原理:
1.KCP逻辑是源码的Java翻译版(一模一样)
2.UkcpServerChannel继承ServerChannel,UkcpServerBootStrap
3.用Boss线程 EventLoopGroup的read事件来驱动KCP逻辑
优点:使用Netty的Boss线程Read事件来驱动KCP,不用while(true)的驱动;使用简单,只需使用指定的ServerChannel和ServerBootStrap来启动Netty
缺点:无明显缺点
2.https://github.com/beykery/jkcp.git(star:172)
实现原理:
1.KCP逻辑是源码的Java翻译版(一模一样)
2.启动指定线程数的KcpThread自定义IO线程池,进行KCP逻辑调度
3.Netty读消息时抛到KcpThread自定义IO线程
// 通过hash选择IO线程处理
InetSocketAddress sender = dp.sender();
int hash = sender.hashCode();
hash = hash < 0 ? -hash : hash;
this.workers[hash % workers.length].input(dp);
优点:代码简单明了,容易理解,核心是翻译版源码,外壳套的是Netty+自定义IO线程池
缺点:IO线程池会while(true)的调用KCP的update
3.https://github.com/l42111996/java-Kcp.git(star:187)
实现原理:
1.KCP逻辑是源码的Java翻译版(一模一样)
2.Netty读消息时,扔到定时器,1ms后,抛出任务到自定义IO线程
优点:拥有1的全部优点,也在Netty的读消息,把消息抛到定时器去调用KCP的逻辑,避免了2的无意义的while(true),同时实现功能更全,有上线项目验证(据作者描述)
缺点:Netty相关逻辑完全封装起来,不能修改任何Netty参数(不过源码中对Netty的参数已配置的很好了)
目前看来,第三种实现(
https://github.com/l42111996/java-Kcp.git)是最理想的方式
如果大家感兴趣,后边会对第三种实现进行详细的源码分析