diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala
index a228bb0b..f6e13dd0 100644
--- a/src/main/scala/Monitor.scala
+++ b/src/main/scala/Monitor.scala
@@ -188,5 +188,6 @@ object Monitor:
       val step     = Kamon.gauge("connector.flush.config.step").withoutTags()
       val interval = Kamon.gauge("connector.flush.config.interval").withoutTags()
       val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags()
-    val qSize           = Kamon.histogram("connector.flush.qSize").withoutTags()
-    val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
+    val qSize                   = Kamon.histogram("connector.flush.qSize").withoutTags()
+    val channelsToFlush         = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
+    val loopRuntimeMicroseconds = Kamon.histogram("connector.flush.loopRuntimeMicroseconds").withoutTags()
diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala
index dc87ede0..ede53fa6 100644
--- a/src/main/scala/netty/ActorChannelConnector.scala
+++ b/src/main/scala/netty/ActorChannelConnector.scala
@@ -7,8 +7,6 @@
 import io.netty.handler.codec.http.websocketx.*
 import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
 import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }
 
-import java.util.concurrent.TimeUnit
-
 import lila.ws.Controller.Endpoint
 import lila.ws.netty.ProtocolHandler.key
@@ -21,6 +19,8 @@ final private class ActorChannelConnector(
 
   private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
   private val monitor = Monitor.connector.flush
+  private val flushThread = Future:
+    while !workers.isShuttingDown && !workers.isTerminated do Thread.sleep(flush())
 
   private object config:
     private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
@@ -33,8 +33,6 @@ final private class ActorChannelConnector(
     monitor.config.interval.update(interval.get())
     monitor.config.maxDelay.update(maxDelay.get())
 
-  workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS)
-
   def apply(endpoint: Endpoint, channel: Channel): Unit =
     val clientPromise = Promise[Client]()
     channel.attr(key.client).set(clientPromise.future)
@@ -58,25 +56,17 @@ final private class ActorChannelConnector(
           .addListener(ChannelFutureListener.CLOSE)
       case ipc.ClientIn.RoundPingFrameNoFlush =>
         channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
-      case in if withFlush || !config.isFlushQEnabled() =>
+      case in if withFlush || flushThread.isCompleted || !config.isFlushQEnabled() =>
         channel.writeAndFlush(TextWebSocketFrame(in.write))
       case in =>
         channel.write(TextWebSocketFrame(in.write))
         flushQ.add(channel)
 
-  private def flush(): Unit =
+  private def flush(): Int =
+    val entryUsecs = System.nanoTime() / 1000
     val qSize = flushQ.size
     val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
     var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)
-    val nextIntervalMillis =
-      if config.isFlushQEnabled() then config.interval.get()
-      else if qSize == 0 then 1000 // hibernate
-      else 1                       // interval is 0 but we still need to empty the queue
-
-    workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS)
-
-    monitor.qSize.record(qSize)
-    monitor.channelsToFlush.record(channelsToFlush)
 
     while channelsToFlush > 0 do
       Option(flushQ.poll()) match
@@ -85,3 +75,10 @@ final private class ActorChannelConnector(
           channelsToFlush -= 1
         case _ =>
           channelsToFlush = 0
+    monitor.qSize.record(qSize)
+    monitor.channelsToFlush.record(channelsToFlush)
+    monitor.loopRuntimeMicroseconds.record(System.nanoTime() / 1000 - entryUsecs)
+
+    if config.isFlushQEnabled() then config.interval.get()
+    else if qSize == 0 then 1000 // hibernate
+    else 1                       // interval is 0 but we still need to empty the queue