Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency issue in typedthreads.nim #24591

Open
mk1nz opened this issue Dec 30, 2024 · 9 comments · May be fixed by #24612
Open

Concurrency issue in typedthreads.nim #24591

mk1nz opened this issue Dec 30, 2024 · 9 comments · May be fixed by #24612

Comments

@mk1nz
Copy link

mk1nz commented Dec 30, 2024

Description

Thread sanitiser and valgrind (--tool=helgrind) report a data race (probably read) in typedthreads.nim on line 274 (Using devel version of the compiler. The problem also occurs with other compiler versions, but the reported line numbers vary.).
Example using channels:

import 
  std/net,
  std/locks,
  std/os

var lock: Lock = Lock()

type SocketOpts* = object
  socket*: Socket

proc processSession(thChan: ptr Channel[Socket]) {.thread.} =
  var response: tuple[dataAvailable: bool, msg: Socket]
  withLock lock:
    try:
      response = thChan[].tryRecv()
    except:
      debugEcho "Unable to receive chan message"
      return
  if response.dataAvailable:
    if response.msg.isNil:
      withLock lock:
        debugEcho "Client socket is nil"
      return
    withLock lock:
      debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](response.msg[])
      discard trySend(response.msg, "hello\n")
    sleep(1000)
    withLock lock:
      response.msg.close

try:
    initLock(lock)
except:
  #log error
  debugEcho "Unable to init rlock"

var 
  thr: array[10, Thread[ptr Channel[Socket]]]
  sockets: array[10, Channel[Socket]]
  address: string = ""
  client: Socket = Socket()

for id in 0..sockets.high:
  try:
    sockets[id].open()
  except:
    quit 1

proc startServer*(srv: var SocketOpts): void =
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:

      srv.socket.acceptAddr(client, address)

      withLock lock:
        if thr[id].running:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          continue

        createThread(thr[id], processSession, sockets[id].unsafeAddr)

        while not sockets[id].trySend(client):
          echo "Try again"
        client = Socket()
      
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  deinitLock(lock)

var params: SocketOpts

startServer(params)

Example of using an array to store client sockets:

import 
  std/net,
  std/locks,
  std/os

type SocketOpts* = object
  socket*: Socket

var lock: Lock = Lock()

proc processSession(sock: Socket) {.thread.} =
  if sock.isNil:
    withLock lock:
      debugEcho "Client socket is nil"
    return
  withLock lock:
    debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](sock[])
    discard trySend(sock, "hello\n")
  sleep(1000)
  withLock lock:
    sock.close

try:
    initLock(lock)
except:
  #log error
  debugEcho "Unable to init rlock"

var 
  thr: array[0..10, Thread[Socket]]
  sockets: array[0..10, Socket]
  address: string = ""
  client: Socket = Socket()

proc startServer*(srv: var SocketOpts): void =
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:
      
      srv.socket.accept(client)

      withLock lock:
        if thr[id].running:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          continue
        sockets[id] = client
        copyMem(sockets[id].unsafeAddr, client.unsafeAddr, sizeof(Socket))
        echo "Client connected from: ", address, ". Socket: ", cast[uint16](sockets[id])
        createThread(thr[id], processSession, sockets[id])
        client = Socket()
      
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  deinitLock(lock)

var params: SocketOpts

startServer(params)

nim.cfg :

--gc:orc
--lineTrace:on
--lineDir:on
--d:nimTypeNames
--debuginfo
--d:debug
--d:nimDebugDlOpen
--d:useMalloc
--opt:none
--passC:" -g3 -O0 -fsanitize=thread"
--passL:" -g3 -O0 -fsanitize=thread"

Nim Version

Current devel version,
2.2.0
1.6.20

Current Output

Following output 

WARNING: ThreadSanitizer: data race (pid=41382)
  Read of size 8 at 0x000102add050 by main thread (mutexes: write M0):
    #0 typedthreads::running(Thread<ptr<Channel<ref<net::SocketImpl>>>>) typedthreads.nim:154 (testTcpServerChannels:arm64+0x100023a28)
    #1 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:71 (testTcpServerChannels:arm64+0x1000233f8)
    #2 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Previous write of size 8 at 0x000102add050 by thread T1:
    #0 typedthreads::threadProcWrapper(pointer) threadimpl.nim:110 (testTcpServerChannels:arm64+0x100008148)

  Location is global 'thr__test84cp83erver67hannels_u86' at 0x000102add040 (testTcpServerChannels+0x100031050)

  Mutex M0 (0x000102adc8e0) created at:
    #0 pthread_mutex_init <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x3181c)
    #1 locks::initLock(var<syslocks::SysLockObj>) locks.nim:38 (testTcpServerChannels:arm64+0x100024614)
    #2 NimMainModule testTcpServerChannels.nim:32 (testTcpServerChannels:arm64+0x1000240b0)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Thread T1 (tid=10953362, finished) created by main thread at:
    #0 pthread_create <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x309d8)
    #1 typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>) typedthreads.nim:292 (testTcpServerChannels:arm64+0x100008350)
    #2 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:78 (testTcpServerChannels:arm64+0x1000235e0)
    #3 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #4 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #5 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #6 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

SUMMARY: ThreadSanitizer: data race typedthreads.nim:154 in typedthreads::running(Thread<ptr<Channel<ref<net::SocketImpl>>>>)
==================
==================
WARNING: ThreadSanitizer: data race (pid=41382)
  Write of size 8 at 0x000102add040 by main thread (mutexes: write M0):
    #0 typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>) typedthreads.nim:274 (testTcpServerChannels:arm64+0x100008214)
    #1 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:78 (testTcpServerChannels:arm64+0x1000235e0)
    #2 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Previous write of size 8 at 0x000102add040 by thread T1:
    #0 typedthreads::threadProcWrapper(pointer) threadimpl.nim:109 (testTcpServerChannels:arm64+0x100008130)

  Location is global 'thr__test84cp83erver67hannels_u86' at 0x000102add040 (testTcpServerChannels+0x100031040)

  Mutex M0 (0x000102adc8e0) created at:
    #0 pthread_mutex_init <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x3181c)
    #1 locks::initLock(var<syslocks::SysLockObj>) locks.nim:38 (testTcpServerChannels:arm64+0x100024614)
    #2 NimMainModule testTcpServerChannels.nim:32 (testTcpServerChannels:arm64+0x1000240b0)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Thread T1 (tid=10953362, finished) created by main thread at:
    #0 pthread_create <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x309d8)
    #1 typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>) typedthreads.nim:292 (testTcpServerChannels:arm64+0x100008350)
    #2 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:78 (testTcpServerChannels:arm64+0x1000235e0)
    #3 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #4 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #5 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #6 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

SUMMARY: ThreadSanitizer: data race typedthreads.nim:274 in typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>)

Expected Output

No response

Known Workarounds

No response

Additional Information

To test the code, use the following tools in two separate terminals

watch -n 0.1 nc 127.0.0.1 3333

To test with valgrind (--tool=helgrind), the thread sanitiser should be disabled.

@mk1nz
Copy link
Author

mk1nz commented Jan 2, 2025

Example with use of manual allocation:
--deepcopy:on -> nim.cfg

Results with MM ORC:

SUMMARY: ThreadSanitizer: data race typedthreads.nim:154 in typedthreads::running(Thread<refnet::SocketImpl>)

--mm:none:

SUMMARY: ThreadSanitizer: data race typedthreads.nim:271 in typedthreads::createThread(var<Thread<refnet::SocketImpl>>, proc<refnet::SocketImpl>, refnet::SocketImpl)

Nim Compiler Version 2.3.1

import 
  std/net,
  std/locks,
  std/os

type SocketOpts* = object
  socket*: Socket

var lock: Lock = Lock()

proc processSession(sock: Socket) {.thread.} =
  if sock.isNil:
    withLock lock:
      debugEcho "Client socket is nil"
    return
  withLock lock:
    debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](sock[])
    discard trySend(sock, "hello\n")
  sleep(1000)
  withLock lock:
    sock.close

try:
    initLock(lock)
except:
  #log error
  debugEcho "Unable to init rlock"

var 
  thr: array[0..10, Thread[Socket]]
  sockets: array[0..10, Socket]
  address: string = ""
  client: Socket

for sock in sockets.mitems:
  sock = cast[Socket](alloc(sizeof(SocketImpl)))



proc startServer*(srv: var SocketOpts): void =
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:
      
      srv.socket.acceptAddr(client, address)

      withLock lock:
        if thr[id].running:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          continue

        deepCopy(sockets[id], client)
        echo "Client connected from: ", address, ". Socket: ", cast[uint16](sockets[id][])
        createThread(thr[id], processSession, sockets[id])
        client = Socket()
      
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  deinitLock(lock)

var params: SocketOpts

startServer(params)

@elcritch
Copy link
Contributor

elcritch commented Jan 6, 2025

I reworked the code to use channels and not create new threads on each socket. Thread-sanitizer with my macOS clang Apple clang version 16.0.0 (clang-1600.0.26.3) runs fine!

Also I fixed the lock init. Couple of notes. Unfortunately var lock = Lock() doesn't actually initialize the lock. You gotta call initLock(lock). Easy gotcha unfortunately. Also, Nim sockets aren't just socket ints, but ref objects which wrap the socket handle. You need to use getFd(socket).int to get a int version of the socket handle back.

import 
  std/net,
  std/locks,
  std/os,
  std/isolation

import threading/channels

var
  lock: Lock
  busy: array[10, bool]

lock.initLock()

type SocketOpts* = object
  socket*: Socket

proc processSession(thChan: Chan[(Socket, int)]) {.thread.} =
  var thChan = thChan # needed to ensure we have a local copy

  while true:
    var sockArg: (Socket, int) = thChan.recv()
    let sock = sockArg[0]
    let id = sockArg[1]
    withLock lock:
      busy[id] = true
    echo "\tThread ", id, " got request to send to socket ", sock.getFd().int

    discard trySend(sock, "hello\n")
    os.sleep(1000)
    sock.close()
    withLock lock:
      echo "\tThread ", getThreadId(), " done"
      busy[id] = false

try:
    initLock(lock)
except:
  #log error
  debugEcho "Unable to init rlock"

var 
  thr: array[10, Thread[Chan[(Socket, int)]]]
  chans: array[10, Chan[(Socket, int)]]
  address: string = ""

for id in 0..<chans.len():
  try:
    chans[id] = newChan[(Socket, int)]()
    withLock lock:
      busy[id] = false
    createThread(thr[id], processSession, chans[id])
  except:
    quit 1

proc startServer*(srv: var SocketOpts): void =
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    while true:

      var client: Socket = Socket()
      srv.socket.acceptAddr(client, address)
      echo "Got connection "

      var wasSent = false
      for id in 0..<chans.len():
        echo "checking thread: ", id
        var isBusy: bool
        withLock lock:
          isBusy = busy[id]
        if isBusy:
          echo "thread busy: ", id
          continue
        else:
          var sm: (Socket, int)
          sm[0] = move client
          sm[1] = id
          chans[id].send(unsafeIsolate move sm)
          echo "sent to thread: ", id
          wasSent = true
          break
 
      if not wasSent and client != nil:
        echo "No threads available"
        # client.send("No threads available")
        client.close()
        continue

  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  deinitLock(lock)

var params: SocketOpts

startServer(params)

@elcritch
Copy link
Contributor

elcritch commented Jan 6, 2025

I believe typedthreads.nim is generally fine with t-san aside from the running not being atomic. It's also tricky to ensure ARC types are moved correctly.

The second example in this issue would need to do something like createThread(thr[id], processSession, ensureMove sockets[id]). Even then it's very easy to create a duplicate ref locally which t-san will pick up in either arc.nim or orc.nim or possible some other like eqdestroy.

thread.running should be made into an atomic bool operation.

@mk1nz
Copy link
Author

mk1nz commented Jan 6, 2025

Thanks Elcritch!

Unfortunately var lock = Lock() doesn't actually initialize the lock.

I did, just after implementing the processSession(). Does it matter where the initLock(lock) is placed?

@elcritch
Copy link
Contributor

elcritch commented Jan 6, 2025

Thanks Elcritch!

Unfortunately var lock = Lock() doesn't actually initialize the lock.

I did, just after implementing the processSession(). Does it matter where the initLock(lock) is placed?

Ah, my bad I saw the lock = Lock() and must've deleted your initLock and figured you'd just used Lock().

No it shouldn't matter as long as it's before it's used.

@elcritch
Copy link
Contributor

elcritch commented Jan 6, 2025

No it shouldn't matter as long as it's before it's used.

Very strange thing. I moved the initialization of the lock before lines of processSession() code, and now I'm not getting data race reports from T-san! Testing now with second version of test code (using arrays)

Oh weird. It really shouldn't matter, but looks like my version actually did have your initLock. Maybe it's the try/except messing with the ordering?

You could try putting the initLock in your main proc.

@elcritch
Copy link
Contributor

elcritch commented Jan 6, 2025

I took a stab at a PR to fix the t-san issues with typedthreads running proc. #24603

@mk1nz
Copy link
Author

mk1nz commented Jan 6, 2025

No it shouldn't matter as long as it's before it's used.

Very strange thing. I moved the initialization of the lock before lines of processSession() code, and now I'm not getting data race reports from T-san! Testing now with second version of test code (using arrays)

Oh weird. It really shouldn't matter, but looks like my version actually did have your initLock. Maybe it's the try/except messing with the ordering?

You could try putting the initLock in your main proc.

Please ignore my mistake. I forgot to copy nim.cfg (with T-san enabled) into the directory with the test code.

@mk1nz
Copy link
Author

mk1nz commented Jan 6, 2025

Additional version with manually allocated targs in the heap, without using thread.running. This code ran for about 2 hours under high load, but after about an hour T-san reported the following problem:

    #1 typedthreads::createThread(var<Thread<ptr<tuple<ref<net::SocketImpl>, bool>>>>, proc<ptr<tuple<ref<net::SocketImpl>, bool>>>, ptr<tuple<ref<net::SocketImpl>, bool>>) typedthreads.nim:292 (testTcpServerUnsafeAddr:arm64+0x100008b3c)
    #2 testTcpServerUnsafeAddr::startServer(var<testTcpServerUnsafeAddr::SocketOpts>) testTcpServerUnsafeAddr.nim:75 (testTcpServerUnsafeAddr:arm64+0x1000273c0)
    #3 testTcpServerUnsafeAddr::startServer(var<testTcpServerUnsafeAddr::SocketOpts>) testTcpServerUnsafeAddr.nim:45 (testTcpServerUnsafeAddr:arm64+0x100026cc0)
    #4 NimMainInner testTcpServerUnsafeAddr.nim:57 (testTcpServerUnsafeAddr:arm64+0x100027e18)
    #5 NimMain testTcpServerUnsafeAddr.nim:63 (testTcpServerUnsafeAddr:arm64+0x1000282bc)
    #6 main testTcpServerUnsafeAddr.nim:71 (testTcpServerUnsafeAddr:arm64+0x100028388)

SUMMARY: ThreadSanitizer: data race typedthreads.nim:272 in typedthreads::createThread(var<Thread<ptr<tuple<ref<net::SocketImpl>, bool>>>>, proc<ptr<tuple<ref<net::SocketImpl>, bool>>>, ptr<tuple<ref<net::SocketImpl>, bool>>)
==================

More likely to be solved with @elcritch proposed PR, but might be helpful for testing anyway.

import 
  std/net,
  std/locks,
  std/os

type SocketOpts* = object
  socket*: Socket

var lock: Lock = Lock()

try:
    initLock(lock)
except:
  #log error
  debugEcho "Unable to init rlock"

proc processSession(sock: ptr tuple[sock: ptr Socket, running: bool]) {.thread.} =
  withLock lock:
    if sock[0].isNil:
      debugEcho "Client socket is nil"
      return

  withLock lock:
    sock[1] = true
    debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](sock[0][][])
    discard trySend(sock[0][], "hello\n")

  sleep(1000)
  
  withLock lock:
    sock[0][].close
    sock[1] = false

#cast[array[0..10, Thread[ptr tuple[sock: Socket, running: bool]]]](alloc(sizeof(array[0..10, Thread[ptr tuple[sock: Socket, running: bool]]])))
var 
  thr: array[0..10, Thread[ptr tuple[sock: ptr Socket, running: bool]]]
  sockets: array[0..10, tuple[sock: ptr Socket, running: bool]]
  address: string = ""
  client: Socket

for sock in sockets.mitems:
  sock[0]= cast[ ptr Socket](alloc(sizeof(SocketImpl)))
  sock[1] = cast[bool](alloc(sizeof(bool)))
  sock[1] = false


proc startServer*(srv: var SocketOpts): void =
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:
      
      srv.socket.acceptAddr(client, address)

      withLock lock:
        if sockets[id][1]:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          continue

        deepCopy(sockets[id][0][], client)
        echo "Client connected from: ", address, ". Socket: ", cast[uint16](sockets[id][0][][])
        createThread(thr[id], processSession, sockets[id].unsafeAddr)
        client = Socket()
      
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  deinitLock(lock)

var params: SocketOpts

startServer(params)

@mk1nz mk1nz linked a pull request Jan 11, 2025 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants