dialect-wasqlite-worker transactions break concurrent requests #10

Closed
davidisaaclee opened this issue Nov 14, 2024 · 13 comments

Comments

@davidisaaclee

davidisaaclee commented Nov 14, 2024

Patch to demonstrate the issue in the playground (updated to use Kysely directly)
diff --git a/playground/src/modules/utils.ts b/playground/src/modules/utils.ts
index 8735016..7ddecd2 100644
--- a/playground/src/modules/utils.ts
+++ b/playground/src/modules/utils.ts
@@ -1,8 +1,27 @@
-import type { Dialect } from 'kysely'
+import { Dialect, Kysely } from 'kysely'
 import type { InferDatabase } from 'kysely-sqlite-builder/schema'
 import { SqliteBuilder } from 'kysely-sqlite-builder'
 import { column, defineTable, useSchema } from 'kysely-sqlite-builder/schema'
 
+// helper to manually resolve promises to test concurrency
+export function simpleDeferred<T = void>(): {
+  promise: Promise<T>
+  resolve: (value: T) => void
+  reject: (reason: any) => void
+} {
+  let resolve: (value: T) => void
+  let reject: (reason: any) => void
+  const promise = new Promise<T>((res, rej) => {
+    resolve = res
+    reject = rej
+  })
+  return {
+    promise,
+    resolve: resolve!,
+    reject: reject!,
+  }
+}
+
 const tables = {
   test: defineTable({
     columns: {
@@ -52,9 +71,69 @@ export async function testDB(dialect: Dialect) {
     console.error(error)
   }
 
+  await checkTransactionIsolation(dialect)
+
   return db.selectFrom('test').selectAll().execute().then(async (data) => {
     await db.destroy()
     console.log(data)
     return data
   })
 }
+
+const rollback = Symbol('rollback')
+
+async function checkTransactionIsolation(dialect: Dialect) {
+  const db = new Kysely<DB>({ dialect })
+
+  // Check that requests concurrent with transaction do not interfere with the transaction
+  const innerInsertCompleted = simpleDeferred() // will resolve when inner insert is done
+  const outerSelectCompleted = simpleDeferred()
+  const transactionPromise = db.transaction().execute(async (tx) => {
+    await tx.insertInto('test')
+      .values({ name: 'from transaction' })
+      .execute()
+    innerInsertCompleted.resolve()
+
+    // Wait for the concurrent SELECT to complete, then rollback
+    await timeout(outerSelectCompleted.promise)
+
+    throw rollback
+  }).catch((err) => {
+    // swallow rollback error
+    if (err !== rollback) {
+      throw err
+    }
+  })
+
+  // Wait for the transaction's insert to complete, then SELECT from outside the transaction.
+  // This should not "see" the row inserted by the transaction.
+  await timeout(innerInsertCompleted.promise)
+  const outerSelectPromise = db.selectFrom('test')
+    .selectAll()
+    .where('name', '=', 'from transaction')
+    .execute()
+
+  // I'd expect either:
+  // - transactionPromise resolves first & rolls back changes; then
+  //   outerSelectPromise resolves finding no rows; or...
+  // - transactionPromise and outerSelectPromise resolve concurrently (like
+  //   if you used multiple connections? idk), but outerSelectPromise still
+  //   finds no rows because the transaction is isolated
+  const [outerSelect] = await timeout(Promise.all([
+    outerSelectPromise,
+    transactionPromise,
+  ]))
+
+  outerSelectCompleted.resolve()
+
+  console.log('Concurrency check result (should be empty):', outerSelect)
+}
+
+function timeout<T>(x: Promise<T>, ms: number = 800): Promise<T> {
+  return new Promise((resolve, reject) => {
+    const timer = setTimeout(() => {
+      reject(new Error('timeout'))
+    }, ms)
+    x.then(resolve, reject).finally(() => clearTimeout(timer))
+  })
+}

When run, this times out at the Promise.all; specifically, the outerSelectPromise seems to be the one timing out.

simple diagram of tested behavior because reading async code is hard

[Screenshot 2024-11-14: diagram not preserved]

When a transaction is in-progress, any other requests (outside of the transaction) performed concurrently never resolve.

I have suspicions that also:

  1. Concurrent requests outside of the transaction end up being part of the transaction (I thought I saw this in a bigger project, but I'm not seeing it in the small repro – this does seem to happen with kysely-sqlite-builder if you look at the edit history of this issue)
  2. Concurrent requests can cause responses to get out of sync with requests in the WaSqliteWorkerDriver – this manifested in my bigger project as an INSERT INTO ... RETURNING complaining about no results, but when I traced the request / response, the response (1) arrived at the worker driver after the Kysely query execution completed, and (2) did include results – hypothesis is that the no results error was raised because the INSERT INTO request thought that some other query response contained the results to its query.

I'm going to continue trying to reproduce these issues in the playground, but the issue in the snippet above seems real enough 🤷

I haven't used any of the other packages in this repo, so I don't know if this is an issue in the other drivers.

I'm motivated to fix this for my own project, but I haven't tried fixing it yet – will PR if I'm able to fix it.

@davidisaaclee
Author

davidisaaclee commented Nov 14, 2024

I didn't see that the playground uses kysely-sqlite-builder instead of Kysely directly – not sure what the difference in behavior is, so I'm working to update my example to use Kysely.

Updated snippet to use Kysely directly.

@davidisaaclee davidisaaclee changed the title from "dialect-wasqlite-worker transactions leak into concurrent requests" to "dialect-wasqlite-worker transactions break concurrent requests" on Nov 15, 2024
@subframe7536
Owner

Thanks for reporting and your detailed reproduction. I will look into this issue.

@subframe7536
Owner

[image not preserved]

It seems that this issue is caused by the use of ConnectionMutex. It ensures that all executions run serially and that parallel requests are blocked (following the design of the official SQLite driver).
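
To illustrate the serializing behavior described above, here is a minimal sketch of a promise-based mutex. It is only modeled on the general idea of a connection mutex and is not the actual code of either driver; `SketchMutex`, `mutexDemo`, and the event strings are made up for illustration.

```typescript
// Hypothetical sketch of a promise-based connection mutex (not the drivers' real code).
class SketchMutex {
  private current: Promise<void> | undefined
  private release: (() => void) | undefined

  // Wait until nobody holds the lock, then take it.
  async lock(): Promise<void> {
    while (this.current) {
      await this.current
    }
    this.current = new Promise<void>((resolve) => {
      this.release = resolve
    })
  }

  // Release the lock and wake up everyone waiting in lock().
  unlock(): void {
    const release = this.release
    this.current = undefined
    this.release = undefined
    release?.()
  }
}

// The "transaction" holds the lock across several steps. The "select" arrives
// while the lock is held and only runs after the lock is released.
async function mutexDemo(): Promise<string[]> {
  const mutex = new SketchMutex()
  const events: string[] = []

  const transaction = (async () => {
    await mutex.lock()
    events.push('tx: begin')
    await new Promise((r) => setTimeout(r, 10)) // simulated work inside the transaction
    events.push('tx: end')
    mutex.unlock()
  })()

  const select = (async () => {
    await mutex.lock()
    events.push('select: run')
    mutex.unlock()
  })()

  await Promise.all([transaction, select])
  return events
}
```

Under this model, a request queued behind the lock resolves as soon as the holder releases it. If the holder instead waits for that queued request before releasing, neither side can make progress, which appears to be the shape of the timeout in the repro above.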

@davidisaaclee
Author

@subframe7536 Yes, that makes sense. Do you consider the current behavior to be expected, then?

When the driver receives a request to perform transaction A (which may be an implicit transaction, i.e. a standalone query), but is in the middle of transaction B, I would expect transaction A to be blocked while transaction B is in-progress, but once B completes, A should continue, and eventually resolve. (I find this stuff hard to talk about clearly, I can try to illustrate it better if this is hard to understand.)

@subframe7536
Owner

subframe7536 commented Nov 15, 2024

I tested the reproduction with the official sqlite dialect, and it also emits a timeout error. Also, the proper way to execute SQL statements in a transaction is to reuse the tx that db.transaction().execute(tx => {}) gives you.

I'm also curious about your use case for running this logic concurrently. Generally, if you await inside the transaction, everything works:

let outerSelect
await db.transaction().execute(async (tx) => {
  await tx.insertInto('test')
    .values({ name: 'from transaction' })
    .execute()
  outerSelect = await tx.selectFrom('test')
    .selectAll()
    .where('name', '=', 'from transaction')
    .execute()
  throw rollback
}).catch((err) => {
  // swallow rollback error
  if (err !== rollback) {
    throw err
  }
})

console.log(outerSelect)

Also, you can just use db.kysely to get the Kysely instance.

@davidisaaclee
Author

davidisaaclee commented Nov 15, 2024

Thanks @subframe7536 - to clarify, do you mean the better-sqlite3 dialect that ships with Kysely? https://kysely-org.github.io/kysely-apidoc/classes/SqliteDialect.html

My use case is that I'm processing data as it trickles in from multiple parallel API requests - as each API response arrives, I put its data into SQLite. Since I don't serialize the SQLite work from my end, I end up running stuff concurrently. (I think the current snippet in the issue doesn't demonstrate this well – the original snippet illustrated it better but I wasn't able to repro the issue using bare Kysely. This could definitely be my own code's issue!)

I'm curious to know exactly which dialect you're talking about and see if it fixes the real issue I'm seeing, but I'll close this issue in the meantime.

@subframe7536
Owner

to clarify, do you mean the better-sqlite3 dialect that ships with Kysely?

Yes.

I think the database operations and the external API calls should be split. Also, Promise.all can be placed inside db.transaction().execute(() => {}) for a better abstraction.
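
One way to read the suggestion above is to let the API requests run in parallel but funnel their database work through a single queue, so that only one write is in flight at a time. Below is a minimal sketch of that idea; `SerialQueue`, `fakeFetch`, and the `written` array are hypothetical stand-ins, and no Kysely API is used.

```typescript
// Hypothetical helper: runs async tasks one at a time, in submission order.
class SerialQueue {
  private tail: Promise<unknown> = Promise.resolve()

  enqueue<T>(task: () => Promise<T>): Promise<T> {
    // Start the task only after the previously queued task has settled.
    const result = this.tail.then(task)
    // Keep the chain usable even if this task rejects.
    this.tail = result.catch(() => undefined)
    return result
  }
}

// Stand-in for an API call that resolves after `ms` milliseconds.
function fakeFetch(name: string, ms: number): Promise<string> {
  return new Promise((resolve) => setTimeout(() => resolve(name), ms))
}

async function ingest(): Promise<string[]> {
  const queue = new SerialQueue()
  const written: string[] = [] // stand-in for rows written to SQLite

  // The requests run in parallel; each response is written through the queue.
  await Promise.all(
    [fakeFetch('slow', 20), fakeFetch('fast', 5)].map(async (response) => {
      const name = await response
      await queue.enqueue(async () => {
        written.push(`${name}: start`)
        await new Promise((r) => setTimeout(r, 20)) // simulated database write
        written.push(`${name}: done`)
      })
    }),
  )
  return written
}
```

In this sketch the write for `slow` is requested while the write for `fast` is still running, but it does not start until the earlier write has finished, so the two writes never interleave.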

@davidisaaclee
Author

Hi @subframe7536 – I found the issue in my project, and just wanted to share my findings in case you were interested:

Here's a more succinct repro that better represents the real issue I was encountering:

diff --git a/playground/src/modules/utils.ts b/playground/src/modules/utils.ts
index d2b42cc..0d5be6b 100644
--- a/playground/src/modules/utils.ts
+++ b/playground/src/modules/utils.ts
@@ -1,4 +1,4 @@
-import type { Dialect } from 'kysely'
+import type { Dialect, Kysely } from 'kysely'
 import type { InferDatabase } from 'kysely-sqlite-builder/schema'
 import { SqliteBuilder } from 'kysely-sqlite-builder'
 import { column, defineTable, useSchema } from 'kysely-sqlite-builder/schema'
@@ -53,9 +53,30 @@ export async function testDB(dialect: Dialect) {
     console.error(error)
   }
 
+  await checkTxConcurrency(db.kysely)
+
   return db.selectFrom('test').selectAll().execute().then(async (data) => {
     await db.destroy()
     console.log(data)
     return data
   })
 }
+
+async function checkTxConcurrency(db: Kysely<DB>) {
+  await db.transaction().execute(async (tx) => {
+    // This will return no rows.
+    // This is necessary for this repro to fail: my understanding is that the
+    // subsequent INSERT INTO resolves on this query's response.
+    tx.selectFrom('test').selectAll().where('id', '=', 99999).execute()
+
+    // This should insert 1 row
+    tx.insertInto('test')
+      .values({
+        name: `test at ${Date.now()}`,
+        blobtest: Uint8Array.from([2, 3, 4, 5, 6, 7, 8]),
+      })
+      .returning('name')
+      // This raises a `no results` error
+      .executeTakeFirstOrThrow()
+  })
+}

When this is run, Kysely raises a no results error.

Note that the queries inside the transaction in checkTxConcurrency are not awaited – this was the key bug in my code. (The bug "goes away" if you await one or both of the inner queries – which I don't fully understand. Putting both into a Promise.all and awaiting that promise does not cause an error either.)

I don't see a practical reason for someone to exit the transaction block without awaiting the inner queries, but it does stink that a typo like this can cause a hard-to-debug error.

I think this issue happens when the worker receives a request, then receives another request before it is able to respond to the first. Since WaSqliteWorkerConnection assumes the next RunMsg is the response to the most-recent exec request, it assigns the response to the wrong request. If this is true, I think a nice DX thing would be to raise an error when WaSqliteWorkerConnection tries to send a request while still waiting for another request (I think this is an invalid state?). But I understand if you'd prefer to leave it as the responsibility of the application dev.
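
The DX idea above could look roughly like the following: a connection that remembers whether a request is still awaiting its response and throws if a second request is sent in the meantime. This is a self-contained sketch, not the actual WaSqliteWorkerConnection; `GuardedConnection`, `send`, and the delayed `transport` are assumptions made for illustration.

```typescript
// Hypothetical transport: delivers a request and eventually resolves with its response.
type Transport = (sql: string) => Promise<string>

class GuardedConnection {
  private inFlight: string | undefined
  private transport: Transport

  constructor(transport: Transport) {
    this.transport = transport
  }

  async send(sql: string): Promise<string> {
    if (this.inFlight !== undefined) {
      // A second request while one is pending could be matched to the wrong response.
      throw new Error(`"${sql}" sent while "${this.inFlight}" is still pending`)
    }
    this.inFlight = sql
    try {
      return await this.transport(sql)
    } finally {
      this.inFlight = undefined
    }
  }
}

async function guardDemo(): Promise<string[]> {
  const conn = new GuardedConnection(
    (sql) => new Promise((resolve) => setTimeout(() => resolve(`result of ${sql}`), 5)),
  )
  const log: string[] = []

  // Missing `await` on the first call, as in the repro: the second call overlaps it.
  const first = conn.send('SELECT')
  await conn.send('INSERT').catch((err: Error) => log.push(err.message))
  log.push(await first)

  // Awaited calls never overlap, so this one succeeds.
  log.push(await conn.send('INSERT'))
  return log
}
```

With a guard like this, the un-awaited query would fail immediately with a message naming both statements, rather than surfacing later as a confusing no results error.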

@subframe7536
Owner

Yeah, in this case, the wasqlite-worker dialect behaves incorrectly. The better-sqlite3 dialect works well.

I will try to fix it.

@subframe7536 subframe7536 reopened this Nov 17, 2024
@subframe7536
Owner

It seems complicated to bind each message to something like a queryId, especially given the support for streamQuery...

@davidisaaclee
Author

Just to be clear: This isn't blocking my work, so no time pressure from me.

Here's a really dumb, messy commit I made to my fork to help debug the issue, which applies request IDs: d017539. The summary is that I generate a requestId in the driver and include it on requests, then return it from the worker using a wrapper around postMessage. I'm guessing you tried this, but I'm including it just in case it helps.

(I use a local worker that implements the same interface, since I wanted to add some features and wanted to use wa-sqlite directly to implement them – here's a messy, possibly broken commit of that worker: https://gist.github.com/davidisaaclee/f2b3af65f2cf742f0011b185748588b5 )

I wasn't familiar with streaming queries, so maybe there's something there that would make this approach fail (but reading the code, it looks like you could just "close over" the request ID in the async generator).
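
The request-ID approach described above can be sketched without a real worker. The following is a minimal illustration and not the code from the linked commit: `FakeWorker`, `IdClient`, and the message shapes are hypothetical, and the fake worker deliberately answers out of order to show that each response still reaches the request that issued it.

```typescript
type ExecRequest = { requestId: number; sql: string }
type ExecResponse = { requestId: number; rows: string[] }

// Hypothetical stand-in for a worker: replies asynchronously and more slowly for
// SELECTs, so responses can arrive in a different order than requests were sent.
class FakeWorker {
  onmessage: (msg: ExecResponse) => void = () => {}

  postMessage(req: ExecRequest): void {
    const delay = req.sql.startsWith('SELECT') ? 20 : 5
    setTimeout(() => {
      this.onmessage({ requestId: req.requestId, rows: [`rows for ${req.sql}`] })
    }, delay)
  }
}

class IdClient {
  private nextId = 0
  private pending = new Map<number, (rows: string[]) => void>()
  private worker: FakeWorker

  constructor(worker: FakeWorker) {
    this.worker = worker
    // Route each response to the resolver registered under its requestId.
    worker.onmessage = (msg) => {
      const resolve = this.pending.get(msg.requestId)
      this.pending.delete(msg.requestId)
      resolve?.(msg.rows)
    }
  }

  exec(sql: string): Promise<string[]> {
    const requestId = this.nextId++
    return new Promise((resolve) => {
      this.pending.set(requestId, resolve)
      this.worker.postMessage({ requestId, sql })
    })
  }
}

async function idDemo(): Promise<string[][]> {
  const client = new IdClient(new FakeWorker())
  // Both requests are in flight at once; the INSERT response arrives first.
  return Promise.all([
    client.exec('SELECT * FROM test'),
    client.exec('INSERT INTO test'),
  ])
}
```

Because the resolver is looked up by ID rather than by arrival order, the overlapping SELECT and INSERT each receive their own rows. A streaming query would presumably need the same ID attached to every chunk it emits, which this sketch does not cover.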

@subframe7536
Owner

Thanks for your suggestion!

BTW there are some internal methods in wa-sqlite to get rowId and changes.

@subframe7536
Owner

It would be better to handle this issue on the user side, so I'm closing this issue.

@subframe7536 subframe7536 closed this as not planned Jan 16, 2025