Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a PostgreSQL based job server option to replace AMQP #229

Merged
merged 2 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ our API server at https://api.dbhub.io, or run things locally for your own users

### Requirements

* [Golang](https://golang.org) - version 1.17 or above is required.
* [Golang](https://golang.org) - version 1.18 or above is required.
* [Memcached](https://memcached.org) - version 1.4.33 and above are known to work.
* [Minio](https://minio.io) - release 2016-11-26T02:23:47Z and later are known to work.
* [NodeJS](https://nodejs.org) - version 18.x is known to work, others are untested.
* [NodeJS](https://nodejs.org) - version 20 is known to work, others are untested.
* [PostgreSQL](https://www.postgresql.org) - version 13 and above are known to work.
* [RabbitMQ](https://www.rabbitmq.com) - version 3.10.x and above are known to work.
* [Yarn](https://classic.yarnpkg.com) - version 1.22.x. Not Yarn 2.x or greater.

### Subdirectories
Expand Down
60 changes: 19 additions & 41 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"errors"
"fmt"
"log"
"mime/multipart"
Expand Down Expand Up @@ -132,11 +131,11 @@ func columnsHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var cols []sqlite.Column
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -187,7 +186,7 @@ func columnsHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
// Send the columns request to our AMQP backend
// Send the columns request to our job queue backend
cols, _, err = com.LiveColumns(liveNode, loggedInUser, dbOwner, dbName, table)
if err != nil {
jsonErr(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -383,7 +382,7 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) {
}
}

// For a live database, delete it from both Minio and our AMQP backend
// For a live database, delete it from both Minio and our job queue backend
var bucket, id string
if isLive {
// Get the Minio bucket and object names for this database
Expand All @@ -400,7 +399,7 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Delete the database from our AMQP backend
// Delete the database from our job queue backend
err = com.LiveDelete(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -700,11 +699,11 @@ func executeHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// Send the SQL execution request to our AMQP backend
// Send the SQL execution request to our job queue backend
var rowsChanged int
rowsChanged, err = com.LiveExecute(liveNode, loggedInUser, dbOwner, dbName, sql)
if err != nil {
Expand Down Expand Up @@ -752,11 +751,11 @@ func indexesHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var indexes []com.APIJSONIndex
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -811,33 +810,12 @@ func indexesHandler(w http.ResponseWriter, r *http.Request) {
indexes = append(indexes, oneIndex)
}
} else {
// Send the indexes request to our AMQP backend
var rawResponse []byte
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "indexes", loggedInUser, dbOwner, dbName, "")
// Send the indexes request to our job queue backend
indexes, err = com.LiveIndexes(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
return
}

// Decode the response
var resp com.LiveDBIndexesResponse
err = json.Unmarshal(rawResponse, &resp)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
return
}
if resp.Error != "" {
err = errors.New(resp.Error)
jsonErr(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.Node == "" {
log.Printf("In API (Live) indexesHandler(). A node responded, but didn't identify itself.")
return
}
indexes = resp.Indexes
}

// Return the results
Expand Down Expand Up @@ -955,7 +933,7 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

Expand Down Expand Up @@ -1088,11 +1066,11 @@ func tablesHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var tables []string
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -1125,7 +1103,7 @@ func tablesHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
// Send the tables request to our AMQP backend
// Send the tables request to our job queue backend
tables, err = com.LiveTables(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -1361,7 +1339,7 @@ func uploadHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("API Server: Username '%s' uploaded LIVE database '%s/%s', bytes: %v", loggedInUser,
com.SanitiseLogString(dbOwner), com.SanitiseLogString(dbName), numBytes)

// Send a request to the AMQP backend to set up the database there, ready for querying
// Send a request to the job queue to set up the database
liveNode, err := com.LiveCreateDB(com.AmqpChan, dbOwner, dbName, objectID)
if err != nil {
log.Println(err)
Expand Down Expand Up @@ -1448,11 +1426,11 @@ func viewsHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var views []string
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -1485,7 +1463,7 @@ func viewsHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
// Send the views request to our AMQP backend
// Send the views request to our job queue backend
views, err = com.LiveViews(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down
30 changes: 20 additions & 10 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ func main() {
log.Fatalf("Configuration file problem: '%s'", err)
}

// Set the node name used in various logging strings
com.Conf.Live.Nodename = "API server"

// Open the request log for writing
reqLog, err = os.OpenFile(com.Conf.Api.RequestLog, os.O_CREATE|os.O_APPEND|os.O_WRONLY|os.O_SYNC, 0750)
if err != nil {
log.Fatalf("Error when opening request log: %s", err)
}
defer reqLog.Close()
log.Printf("Request log opened: %s", com.Conf.Api.RequestLog)
log.Printf("%s: request log opened: %s", com.Conf.Live.Nodename, com.Conf.Api.RequestLog)

// Parse our template files
tmpl = template.Must(template.New("templates").Delims("[[", "]]").ParseGlob(
Expand All @@ -70,9 +73,8 @@ func main() {
log.Fatal(err)
}

// Connect to MQ server
com.Conf.Live.Nodename = "API server"
com.AmqpChan, err = com.ConnectMQ()
// Connect to job queue server
com.AmqpChan, err = com.ConnectQueue()
if err != nil {
log.Fatal(err)
}
Expand All @@ -95,6 +97,15 @@ func main() {
log.Fatal(err)
}

// Start background goroutines to handle job queue responses
if !com.UseAMQP {
com.ResponseWaiters = com.NewResponseReceiver()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
go com.ResponseQueueListen()
}

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
certFile, err := os.ReadFile(com.Conf.DB4S.CAChain)
Expand Down Expand Up @@ -136,11 +147,10 @@ func main() {

// Load our self signed CA Cert chain, check client certificates if given, and set TLS1.2 as minimum
newTLSConfig := &tls.Config{
ClientAuth: tls.VerifyClientCertIfGiven,
ClientCAs: ourCAPool,
MinVersion: tls.VersionTLS12,
PreferServerCipherSuites: true,
RootCAs: ourCAPool,
ClientAuth: tls.VerifyClientCertIfGiven,
ClientCAs: ourCAPool,
MinVersion: tls.VersionTLS12,
RootCAs: ourCAPool,
}
srv := &http.Server{
Addr: com.Conf.Api.BindAddress,
Expand All @@ -153,7 +163,7 @@ func main() {
server = fmt.Sprintf("https://%s", com.Conf.Api.ServerName)

// Start API server
log.Printf("API server starting on %s", server)
log.Printf("%s: listening on %s", com.Conf.Live.Nodename, server)
err = srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey)

// Shut down nicely
Expand Down
47 changes: 38 additions & 9 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import (
"crypto/tls"
"errors"
"fmt"
"io/fs"
"log"
"os"
"path/filepath"
"strconv"

"github.com/BurntSushi/toml"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/mitchellh/go-homedir"
)
Expand All @@ -19,26 +22,30 @@

// PostgreSQL configuration info
pgConfig *pgxpool.Config

// Configuration info for the PostgreSQL job queue
listenConfig *pgx.ConnConfig
)

// ReadConfig reads the server configuration file.
func ReadConfig() error {
func ReadConfig() (err error) {
// Override config file location via environment variables
var err error
configFile := os.Getenv("CONFIG_FILE")
if configFile == "" {
// TODO: Might be a good idea to add permission checks of the dir & conf file, to ensure they're not
// world readable. Similar in concept to what ssh does for its config files.
userHome, err := homedir.Dir()
var userHome string
userHome, err = homedir.Dir()
if err != nil {
log.Printf("User home directory couldn't be determined: '%s'", err)
return err
return
}
configFile = filepath.Join(userHome, ".dbhub", "config.toml")
}

// Reads the server configuration from disk
if _, err := toml.DecodeFile(configFile, &Conf); err != nil {
_, err = toml.DecodeFile(configFile, &Conf)
if err != nil {
return fmt.Errorf("Config file couldn't be parsed: %s", err)
}

Expand Down Expand Up @@ -166,14 +173,18 @@
}

// Check cache directory exists
if _, err := os.Stat(Conf.DiskCache.Directory); os.IsNotExist(err) {
_, err = os.Stat(Conf.DiskCache.Directory)
if errors.Is(err, fs.ErrNotExist) {
if os.MkdirAll(Conf.DiskCache.Directory, 0775) != nil {
log.Fatal(err)
return
}
}

// Set the PostgreSQL configuration values
// Set the main PostgreSQL database configuration values
pgConfig, err = pgxpool.ParseConfig(fmt.Sprintf("host=%s port=%d user= %s password = %s dbname=%s pool_max_conns=%d connect_timeout=10", Conf.Pg.Server, uint16(Conf.Pg.Port), Conf.Pg.Username, Conf.Pg.Password, Conf.Pg.Database, Conf.Pg.NumConnections))
if err != nil {
return
}
clientTLSConfig := tls.Config{}
if Conf.Environment.Environment == "production" {
clientTLSConfig.ServerName = Conf.Pg.Server
Expand All @@ -187,12 +198,30 @@
pgConfig.ConnConfig.TLSConfig = nil
}

// Create the connection string for the dedicated PostgreSQL notification connection
listenConfig, err = pgx.ParseConfig(fmt.Sprintf("host=%s port=%d user= %s password = %s dbname=%s connect_timeout=10", Conf.Pg.Server, uint16(Conf.Pg.Port), Conf.Pg.Username, Conf.Pg.Password, Conf.Pg.Database))
if err != nil {
return
}
listenTLSConfig := tls.Config{}
if Conf.Environment.Environment == "production" {
listenTLSConfig.ServerName = Conf.Pg.Server
listenTLSConfig.InsecureSkipVerify = false
} else {
listenTLSConfig.InsecureSkipVerify = true
justinclift marked this conversation as resolved.
Dismissed
Show resolved Hide resolved
}
if Conf.Pg.SSL {
listenConfig.TLSConfig = &listenTLSConfig
} else {
listenConfig.TLSConfig = nil
}

// Environment variable override for non-production logged-in user
tempString = os.Getenv("DBHUB_USERNAME")
if tempString != "" {
Conf.Environment.UserOverride = tempString
}

// The configuration file seems good
return nil
return
}
1 change: 0 additions & 1 deletion common/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ type MinioInfo struct {
Server string
}

// MQInfo contains the AMQP backend connection configuration info
type MQInfo struct {
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
Expand Down
2 changes: 1 addition & 1 deletion common/cypress.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CypressSeed(w http.ResponseWriter, r *http.Request) {
return
}

// Send the live database file to our AMQP backend for setup
// Send the live database file to our job queue backend for setup
dbOwner := "default"
dbName := "Join Testing with index.sqlite"
liveNode, err := LiveCreateDB(AmqpChan, dbOwner, dbName, objectID)
Expand Down
Loading