From 5603dd7c91cf9d4149dac977dd6ed43cd7d80431 Mon Sep 17 00:00:00 2001 From: cnbatch Date: Sun, 24 Sep 2023 15:41:36 +0800 Subject: [PATCH] improve KCP performance --- README.md | 7 ++- README_EN.md | 7 ++- src/3rd_party/ikcp.c | 87 +++++++++++++++++++++--------------- src/3rd_party/ikcp.h | 1 + src/main.cpp | 2 +- src/networks/client.cpp | 6 --- src/networks/connections.cpp | 32 ++++++------- src/networks/connections.hpp | 10 ++--- src/networks/kcp.cpp | 4 +- src/networks/server.cpp | 11 ----- 10 files changed, 88 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 10ca70d..2f014df 100644 --- a/README.md +++ b/README.md @@ -450,7 +450,12 @@ KCP Tube 虽然有“多路复用”的功能,但默认并不主动打开。 为了降低延迟,kcptube 启用了 TCP_NODELAY 选项。对于某些大流量应用场景,可能会造成 TCP 数据传输量减少。 ### KCP -kcptube 用的是原版 [KCP](https://github.com/skywind3000/kcp),除了 interval 最小值从 10 改成 1 之外,其它部份未经修改。换句话说,原版的存在“bug”,kcptube 也会有。例如: +kcptube 用的是原版 [KCP](https://github.com/skywind3000/kcp),作了些许修改: + +1. 原版的 `flush()` 是先把待发送数据转移到发送队列后,在同一个循环重做完“发送新数据包”、“数据包重发”、“ACK包发送”三件事。修改后的版本变为先做“数据包重发”、“ACK包发送”,然后再做“待发送数据转移到发送队列”,在转移期间顺便发送。 +2. 原版的 `check()` 每次都会重新遍历一遍发送队列,查找已到点的重传时间戳。修改后的版本变为直接在KCP结构体中新增 `min_resendts` 变量,该变量在 `flush()` 的发送循环当中顺便找出最小的时间戳,`check()` 就不再需要每次都重新遍历,直接读取 `min_resendts` 变量的值即可。 + +除此之外,原版的存在“bug”,kcptube 也会有。例如: * [如何避免缓存积累延迟的问题](https://github.com/skywind3000/kcp/issues/175) * [求助:一些压测出现的问题, 发大包后不断累积](https://github.com/skywind3000/kcp/issues/243) diff --git a/README_EN.md b/README_EN.md index 964b42a..d0987ad 100644 --- a/README_EN.md +++ b/README_EN.md @@ -452,7 +452,12 @@ The timeout period for KCP channel is 30 seconds after enabling the multiplexing To reduce latency, kcptube has enabled the TCP_NODELAY option. For some high-traffic application scenarios, this may result in a reduction in the amount of TCP data transmitted. ### KCP -KCP Tube uses the original version of the [KCP](https://github.com/skywind3000/kcp), with the exception that the minimum value of the interval has been changed from 10 to 1, and other parts have not been modified. In other words, if there are ‘bugs’ in the original version, they will also exist in kcptube. For example: +KCP Tube uses the original version of the [KCP](https://github.com/skywind3000/kcp), with some modifications: + +1. The original `flush()` function first moves the data to be sent to the sending queue, and then completes ‘sending new packet’, ‘packet retransmission’, and ‘ACK packet sending’ in the same loop. In the modified version, ‘packet retransmission’ and ‘ACK packet sending’ are done first, and then ‘move the data to be sent to the sending queue’ is done, sending it during the transfer. +2. The original `check()` function would traverse the sending queue every time to find the retransmission timestamp for packets that have reached their timeout. In the modified version, a new variable `min_resendts` is added to the KCP struct. During the `flush()` sending loop, the minimum timestamp is found and stored in min_resendts. `check()` no longer needs to traverse the queue every time. It can directly read the value of `min_resendts`. + +And other ‘bugs’ in the original version, will also exist in kcptube. For example: * [如何避免缓存积累延迟的问题](https://github.com/skywind3000/kcp/issues/175) * [求助:一些压测出现的问题, 发大包后不断累积](https://github.com/skywind3000/kcp/issues/243) diff --git a/src/3rd_party/ikcp.c b/src/3rd_party/ikcp.c index 3a01c5a..e5ce627 100644 --- a/src/3rd_party/ikcp.c +++ b/src/3rd_party/ikcp.c @@ -288,6 +288,7 @@ ikcpcb* ikcp_create(IUINT32 conv, void *user) kcp->nocwnd = 0; kcp->xmit = 0; kcp->dead_link = IKCP_DEADLINK; + kcp->min_resendts = 0; kcp->output = NULL; kcp->writelog = NULL; @@ -947,6 +948,7 @@ void ikcp_flush(ikcpcb *kcp) int change = 0; int lost = 0; IKCPSEG seg; + kcp->min_resendts = 0xFFFFFFFF; // 'ikcp_update' haven't been called. if (kcp->updated == 0) return; @@ -1024,30 +1026,6 @@ void ikcp_flush(ikcpcb *kcp) cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd); - // move data from snd_queue to snd_buf - while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { - IKCPSEG *newseg; - if (iqueue_is_empty(&kcp->snd_queue)) break; - - newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); - - iqueue_del(&newseg->node); - iqueue_add_tail(&newseg->node, &kcp->snd_buf); - kcp->nsnd_que--; - kcp->nsnd_buf++; - - newseg->conv = kcp->conv; - newseg->cmd = IKCP_CMD_PUSH; - newseg->wnd = seg.wnd; - newseg->ts = current; - newseg->sn = kcp->snd_nxt++; - newseg->una = kcp->rcv_nxt; - newseg->resendts = current; - newseg->rto = kcp->rx_rto; - newseg->fastack = 0; - newseg->xmit = 0; - } - // calculate resent resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0; @@ -1056,13 +1034,7 @@ void ikcp_flush(ikcpcb *kcp) for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node); int needsend = 0; - if (segment->xmit == 0) { - needsend = 1; - segment->xmit++; - segment->rto = kcp->rx_rto; - segment->resendts = current + segment->rto + rtomin; - } - else if (_itimediff(current, segment->resendts) >= 0) { + if (_itimediff(current, segment->resendts) >= 0) { needsend = 1; segment->xmit++; kcp->xmit++; @@ -1075,6 +1047,7 @@ void ikcp_flush(ikcpcb *kcp) } segment->resendts = current + segment->rto; lost = 1; + kcp->min_resendts = _imin_(kcp->min_resendts, segment->resendts); } else if (segment->fastack >= resent) { if ((int)segment->xmit <= kcp->fastlimit || @@ -1084,6 +1057,7 @@ void ikcp_flush(ikcpcb *kcp) segment->fastack = 0; segment->resendts = current + segment->rto; change++; + kcp->min_resendts = _imin_(kcp->min_resendts, segment->resendts); } } @@ -1114,6 +1088,47 @@ void ikcp_flush(ikcpcb *kcp) } } + // move data from snd_queue to snd_buf + while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { + IKCPSEG *newseg; + if (iqueue_is_empty(&kcp->snd_queue)) break; + + newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); + + iqueue_del(&newseg->node); + iqueue_add_tail(&newseg->node, &kcp->snd_buf); + kcp->nsnd_que--; + kcp->nsnd_buf++; + + newseg->conv = kcp->conv; + newseg->cmd = IKCP_CMD_PUSH; + newseg->wnd = seg.wnd; + newseg->ts = current; + newseg->sn = kcp->snd_nxt++; + newseg->una = kcp->rcv_nxt; + newseg->resendts = current + kcp->rx_rto + rtomin; + newseg->rto = kcp->rx_rto; + newseg->fastack = 0; + newseg->xmit = 1; + + kcp->min_resendts = _imin_(kcp->min_resendts, newseg->resendts); + + size = (int)(ptr - buffer); + int need = IKCP_OVERHEAD + newseg->len; + + if (size + need > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + + ptr = ikcp_encode_seg(ptr, newseg); + + if (newseg->len > 0) { + memcpy(ptr, newseg->data, newseg->len); + ptr += newseg->len; + } + } + // flash remain segments size = (int)(ptr - buffer); if (size > 0) { @@ -1193,7 +1208,6 @@ IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current) IINT32 tm_flush = 0x7fffffff; IINT32 tm_packet = 0x7fffffff; IUINT32 minimal = 0; - struct IQUEUEHEAD *p; if (kcp->updated == 0) { return current; @@ -1210,9 +1224,8 @@ IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current) tm_flush = _itimediff(ts_flush, current); - for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { - const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node); - IINT32 diff = _itimediff(seg->resendts, current); + if (!iqueue_is_empty(&kcp->snd_buf)) { + IINT32 diff = _itimediff(kcp->min_resendts, current); if (diff <= 0) { return current; } @@ -1245,7 +1258,7 @@ int ikcp_setmtu(ikcpcb *kcp, int mtu) int ikcp_interval(ikcpcb *kcp, int interval) { if (interval > 5000) interval = 5000; - else if (interval < 1) interval = 1; + else if (interval < 10) interval = 10; kcp->interval = interval; return 0; } @@ -1263,7 +1276,7 @@ int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc) } if (interval >= 0) { if (interval > 5000) interval = 5000; - else if (interval < 1) interval = 1; + else if (interval < 10) interval = 10; kcp->interval = interval; } if (resend >= 0) { diff --git a/src/3rd_party/ikcp.h b/src/3rd_party/ikcp.h index b8c337f..2822df7 100644 --- a/src/3rd_party/ikcp.h +++ b/src/3rd_party/ikcp.h @@ -294,6 +294,7 @@ struct IKCPCB IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto; IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; IUINT32 current, interval, ts_flush, xmit; + IUINT32 min_resendts; IUINT32 nrcv_buf, nsnd_buf; IUINT32 nrcv_que, nsnd_que; IUINT32 nodelay, updated; diff --git a/src/main.cpp b/src/main.cpp index 22a2acf..ac5adc0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -19,7 +19,7 @@ int main(int argc, char *argv[]) { char app_name[] = "kcptube"; - printf("%s version 20230916\n", app_name); + printf("%s version 20230924\n", app_name); if (argc <= 1) { diff --git a/src/networks/client.cpp b/src/networks/client.cpp index 44b8986..96b6e20 100644 --- a/src/networks/client.cpp +++ b/src/networks/client.cpp @@ -771,7 +771,6 @@ void client_mode::mux_cancel_channel(protocol_type prtcl, kcp_mappings *kcp_mapp session->when_disconnect(empty_tcp_disconnect); session->session_is_ending(true); session->pause(false); - session->disconnect(); session->stop(); } @@ -1114,7 +1113,6 @@ void client_mode::process_disconnect(uint32_t conv, tcp_session *session) session->when_disconnect(empty_tcp_disconnect); session->session_is_ending(true); session->pause(false); - session->disconnect(); session->stop(); kcp_channels.erase(kcp_channel_iter); @@ -1228,7 +1226,6 @@ void client_mode::delete_mux_records(uint32_t conv) if (mux_records_ptr->local_tcp != nullptr) { mux_records_ptr->local_tcp->when_disconnect(empty_tcp_disconnect); - mux_records_ptr->local_tcp->disconnect(); mux_records_ptr->local_tcp->stop(); mux_records_ptr->local_tcp = nullptr; } @@ -1339,7 +1336,6 @@ void client_mode::cleanup_expiring_handshake_connections() if (kcp_mappings_ptr->egress_forwarder != nullptr) { kcp_mappings_ptr->egress_forwarder->remove_callback(); - kcp_mappings_ptr->egress_forwarder->disconnect(); kcp_mappings_ptr->egress_forwarder->stop(); } @@ -1426,7 +1422,6 @@ void client_mode::loop_find_expires() if (expiring_kcp.find(kcp_mappings_ptr) == expiring_kcp.end()) { tcp_channel->when_disconnect(empty_tcp_disconnect); - tcp_channel->disconnect(); tcp_channel->stop(); expiring_kcp.insert({ kcp_mappings_ptr, time_right_now }); } @@ -1888,7 +1883,6 @@ void client_mode::handshake_test_failure(kcp_mappings *handshake_ptr) void client_mode::handshake_test_cleanup(kcp_mappings *handshake_ptr) { handshake_ptr->egress_forwarder->remove_callback(); - handshake_ptr->egress_forwarder->disconnect(); handshake_ptr->egress_forwarder->stop(); std::scoped_lock lock_handshake{ mutex_handshakes, mutex_expiring_handshakes }; diff --git a/src/networks/connections.cpp b/src/networks/connections.cpp index 99a9f91..e732f50 100644 --- a/src/networks/connections.cpp +++ b/src/networks/connections.cpp @@ -483,22 +483,20 @@ void tcp_session::stop() { stopped.store(true); callback = empty_tcp_callback; - asio::error_code ec; - if (is_open()) - connection_socket.close(ec); + disconnect(); } -bool tcp_session::is_pause() +bool tcp_session::is_pause() const { return paused.load(); } -bool tcp_session::is_stop() +bool tcp_session::is_stop() const { return stopped.load(); } -bool tcp_session::is_open() +bool tcp_session::is_open() const { return connection_socket.is_open(); } @@ -507,11 +505,13 @@ void tcp_session::disconnect() { asio::error_code ec; connection_socket.shutdown(asio::socket_base::shutdown_both, ec); + ec.clear(); + connection_socket.close(ec); } void tcp_session::async_read_data() { - if (paused.load() || stopped.load()) + if (paused.load() || stopped.load() || !connection_socket.is_open()) return; std::unique_ptr buffer_cache = std::make_unique(gbv_buffer_size); @@ -535,7 +535,7 @@ size_t tcp_session::send_data(const std::vector &buffer_data) size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes) { - if (stopped.load() || buffer_data == nullptr) + if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr) return 0; size_t sent_size = connection_socket.send(asio::buffer(buffer_data, size_in_bytes)); @@ -545,7 +545,7 @@ size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes) size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes, asio::error_code &ec) { - if (stopped.load() || buffer_data == nullptr) + if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr) return 0; size_t sent_size = connection_socket.send(asio::buffer(buffer_data, size_in_bytes), 0, ec); @@ -555,7 +555,7 @@ size_t tcp_session::send_data(const uint8_t *buffer_data, size_t size_in_bytes, void tcp_session::async_send_data(std::unique_ptr> data) { - if (stopped.load() || data == nullptr) + if (stopped.load() || !connection_socket.is_open() || data == nullptr) return; auto asio_buffer = asio::buffer(*data); @@ -568,7 +568,7 @@ void tcp_session::async_send_data(std::unique_ptr> data) void tcp_session::async_send_data(std::vector &&data) { - if (stopped.load()) + if (stopped.load() || !connection_socket.is_open()) return; auto asio_buffer = asio::buffer(data); @@ -579,7 +579,7 @@ void tcp_session::async_send_data(std::vector &&data) void tcp_session::async_send_data(std::unique_ptr buffer_data, size_t size_in_bytes) { - if (stopped.load() || buffer_data == nullptr) + if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr) return; auto asio_buffer = asio::buffer(buffer_data.get(), size_in_bytes); @@ -590,7 +590,7 @@ void tcp_session::async_send_data(std::unique_ptr buffer_data, size_t void tcp_session::async_send_data(std::unique_ptr buffer_data, uint8_t *start_pos, size_t size_in_bytes) { - if (stopped.load() || buffer_data == nullptr || start_pos == nullptr) + if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr || start_pos == nullptr) return; asio::async_write(connection_socket, asio::buffer(start_pos, size_in_bytes), @@ -600,7 +600,7 @@ void tcp_session::async_send_data(std::unique_ptr buffer_data, uint8_ void tcp_session::async_send_data(const uint8_t *buffer_data, size_t size_in_bytes) { - if (stopped.load() || buffer_data == nullptr) + if (stopped.load() || !connection_socket.is_open() || buffer_data == nullptr) return; asio::async_write(connection_socket, asio::buffer(buffer_data, size_in_bytes), @@ -909,12 +909,12 @@ void udp_client::stop() this->disconnect(); } -bool udp_client::is_pause() +bool udp_client::is_pause() const { return paused.load(); } -bool udp_client::is_stop() +bool udp_client::is_stop() const { return stopped.load(); } diff --git a/src/networks/connections.hpp b/src/networks/connections.hpp index cf33635..b99e80b 100644 --- a/src/networks/connections.hpp +++ b/src/networks/connections.hpp @@ -186,9 +186,9 @@ class tcp_session : public std::enable_shared_from_this void pause(bool set_as_pause); void stop(); - bool is_pause(); - bool is_stop(); - bool is_open(); + bool is_pause() const; + bool is_stop() const; + bool is_open() const; void disconnect(); @@ -401,8 +401,8 @@ class udp_client : public std::enable_shared_from_this void pause(bool set_as_pause); void stop(); - bool is_pause(); - bool is_stop(); + bool is_pause() const; + bool is_stop() const; udp::resolver::results_type get_remote_hostname(const std::string &remote_address, asio::ip::port_type port_num, asio::error_code &ec); udp::resolver::results_type get_remote_hostname(const std::string &remote_address, const std::string &port_num, asio::error_code &ec); diff --git a/src/networks/kcp.cpp b/src/networks/kcp.cpp index fca83c6..97c1c42 100644 --- a/src/networks/kcp.cpp +++ b/src/networks/kcp.cpp @@ -249,7 +249,9 @@ namespace KCP // nc: 0:normal congestion control(default), 1:disable congestion control int KCP::NoDelay(int nodelay, int interval, int resend, bool nc) { - return ikcp_nodelay((ikcpcb *)ikcp_ptr, nodelay, interval, resend, nc); + int ret = ikcp_nodelay((ikcpcb *)ikcp_ptr, nodelay, interval, resend, nc); + ((ikcpcb *)ikcp_ptr)->interval = interval; + return ret; } uint32_t KCP::GetConv(const void *ptr) diff --git a/src/networks/server.cpp b/src/networks/server.cpp index 7f7079b..418f9ee 100644 --- a/src/networks/server.cpp +++ b/src/networks/server.cpp @@ -235,7 +235,6 @@ void server_mode::udp_listener_incoming_unpack(std::unique_ptr data, if (prtcl == protocol_type::udp) { std::shared_ptr &udp_channel = kcp_mappings_ptr->local_udp; - udp_channel->disconnect(); udp_channel->stop(); } if (prtcl == protocol_type::mux) @@ -666,14 +665,12 @@ void server_mode::mux_cancel_channel(protocol_type prtcl, std::shared_ptrwhen_disconnect(empty_tcp_disconnect); session->session_is_ending(true); session->pause(false); - session->disconnect(); session->stop(); } if (prtcl == protocol_type::udp) { std::shared_ptr &udp_channel = mux_records_ptr->local_udp; - udp_channel->disconnect(); udp_channel->stop(); } } @@ -1077,7 +1074,6 @@ void server_mode::process_tcp_disconnect(tcp_session *session, std::weak_ptrwhen_disconnect(empty_tcp_disconnect); session->session_is_ending(true); session->pause(false); - session->disconnect(); session->stop(); std::vector data = packet::inform_disconnect_packet(protocol_type::tcp); kcp_ptr->Send((const char *)data.data(), data.size()); @@ -1126,7 +1122,6 @@ void server_mode::process_tcp_disconnect(tcp_session *session, std::weak_ptrwhen_disconnect(empty_tcp_disconnect); session->session_is_ending(true); session->pause(false); - session->disconnect(); session->stop(); std::scoped_lock lockers{mutex_id_map_to_mux_records, mutex_expiring_mux_records}; @@ -1222,14 +1217,12 @@ void server_mode::delete_mux_records(uint32_t conv) if (mux_records_ptr->local_tcp != nullptr) { mux_records_ptr->local_tcp->when_disconnect(empty_tcp_disconnect); - mux_records_ptr->local_tcp->disconnect(); mux_records_ptr->local_tcp->stop(); mux_records_ptr->local_tcp = nullptr; } if (mux_records_ptr->local_udp != nullptr) { - mux_records_ptr->local_udp->disconnect(); mux_records_ptr->local_udp->stop(); mux_records_ptr->local_udp = nullptr; } @@ -1246,7 +1239,6 @@ void server_mode::delete_mux_records(uint32_t conv) { mux_records_ptr->local_tcp->when_disconnect(empty_tcp_disconnect); mux_records_ptr->local_tcp->stop(); - mux_records_ptr->local_tcp->disconnect(); mux_records_ptr->local_tcp = nullptr; } @@ -1317,7 +1309,6 @@ void server_mode::cleanup_expiring_data_connections() if (current_session != nullptr) { current_session->when_disconnect(empty_tcp_disconnect); - current_session->disconnect(); current_session->stop(); current_session = nullptr; } @@ -1328,7 +1319,6 @@ void server_mode::cleanup_expiring_data_connections() std::shared_ptr current_session = kcp_mappings_ptr->local_udp; if (current_session != nullptr) { - current_session->disconnect(); current_session->stop(); } break; @@ -1365,7 +1355,6 @@ void server_mode::cleanup_expiring_mux_records() if (calculate_difference(mux_records_ptr->last_data_transfer_time.load(), time_right_now) < current_settings.udp_timeout) continue; - local_udp->disconnect(); local_udp->stop(); std::vector data = packet::inform_mux_cancel_packet(protocol_type::udp, mux_records_ptr->connection_id);