Skip to content

Commit

Permalink
improve KCP performance
Browse files Browse the repository at this point in the history
  • Loading branch information
cnbatch committed Sep 24, 2023
1 parent e6c89da commit 5603dd7
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 79 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,12 @@ KCP Tube 虽然有“多路复用”的功能,但默认并不主动打开。
为了降低延迟,kcptube 启用了 TCP_NODELAY 选项。对于某些大流量应用场景,可能会造成 TCP 数据传输量减少。
### KCP
kcptube 用的是原版 [KCP](https://github.com/skywind3000/kcp),除了 interval 最小值从 10 改成 1 之外,其它部份未经修改。换句话说,原版的存在“bug”,kcptube 也会有。例如:
kcptube 用的是原版 [KCP](https://github.com/skywind3000/kcp),作了些许修改:
1. 原版的 `flush()` 是先把待发送数据转移到发送队列后,在同一个循环重做完“发送新数据包”、“数据包重发”、“ACK包发送”三件事。修改后的版本变为先做“数据包重发”、“ACK包发送”,然后再做“待发送数据转移到发送队列”,在转移期间顺便发送。
2. 原版的 `check()` 每次都会重新遍历一遍发送队列,查找已到点的重传时间戳。修改后的版本变为直接在KCP结构体中新增 `min_resendts` 变量,该变量在 `flush()` 的发送循环当中顺便找出最小的时间戳,`check()` 就不再需要每次都重新遍历,直接读取 `min_resendts` 变量的值即可。
除此之外,原版的存在“bug”,kcptube 也会有。例如:
* [如何避免缓存积累延迟的问题](https://github.com/skywind3000/kcp/issues/175)
* [求助:一些压测出现的问题, 发大包后不断累积](https://github.com/skywind3000/kcp/issues/243)
Expand Down
7 changes: 6 additions & 1 deletion README_EN.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,12 @@ The timeout period for KCP channel is 30 seconds after enabling the multiplexing
To reduce latency, kcptube has enabled the TCP_NODELAY option. For some high-traffic application scenarios, this may result in a reduction in the amount of TCP data transmitted.
### KCP
KCP Tube uses the original version of the [KCP](https://github.com/skywind3000/kcp), with the exception that the minimum value of the interval has been changed from 10 to 1, and other parts have not been modified. In other words, if there are ‘bugs’ in the original version, they will also exist in kcptube. For example:
KCP Tube uses the original version of the [KCP](https://github.com/skywind3000/kcp), with some modifications:
1. The original `flush()` function first moves the data to be sent to the sending queue, and then completes ‘sending new packet’, ‘packet retransmission’, and ‘ACK packet sending’ in the same loop. In the modified version, ‘packet retransmission’ and ‘ACK packet sending’ are done first, and then ‘move the data to be sent to the sending queue’ is done, sending it during the transfer.
2. The original `check()` function would traverse the sending queue every time to find the retransmission timestamp for packets that have reached their timeout. In the modified version, a new variable `min_resendts` is added to the KCP struct. During the `flush()` sending loop, the minimum timestamp is found and stored in min_resendts. `check()` no longer needs to traverse the queue every time. It can directly read the value of `min_resendts`.
And other ‘bugs’ in the original version, will also exist in kcptube. For example:
* [如何避免缓存积累延迟的问题](https://github.com/skywind3000/kcp/issues/175)
* [求助:一些压测出现的问题, 发大包后不断累积](https://github.com/skywind3000/kcp/issues/243)
Expand Down
87 changes: 50 additions & 37 deletions src/3rd_party/ikcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ ikcpcb* ikcp_create(IUINT32 conv, void *user)
kcp->nocwnd = 0;
kcp->xmit = 0;
kcp->dead_link = IKCP_DEADLINK;
kcp->min_resendts = 0;
kcp->output = NULL;
kcp->writelog = NULL;

Expand Down Expand Up @@ -947,6 +948,7 @@ void ikcp_flush(ikcpcb *kcp)
int change = 0;
int lost = 0;
IKCPSEG seg;
kcp->min_resendts = 0xFFFFFFFF;

// 'ikcp_update' haven't been called.
if (kcp->updated == 0) return;
Expand Down Expand Up @@ -1024,30 +1026,6 @@ void ikcp_flush(ikcpcb *kcp)
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

// move data from snd_queue to snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;

newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;

newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}

// calculate resent
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
Expand All @@ -1056,13 +1034,7 @@ void ikcp_flush(ikcpcb *kcp)
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
segment->xmit++;
kcp->xmit++;
Expand All @@ -1075,6 +1047,7 @@ void ikcp_flush(ikcpcb *kcp)
}
segment->resendts = current + segment->rto;
lost = 1;
kcp->min_resendts = _imin_(kcp->min_resendts, segment->resendts);
}
else if (segment->fastack >= resent) {
if ((int)segment->xmit <= kcp->fastlimit ||
Expand All @@ -1084,6 +1057,7 @@ void ikcp_flush(ikcpcb *kcp)
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;
kcp->min_resendts = _imin_(kcp->min_resendts, segment->resendts);
}
}

Expand Down Expand Up @@ -1114,6 +1088,47 @@ void ikcp_flush(ikcpcb *kcp)
}
}

// move data from snd_queue to snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;

newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;

newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current + kcp->rx_rto + rtomin;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 1;

kcp->min_resendts = _imin_(kcp->min_resendts, newseg->resendts);

size = (int)(ptr - buffer);
int need = IKCP_OVERHEAD + newseg->len;

if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}

ptr = ikcp_encode_seg(ptr, newseg);

if (newseg->len > 0) {
memcpy(ptr, newseg->data, newseg->len);
ptr += newseg->len;
}
}

// flash remain segments
size = (int)(ptr - buffer);
if (size > 0) {
Expand Down Expand Up @@ -1193,7 +1208,6 @@ IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)
IINT32 tm_flush = 0x7fffffff;
IINT32 tm_packet = 0x7fffffff;
IUINT32 minimal = 0;
struct IQUEUEHEAD *p;

if (kcp->updated == 0) {
return current;
Expand All @@ -1210,9 +1224,8 @@ IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)

tm_flush = _itimediff(ts_flush, current);

for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
IINT32 diff = _itimediff(seg->resendts, current);
if (!iqueue_is_empty(&kcp->snd_buf)) {
IINT32 diff = _itimediff(kcp->min_resendts, current);
if (diff <= 0) {
return current;
}
Expand Down Expand Up @@ -1245,7 +1258,7 @@ int ikcp_setmtu(ikcpcb *kcp, int mtu)
int ikcp_interval(ikcpcb *kcp, int interval)
{
if (interval > 5000) interval = 5000;
else if (interval < 1) interval = 1;
else if (interval < 10) interval = 10;
kcp->interval = interval;
return 0;
}
Expand All @@ -1263,7 +1276,7 @@ int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
}
if (interval >= 0) {
if (interval > 5000) interval = 5000;
else if (interval < 1) interval = 1;
else if (interval < 10) interval = 10;
kcp->interval = interval;
}
if (resend >= 0) {
Expand Down
1 change: 1 addition & 0 deletions src/3rd_party/ikcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ struct IKCPCB
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
IUINT32 current, interval, ts_flush, xmit;
IUINT32 min_resendts;
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
IUINT32 nodelay, updated;
Expand Down
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
int main(int argc, char *argv[])
{
char app_name[] = "kcptube";
printf("%s version 20230916\n", app_name);
printf("%s version 20230924\n", app_name);

if (argc <= 1)
{
Expand Down
6 changes: 0 additions & 6 deletions src/networks/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,6 @@ void client_mode::mux_cancel_channel(protocol_type prtcl, kcp_mappings *kcp_mapp
session->when_disconnect(empty_tcp_disconnect);
session->session_is_ending(true);
session->pause(false);
session->disconnect();
session->stop();
}

Expand Down Expand Up @@ -1114,7 +1113,6 @@ void client_mode::process_disconnect(uint32_t conv, tcp_session *session)
session->when_disconnect(empty_tcp_disconnect);
session->session_is_ending(true);
session->pause(false);
session->disconnect();
session->stop();

kcp_channels.erase(kcp_channel_iter);
Expand Down Expand Up @@ -1228,7 +1226,6 @@ void client_mode::delete_mux_records(uint32_t conv)
if (mux_records_ptr->local_tcp != nullptr)
{
mux_records_ptr->local_tcp->when_disconnect(empty_tcp_disconnect);
mux_records_ptr->local_tcp->disconnect();
mux_records_ptr->local_tcp->stop();
mux_records_ptr->local_tcp = nullptr;
}
Expand Down Expand Up @@ -1339,7 +1336,6 @@ void client_mode::cleanup_expiring_handshake_connections()
if (kcp_mappings_ptr->egress_forwarder != nullptr)
{
kcp_mappings_ptr->egress_forwarder->remove_callback();
kcp_mappings_ptr->egress_forwarder->disconnect();
kcp_mappings_ptr->egress_forwarder->stop();
}

Expand Down Expand Up @@ -1426,7 +1422,6 @@ void client_mode::loop_find_expires()
if (expiring_kcp.find(kcp_mappings_ptr) == expiring_kcp.end())
{
tcp_channel->when_disconnect(empty_tcp_disconnect);
tcp_channel->disconnect();
tcp_channel->stop();
expiring_kcp.insert({ kcp_mappings_ptr, time_right_now });
}
Expand Down Expand Up @@ -1888,7 +1883,6 @@ void client_mode::handshake_test_failure(kcp_mappings *handshake_ptr)
void client_mode::handshake_test_cleanup(kcp_mappings *handshake_ptr)
{
handshake_ptr->egress_forwarder->remove_callback();
handshake_ptr->egress_forwarder->disconnect();
handshake_ptr->egress_forwarder->stop();

std::scoped_lock lock_handshake{ mutex_handshakes, mutex_expiring_handshakes };
Expand Down
32 changes: 16 additions & 16 deletions src/networks/connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,22 +483,20 @@ void tcp_session::stop()
{
stopped.store(true);
callback = empty_tcp_callback;
asio::error_code ec;
if (is_open())
connection_socket.close(ec);
disconnect();
}

bool tcp_session::is_pause()
bool tcp_session::is_pause() const
{
return paused.load();
}

bool tcp_session::is_stop()
bool tcp_session::is_stop() const
{
return stopped.load();
}

bool tcp_session::is_open()
bool tcp_session::is_open() const
{
return connection_socket.is_open();
}
Expand All @@ -507,11 +505,13 @@ void tcp_session::disconnect()
{
asio::error_code ec;
connection_socket.shutdown(asio::socket_base::shutdown_both, ec);
ec.clear();
connection_socket.close(ec);
}

void tcp_session::async_read_data()
{
if (paused.load() || stopped.load())
if (paused.load() || stopped.load() || !connection_socket.is_open())
return;

std::unique_ptr<uint8_t[]> buffer_cache = std::make_unique<uint8_t[]>(gbv_buffer_size);
Expand All @@ -535,7 +535,7 @@ size_t tcp_session::send_data(const std::vector<uint8_t> &buffer_data)

size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes)
{
if (stopped.load() || buffer_data == nullptr)
if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr)
return 0;

size_t sent_size = connection_socket.send(asio::buffer(buffer_data, size_in_bytes));
Expand All @@ -545,7 +545,7 @@ size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes)

size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes, asio::error_code &ec)
{
if (stopped.load() || buffer_data == nullptr)
if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr)
return 0;

size_t sent_size = connection_socket.send(asio::buffer(buffer_data, size_in_bytes), 0, ec);
Expand All @@ -555,7 +555,7 @@ size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes,

void tcp_session::async_send_data(std::unique_ptr<std::vector<uint8_t>> data)
{
if (stopped.load() || data == nullptr)
if (stopped.load() || !connection_socket.is_open() || data == nullptr)
return;

auto asio_buffer = asio::buffer(*data);
Expand All @@ -568,7 +568,7 @@ void tcp_session::async_send_data(std::unique_ptr<std::vector<uint8_t>> data)

void tcp_session::async_send_data(std::vector<uint8_t> &&data)
{
if (stopped.load())
if (stopped.load() || !connection_socket.is_open())
return;

auto asio_buffer = asio::buffer(data);
Expand All @@ -579,7 +579,7 @@ void tcp_session::async_send_data(std::vector<uint8_t> &&data)

void tcp_session::async_send_data(std::unique_ptr<uint8_t[]> buffer_data, size_t size_in_bytes)
{
if (stopped.load() || buffer_data == nullptr)
if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr)
return;

auto asio_buffer = asio::buffer(buffer_data.get(), size_in_bytes);
Expand All @@ -590,7 +590,7 @@ void tcp_session::async_send_data(std::unique_ptr<uint8_t[]> buffer_data, size_t

void tcp_session::async_send_data(std::unique_ptr<uint8_t[]> buffer_data, uint8_t *start_pos, size_t size_in_bytes)
{
if (stopped.load() || buffer_data == nullptr || start_pos == nullptr)
if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr || start_pos == nullptr)
return;

asio::async_write(connection_socket, asio::buffer(start_pos, size_in_bytes),
Expand All @@ -600,7 +600,7 @@ void tcp_session::async_send_data(std::unique_ptr<uint8_t[]> buffer_data, uint8_

void tcp_session::async_send_data(const uint8_t *buffer_data, size_t size_in_bytes)
{
if (stopped.load() || buffer_data == nullptr)
if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr)
return;

asio::async_write(connection_socket, asio::buffer(buffer_data, size_in_bytes),
Expand Down Expand Up @@ -909,12 +909,12 @@ void udp_client::stop()
this->disconnect();
}

bool udp_client::is_pause()
bool udp_client::is_pause() const
{
return paused.load();
}

bool udp_client::is_stop()
bool udp_client::is_stop() const
{
return stopped.load();
}
Expand Down
Loading

0 comments on commit 5603dd7

Please sign in to comment.