Skip to content

Commit

Permalink
gank a thread and sleep instead of scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
schlawg committed Oct 21, 2024
1 parent 89233ac commit 523af27
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,6 @@ object Monitor:
val step = Kamon.gauge("connector.flush.config.step").withoutTags()
val interval = Kamon.gauge("connector.flush.config.interval").withoutTags()
val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags()
val qSize = Kamon.histogram("connector.flush.qSize").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val qSize = Kamon.histogram("connector.flush.qSize").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
val loopRuntimeMicroseconds = Kamon.histogram("connector.flush.loopRuntimeMicroseconds").withoutTags()
27 changes: 12 additions & 15 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }

import java.util.concurrent.TimeUnit

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key

Expand All @@ -21,6 +19,8 @@ final private class ActorChannelConnector(

private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val monitor = Monitor.connector.flush
private val flushThread = Future:
while !workers.isShuttingDown && !workers.isTerminated do Thread.sleep(flush())

private object config:
private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
Expand All @@ -33,8 +33,6 @@ final private class ActorChannelConnector(
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())

workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS)

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
channel.attr(key.client).set(clientPromise.future)
Expand All @@ -58,25 +56,17 @@ final private class ActorChannelConnector(
.addListener(ChannelFutureListener.CLOSE)
case ipc.ClientIn.RoundPingFrameNoFlush =>
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
case in if withFlush || !config.isFlushQEnabled() =>
case in if withFlush || flushThread.isCompleted || !config.isFlushQEnabled() =>
channel.writeAndFlush(TextWebSocketFrame(in.write))
case in =>
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def flush(): Unit =
private def flush(): Int =
val entryUsecs = System.nanoTime() / 1000
val qSize = flushQ.size
val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)
val nextIntervalMillis =
if config.isFlushQEnabled() then config.interval.get()
else if qSize == 0 then 1000 // hibernate
else 1 // interval is 0 but we still need to empty the queue

workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS)

monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)

while channelsToFlush > 0 do
Option(flushQ.poll()) match
Expand All @@ -85,3 +75,10 @@ final private class ActorChannelConnector(
channelsToFlush -= 1
case _ =>
channelsToFlush = 0
monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)
monitor.loopRuntimeMicroseconds.record(System.nanoTime() / 1000 - entryUsecs)

if config.isFlushQEnabled() then config.interval.get()
else if qSize == 0 then 1000 // hibernate
else 1 // interval is 0 but we still need to empty the queue

0 comments on commit 523af27

Please sign in to comment.