Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dgram: reuseAddr/reusePort fixes #16677

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/bun-types/bun.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3734,14 +3734,20 @@ declare module "bun" {
port?: string | number;

/**
* If the `SO_REUSEPORT` flag should be set.
* Whether the `SO_REUSEPORT` flag should be set.
*
* This allows multiple processes to bind to the same port, which is useful for load balancing.
*
* @default false
*/
reusePort?: boolean;

/**
* Whether the `IPV6_V6ONLY` flag should be set.
* @default false
*/
ipv6Only?: boolean;

/**
* What hostname should the server listen on?
*
Expand Down
120 changes: 70 additions & 50 deletions packages/bun-usockets/src/bsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -801,52 +801,83 @@ static int us_internal_bind_and_listen(LIBUS_SOCKET_DESCRIPTOR listenFd, struct
return result;
}

inline __attribute__((always_inline)) LIBUS_SOCKET_DESCRIPTOR bsd_bind_listen_fd(
LIBUS_SOCKET_DESCRIPTOR listenFd,
struct addrinfo *listenAddr,
int port,
int options,
int* error
) {
static int bsd_set_reuseaddr(LIBUS_SOCKET_DESCRIPTOR listenFd) {
const int one = 1;
#if defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__)
return setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
#else
return setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
#endif
}

if ((options & LIBUS_LISTEN_EXCLUSIVE_PORT)) {
static int bsd_set_reuseport(LIBUS_SOCKET_DESCRIPTOR listenFd) {
#if defined(__linux__)
// Among Bun's supported platforms, only Linux does load balancing with SO_REUSEPORT.
const int one = 1;
return setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
#else
#if _WIN32
int optval2 = 1;
setsockopt(listenFd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, &optval2, sizeof(optval2));
WSASetLastError(WSAEOPNOTSUPP);
#endif
} else {
if((options & LIBUS_LISTEN_REUSE_PORT)) {
int optval2 = 1;
#if defined(SO_REUSEPORT)
setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &optval2, sizeof(optval2));
#else
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &optval2, sizeof(optval2));
errno = ENOTSUP;
return -1;
#endif
}

static int bsd_set_reuse(LIBUS_SOCKET_DESCRIPTOR listenFd, int options) {
int result = 0;

if ((options & LIBUS_LISTEN_EXCLUSIVE_PORT)) {
#if _WIN32
const int one = 1;
result = setsockopt(listenFd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, &one, sizeof(one));
if (result != 0) {
return result;
}
#endif
}

#if defined(SO_REUSEADDR)
#ifndef _WIN32
if ((options & LIBUS_LISTEN_REUSE_ADDR)) {
result = bsd_set_reuseaddr(listenFd);
if (result != 0) {
return result;
}
}

// Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
// allow binding to addresses that are in use by sockets in TIME_WAIT, it
// effectively allows 'stealing' a port which is in use by another application.
// See libuv issue #1360.
if ((options & LIBUS_LISTEN_REUSE_PORT)) {
result = bsd_set_reuseport(listenFd);
if (result != 0) {
if (errno == ENOTSUP) {
if ((options & LIBUS_LISTEN_DISALLOW_REUSE_PORT_FAILURE) == 0) {
errno = 0;
return 0;
}
}

return result;
}
}

int optval3 = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &optval3, sizeof(optval3));
#endif
#endif
return 0;
}

inline __attribute__((always_inline)) LIBUS_SOCKET_DESCRIPTOR bsd_bind_listen_fd(
LIBUS_SOCKET_DESCRIPTOR listenFd,
struct addrinfo *listenAddr,
int port,
int options,
int* error
) {

if (bsd_set_reuse(listenFd, options) != 0) {
return LIBUS_SOCKET_ERROR;
}

#ifdef IPV6_V6ONLY
// TODO: revise support to match node.js
// if (listenAddr->ai_family == AF_INET6) {
// int disabled = (options & LIBUS_SOCKET_IPV6_ONLY) != 0;
// setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, &disabled, sizeof(disabled));
// }
int disabled = 0;
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, &disabled, sizeof(disabled));
if (listenAddr->ai_family == AF_INET6) {
int enabled = (options & LIBUS_SOCKET_IPV6_ONLY) != 0;
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, &enabled, sizeof(enabled));
heimskr marked this conversation as resolved.
Show resolved Hide resolved
}
#endif

if (us_internal_bind_and_listen(listenFd, listenAddr->ai_addr, (socklen_t) listenAddr->ai_addrlen, 512, error)) {
Expand Down Expand Up @@ -1105,26 +1136,15 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_udp_socket(const char *host, int port, int op
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled));
}

if ((options & LIBUS_LISTEN_EXCLUSIVE_PORT)) {
#if _WIN32
int optval2 = 1;
setsockopt(listenFd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, &optval2, sizeof(optval2));
#endif
} else {
if((options & LIBUS_LISTEN_REUSE_PORT)) {
int optval2 = 1;
#if defined(SO_REUSEPORT)
setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &optval2, sizeof(optval2));
#else
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &optval2, sizeof(optval2));
#endif
}
if (bsd_set_reuse(listenFd, options) != 0) {
freeaddrinfo(result);
return LIBUS_SOCKET_ERROR;
}

#ifdef IPV6_V6ONLY
if (listenAddr->ai_family == AF_INET6) {
int disabled = (options & LIBUS_SOCKET_IPV6_ONLY) != 0;
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, &disabled, sizeof(disabled));
int enabled = (options & LIBUS_SOCKET_IPV6_ONLY) != 0;
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, &enabled, sizeof(enabled));
heimskr marked this conversation as resolved.
Show resolved Hide resolved
}
#endif

Expand Down
2 changes: 2 additions & 0 deletions packages/bun-usockets/src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ enum {
LIBUS_LISTEN_REUSE_PORT = 4,
/* Setting ipv6Only will disable dual-stack support, i.e., binding to host :: won't make 0.0.0.0 be bound.*/
LIBUS_SOCKET_IPV6_ONLY = 8,
LIBUS_LISTEN_REUSE_ADDR = 16,
LIBUS_LISTEN_DISALLOW_REUSE_PORT_FAILURE = 32,
};

/* Library types publicly available */
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ pub const Listener = struct {

const ssl_enabled = ssl != null;

var socket_flags: i32 = if (exclusive) uws.LIBUS_LISTEN_EXCLUSIVE_PORT else (if (socket_config.reusePort) uws.LIBUS_SOCKET_REUSE_PORT else uws.LIBUS_LISTEN_DEFAULT);
var socket_flags: i32 = if (exclusive) uws.LIBUS_LISTEN_EXCLUSIVE_PORT else (if (socket_config.reusePort) uws.LIBUS_LISTEN_REUSE_PORT else uws.LIBUS_LISTEN_DEFAULT);
if (socket_config.allowHalfOpen) {
socket_flags |= uws.LIBUS_SOCKET_ALLOW_HALF_OPEN;
}
Expand Down
21 changes: 17 additions & 4 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ pub const ServerConfig = struct {
reuse_port: bool = false,
id: []const u8 = "",
allow_hot: bool = true,
ipv6_only: bool = false,

static_routes: std.ArrayList(StaticRouteEntry) = std.ArrayList(StaticRouteEntry).init(bun.default_allocator),

Expand Down Expand Up @@ -449,6 +450,15 @@ pub const ServerConfig = struct {
return arraylist.items;
}

pub fn getUsocketsOptions(this: *const ServerConfig) i32 {
heimskr marked this conversation as resolved.
Show resolved Hide resolved
// Unlike Node.js, we set exclusive port in case reuse port is not set
var out: i32 = if (this.reuse_port) uws.LIBUS_LISTEN_REUSE_PORT else uws.LIBUS_LISTEN_EXCLUSIVE_PORT;
if (this.ipv6_only) {
out |= uws.LIBUS_SOCKET_IPV6_ONLY;
}
return out;
}

pub const SSLConfig = struct {
requires_custom_request_ctx: bool = false,
server_name: [*c]const u8 = null,
Expand Down Expand Up @@ -1279,6 +1289,11 @@ pub const ServerConfig = struct {
}
if (global.hasException()) return error.JSError;

if (try arg.get(global, "ipv6Only")) |dev| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a getOptionalBoolean function

args.ipv6_only = dev.coerce(bool, global);
}
if (global.hasException()) return error.JSError;

if (try arg.get(global, "inspector")) |inspector| {
args.inspector = inspector.coerce(bool, global);

Expand Down Expand Up @@ -7554,8 +7569,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
app.listenWithConfig(*ThisServer, this, onListen, .{
.port = tcp.port,
.host = host,
// IPV6_ONLY is the default for bun, different from node it also set exclusive port in case reuse port is not set
.options = (if (this.config.reuse_port) uws.LIBUS_SOCKET_REUSE_PORT else uws.LIBUS_LISTEN_EXCLUSIVE_PORT) | uws.LIBUS_SOCKET_IPV6_ONLY,
.options = this.config.getUsocketsOptions(),
});
},

Expand All @@ -7565,8 +7579,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
this,
onListen,
unix,
// IPV6_ONLY is the default for bun, different from node it also set exclusive port in case reuse port is not set
(if (this.config.reuse_port) uws.LIBUS_SOCKET_REUSE_PORT else uws.LIBUS_LISTEN_EXCLUSIVE_PORT) | uws.LIBUS_SOCKET_IPV6_ONLY,
this.config.getUsocketsOptions(),
);
},
}
Expand Down
4 changes: 3 additions & 1 deletion src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ pub const u_int64_t = c_ulonglong;
pub const LIBUS_LISTEN_DEFAULT: i32 = 0;
pub const LIBUS_LISTEN_EXCLUSIVE_PORT: i32 = 1;
pub const LIBUS_SOCKET_ALLOW_HALF_OPEN: i32 = 2;
pub const LIBUS_SOCKET_REUSE_PORT: i32 = 4;
pub const LIBUS_LISTEN_REUSE_PORT: i32 = 4;
pub const LIBUS_SOCKET_IPV6_ONLY: i32 = 8;
pub const LIBUS_LISTEN_REUSE_ADDR: i32 = 16;
pub const LIBUS_LISTEN_DISALLOW_REUSE_PORT_FAILURE: i32 = 32;

pub const Socket = opaque {
pub fn write2(this: *Socket, first: []const u8, second: []const u8) i32 {
Expand Down
14 changes: 7 additions & 7 deletions src/js/node/dgram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ const SEND_BUFFER = false;
const LIBUS_LISTEN_DEFAULT = 0;
const LIBUS_LISTEN_EXCLUSIVE_PORT = 1;
const LIBUS_SOCKET_ALLOW_HALF_OPEN = 2;
const LIBUS_SOCKET_REUSE_PORT = 4;
const LIBUS_LISTEN_REUSE_PORT = 4;
const LIBUS_SOCKET_IPV6_ONLY = 8;
const LIBUS_LISTEN_REUSE_ADDR = 16;
const LIBUS_LISTEN_DISALLOW_REUSE_PORT_FAILURE = 32;

const kStateSymbol = Symbol("state symbol");
const kOwnerSymbol = Symbol("owner symbol");
Expand Down Expand Up @@ -165,7 +167,7 @@ function Socket(type, listener) {
bindState: BIND_STATE_UNBOUND,
connectState: CONNECT_STATE_DISCONNECTED,
queue: undefined,
reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true.
reuseAddr: options && options.reuseAddr,
reusePort: options && options.reusePort,
ipv6Only: options && options.ipv6Only,
recvBufferSize,
Expand Down Expand Up @@ -301,10 +303,10 @@ Socket.prototype.bind = function (port_, address_ /* , callback */) {
return;
}

let flags = 0;
let flags = LIBUS_LISTEN_DISALLOW_REUSE_PORT_FAILURE;

if (state.reuseAddr) {
flags |= 0; //UV_UDP_REUSEADDR;
flags |= LIBUS_LISTEN_REUSE_ADDR;
}

if (state.ipv6Only) {
Expand All @@ -313,9 +315,7 @@ Socket.prototype.bind = function (port_, address_ /* , callback */) {

if (state.reusePort) {
exclusive = true; // TODO: cluster support
flags |= LIBUS_SOCKET_REUSE_PORT;
} else {
flags |= LIBUS_LISTEN_EXCLUSIVE_PORT;
flags |= LIBUS_LISTEN_REUSE_PORT;
}

// TODO flags
Expand Down
2 changes: 2 additions & 0 deletions test/js/node/test/parallel/test-dgram-reuseport.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ function test() {
socket2.close();
}));
}));
socket1.on('error', common.mustNotCall());
socket2.on('error', common.mustNotCall());
}

checkSupportReusePort().then(test, () => {
Expand Down
Loading