From f8d51554ae6808342328867febdcac9c5fe12885 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Thu, 10 Oct 2024 14:35:37 -0400 Subject: [PATCH] feat: enable tmsqlite+gcosmos It is hardcoded to use an on-disk database (at a temporary file location). We need to get some command-line flags in place to allow toggling between tmsqlite in-memory, tmsqlite on-disk, and the original tmmemstore implementations. This change also fixes outstanding flakiness in the tmsqlite in-memory store, which seemed to be due to allowing more than one concurrent write. Also expand the inline documentation, in case we ever need to touch any of the SQLite setup again in the future. --- gcosmos/gserver/component.go | 23 ++++++-------- tmsqlite/tmstore.go | 59 ++++++++++++++++++++++++++++-------- 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/gcosmos/gserver/component.go b/gcosmos/gserver/component.go index 2dbd286..fe2c492 100644 --- a/gcosmos/gserver/component.go +++ b/gcosmos/gserver/component.go @@ -55,11 +55,10 @@ var ( _ serverv2.HasStartFlags = (*Component)(nil) ) -// We intend to migrate to using SQLite, -// but there are outstanding bugs preventing the tmsqlite storage from working correctly. -// This is a cheap compile-time setting to quickly test using tmsqlite -// or switch back to tmmemstore where the tests are still passing. -const useSQLite = false +// The tmsqlite integration is currently working, +// but we don't yet have command line switches to go back to the tmmemstore implementation, +// so just use this package-level constant for now. +const useSQLite = true // Component is a server component to be injected into the Cosmos SDK server module. type Component struct { @@ -222,15 +221,9 @@ func (c *Component) Init(app serverv2.AppI[transaction.Tx], cfg map[string]any, } if useSQLite { - // For now we are hardcoding this to use an in-memory database. - // The cache=shared parameter is required so that the Go SQL driver, - // which will occasionally create a new connection, - // uses the same in-memory database for each connection belonging to this name. - // Without cache=shared, new connections will be against - // an ephemeral, brand new in-memory database - // without any tables or data. - // TODO: we need to accept a command line flag to use an on-disk database. - const useInMem = true + // TODO: we need to accept a command line flag to choose + // between an in-memory or an on-disk database. + const useInMem = false if useInMem { c.tmsql, err = tmsqlite.NewInMemTMStore( c.rootCtx, @@ -241,6 +234,8 @@ func (c *Component) Init(app serverv2.AppI[transaction.Tx], cfg map[string]any, return fmt.Errorf("failed to start tmsqlite store: %w", err) } } else { + // Hardcoding to use a temporary validator database for now, + // but this should be a command line flag. f, err := os.CreateTemp("", "validator-*.sqlite") if err != nil { return fmt.Errorf("failed to create temp file: %w", err) diff --git a/tmsqlite/tmstore.go b/tmsqlite/tmstore.go index e483543..ce3414c 100644 --- a/tmsqlite/tmstore.go +++ b/tmsqlite/tmstore.go @@ -44,8 +44,11 @@ func NewOnDiskTMStore( // Create a file for the database; // if no file exists, then our startup pragma commands fail. if os.IsNotExist(err) { - // Create it. - f, err := os.Create(dbPath) + // The file did not exist so we need to create it. + // We don't use os.Create since that will truncate an existing file. + // While very unlikely that we would be racing to create a file, + // it is much better to be defensive about it. + f, err := os.OpenFile(dbPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o600) if err != nil { return nil, fmt.Errorf("failed to create empty database file: %w", err) } @@ -57,7 +60,14 @@ func NewOnDiskTMStore( } } - uri := "file:" + filepath.Clean(dbPath) + "?mode=rw" + // In contrast to the in-memory store, + // we only have to mark this connection mode as read-write. + // In combination with the SetMaxOpenConns(1) call, + // this allows only a single writer at a time; + // instead of other writers getting an ephemeral "table is locked" + // or "database is locked" error, they will simply block + // while contending for the single available connection. + uri := "file:" + dbPath + "?mode=rw" // The driver type comes from the sqlitedriver_*.go file // chosen based on build tags. @@ -107,21 +117,38 @@ func NewInMemTMStore( hashScheme tmconsensus.HashScheme, reg *gcrypto.Registry, ) (*TMStore, error) { + dbName := fmt.Sprintf("db%0000d", atomic.AddUint32(&inMemNameCounter, 1)) + uri := "file:" + dbName + + // Give the "file" a unique name so that multiple connections within one process + // can use the same in-memory database. + // Standard query parameter: https://www.sqlite.org/uri.html#recognized_query_parameters + "?mode=memory" + + // The cache can only be shared or private. + // A private cache means every connection would see a unique database, + // so this must be shared. + "&cache=shared" + + // Both SQLite wrappers support _txlock. + // Immediate effectively takes a write lock on the database + // at the beginning of every transaction. + // https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions + "&_txlock=immediate" + // The driver type comes from the sqlitedriver_*.go file // chosen based on build tags. - dbName := fmt.Sprintf("db%0000d", atomic.AddUint32(&inMemNameCounter, 1)) - uri := "file:" + dbName + "?mode=memory&cache=shared&_txlock=immediate&_busytimeout=2000" rw, err := sql.Open(sqliteDriverType, uri) if err != nil { return nil, fmt.Errorf("error opening read-write database: %w", err) } - // This needs to be first. - // And unlike other pragmas, this is persistent, - // and it is only relevant to on-disk databases. - if _, err := rw.ExecContext(ctx, `PRAGMA journal_mode = WAL`); err != nil { - return nil, fmt.Errorf("failed to set journal_mode=WAL: %w", err) - } + // Without limiting it to one open connection, + // we would get frequent "table is locked" errors. + // These errors, as far as I can tell, + // do not automatically resolve with the busy timeout handler. + // So, only allow one active write connection to the database at a time. + rw.SetMaxOpenConns(1) + + // We don't set journal mode to WAL with the in-memory store, + // like we do at this point in the on-disk store. if err := pragmasRW(ctx, rw); err != nil { return nil, err @@ -132,8 +159,14 @@ func NewInMemTMStore( } // It would be nice if there was a way to mark this connection as read-only, - // but that does not appear possible with the drivers available. - uri = "file:" + dbName + "?mode=memory&cache=shared&_busytimeout=2000" + // but that does not appear possible with the drivers available + // (you have to connect to an on-disk database for that). + // We use an identical connection URI except for removing the txlock directive. + var ok bool + uri, ok = strings.CutSuffix(uri, "&_txlock=immediate") + if !ok { + panic(fmt.Errorf("BUG: failed to cut _txlock suffix from uri %q", uri)) + } ro, err := sql.Open(sqliteDriverType, uri) if err != nil { return nil, fmt.Errorf("error opening read-only database: %w", err)