From 79ba9c02c7c29b467739716d173f6ace14f9436b Mon Sep 17 00:00:00 2001 From: BoomEaro <21033866+BoomEaro@users.noreply.github.com> Date: Mon, 4 Nov 2024 02:07:11 +0200 Subject: [PATCH] If necessary, schedule in the eventloop --- .../net/md_5/bungee/ServerConnection.java | 44 ++++++++++++------- .../java/net/md_5/bungee/UserConnection.java | 43 +++++++++++------- .../net/md_5/bungee/netty/ChannelWrapper.java | 25 ++++++----- 3 files changed, 66 insertions(+), 46 deletions(-) diff --git a/proxy/src/main/java/net/md_5/bungee/ServerConnection.java b/proxy/src/main/java/net/md_5/bungee/ServerConnection.java index b0fbbbf0cd..9d41fcf3d6 100644 --- a/proxy/src/main/java/net/md_5/bungee/ServerConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/ServerConnection.java @@ -4,8 +4,8 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayDeque; +import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import lombok.Data; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -32,7 +32,7 @@ public class ServerConnection implements Server private final boolean forgeServer = false; @Getter private final Queue keepAlives = new ArrayDeque<>(); - private final Queue packetQueue = new ConcurrentLinkedQueue<>(); + private final Queue packetQueue = new LinkedList<>(); private final Unsafe unsafe = new Unsafe() { @@ -45,27 +45,37 @@ public void sendPacket(DefinedPacket packet) public void sendPacketQueued(DefinedPacket packet) { - Protocol encodeProtocol = ch.getEncodeProtocol(); - if ( encodeProtocol == null ) + ch.scheduleIfNecessary( () -> { - return; - } - if ( !encodeProtocol.TO_SERVER.hasPacket( packet.getClass(), ch.getEncodeVersion() ) ) - { - packetQueue.add( packet ); - } else - { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + Protocol encodeProtocol = ch.getEncodeProtocol(); + if ( !encodeProtocol.TO_SERVER.hasPacket( packet.getClass(), ch.getEncodeVersion() ) ) + { + packetQueue.add( packet ); + } else + { + unsafe().sendPacket( packet ); + } + } ); } public void sendQueuedPackets() { - DefinedPacket packet; - while ( ( packet = packetQueue.poll() ) != null ) + ch.scheduleIfNecessary( () -> { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + DefinedPacket packet; + while ( ( packet = packetQueue.poll() ) != null ) + { + unsafe().sendPacket( packet ); + } + } ); } @Override diff --git a/proxy/src/main/java/net/md_5/bungee/UserConnection.java b/proxy/src/main/java/net/md_5/bungee/UserConnection.java index b6e1241e61..5295047775 100644 --- a/proxy/src/main/java/net/md_5/bungee/UserConnection.java +++ b/proxy/src/main/java/net/md_5/bungee/UserConnection.java @@ -21,7 +21,6 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import lombok.Getter; import lombok.NonNull; @@ -146,7 +145,7 @@ public final class UserConnection implements ProxiedPlayer @Setter private ForgeServerHandler forgeServerHandler; /*========================================================================*/ - private final Queue packetQueue = new ConcurrentLinkedQueue<>(); + private final Queue packetQueue = new LinkedList<>(); private final Unsafe unsafe = new Unsafe() { @Override @@ -186,27 +185,37 @@ public void sendPacket(PacketWrapper packet) public void sendPacketQueued(DefinedPacket packet) { - Protocol encodeProtocol = ch.getEncodeProtocol(); - if ( encodeProtocol == null ) + ch.scheduleIfNecessary( () -> { - return; - } - if ( !encodeProtocol.TO_CLIENT.hasPacket( packet.getClass(), getPendingConnection().getVersion() ) ) - { - packetQueue.add( packet ); - } else - { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + Protocol encodeProtocol = ch.getEncodeProtocol(); + if ( !encodeProtocol.TO_CLIENT.hasPacket( packet.getClass(), getPendingConnection().getVersion() ) ) + { + packetQueue.add( packet ); + } else + { + unsafe().sendPacket( packet ); + } + } ); } public void sendQueuedPackets() { - DefinedPacket packet; - while ( ( packet = packetQueue.poll() ) != null ) + ch.scheduleIfNecessary( () -> { - unsafe().sendPacket( packet ); - } + if ( ch.isClosed() ) + { + return; + } + DefinedPacket packet; + while ( ( packet = packetQueue.poll() ) != null ) + { + unsafe().sendPacket( packet ); + } + } ); } @Deprecated diff --git a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java index a234df01bf..a15ba5dfb7 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java @@ -42,12 +42,7 @@ public ChannelWrapper(ChannelHandlerContext ctx) public Protocol getDecodeProtocol() { - MinecraftDecoder minecraftDecoder = getMinecraftDecoder(); - if ( minecraftDecoder == null ) - { - return null; - } - return minecraftDecoder.getProtocol(); + return getMinecraftDecoder().getProtocol(); } public void setDecodeProtocol(Protocol protocol) @@ -57,12 +52,7 @@ public void setDecodeProtocol(Protocol protocol) public Protocol getEncodeProtocol() { - MinecraftEncoder minecraftEncoder = getMinecraftEncoder(); - if ( minecraftEncoder == null ) - { - return null; - } - return minecraftEncoder.getProtocol(); + return getMinecraftEncoder().getProtocol(); } public void setEncodeProtocol(Protocol protocol) @@ -242,4 +232,15 @@ public void updateComposite() packetCompressor.setCompose( compressorCompose ); } } + + public void scheduleIfNecessary(Runnable task) + { + if ( ch.eventLoop().inEventLoop() ) + { + task.run(); + return; + } + + ch.eventLoop().execute( task ); + } }