Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Discovery timeouts, emergency read operations throttled by discovery timeouts #1351

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package db

import (
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -256,15 +257,15 @@ func versionIsDeployed(db *sql.DB) (result bool, err error) {
}

// registerOrchestratorDeployment records the configured version in the orchestrator_db_deployments table upon successful deployment
func registerOrchestratorDeployment(db *sql.DB) error {
func registerOrchestratorDeployment(ctx context.Context, db *sql.DB) error {
query := `
replace into orchestrator_db_deployments (
deployed_version, deployed_timestamp
) values (
?, NOW()
)
`
if _, err := execInternal(db, query, config.RuntimeCLIFlags.ConfiguredVersion); err != nil {
if _, err := execInternal(ctx, db, query, config.RuntimeCLIFlags.ConfiguredVersion); err != nil {
log.Fatalf("Unable to write to orchestrator_metadata: %+v", err)
}
log.Debugf("Migrated database schema to version [%+v]", config.RuntimeCLIFlags.ConfiguredVersion)
Expand All @@ -273,7 +274,7 @@ func registerOrchestratorDeployment(db *sql.DB) error {

// deployStatements will issue given sql queries that are not already known to be deployed.
// This iterates both lists (to-run and already-deployed) and also verifies there are no contradictions.
func deployStatements(db *sql.DB, queries []string) error {
func deployStatements(ctx context.Context, db *sql.DB, queries []string) error {
tx, err := db.Begin()
if err != nil {
log.Fatale(err)
Expand Down Expand Up @@ -335,6 +336,7 @@ func deployStatements(db *sql.DB, queries []string) error {
// initOrchestratorDB attempts to create/upgrade the orchestrator backend database. It is created once in the
// application's lifetime.
func initOrchestratorDB(db *sql.DB) error {
ctx := context.Background()
log.Debug("Initializing orchestrator")

versionAlreadyDeployed, err := versionIsDeployed(db)
Expand All @@ -346,9 +348,9 @@ func initOrchestratorDB(db *sql.DB) error {
log.Fatalf("PanicIfDifferentDatabaseDeploy is set. Configured version %s is not the version found in the database", config.RuntimeCLIFlags.ConfiguredVersion)
}
log.Debugf("Migrating database schema")
deployStatements(db, generateSQLBase)
deployStatements(db, generateSQLPatches)
registerOrchestratorDeployment(db)
deployStatements(ctx, db, generateSQLBase)
deployStatements(ctx, db, generateSQLPatches)
registerOrchestratorDeployment(ctx, db)

if IsSQLite() {
ExecOrchestrator(`PRAGMA journal_mode = WAL`)
Expand All @@ -359,13 +361,13 @@ func initOrchestratorDB(db *sql.DB) error {
}

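// execInternal translates the given query via translateStatement and executes it on the given db through sqlutils.ExecNoPrepare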
func execInternal(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
func execInternal(ctx context.Context, db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
var err error
query, err = translateStatement(query)
if err != nil {
return nil, err
}
res, err := sqlutils.ExecNoPrepare(db, query, args...)
res, err := sqlutils.ExecNoPrepare(ctx, db, query, args...)
return res, err
}

Expand All @@ -380,7 +382,8 @@ func ExecOrchestrator(query string, args ...interface{}) (sql.Result, error) {
if err != nil {
return nil, err
}
res, err := sqlutils.ExecNoPrepare(db, query, args...)
ctx := context.Background()
res, err := sqlutils.ExecNoPrepare(ctx, db, query, args...)
return res, err
}

Expand Down
8 changes: 7 additions & 1 deletion go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (this *HttpAPI) Instance(params martini.Params, r render.Render, req *http.
// AsyncDiscover issues an asynchronous read on an instance. This is
// useful for bulk loads of a new set of instances and will not block
// if the instance is slow to respond or not reachable.
// It also will not block the raft queue in the event that communication with the instance being discovered hangs.
func (this *HttpAPI) AsyncDiscover(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
Expand All @@ -218,7 +219,12 @@ func (this *HttpAPI) AsyncDiscover(params martini.Params, r render.Render, req *
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
go this.Discover(params, r, req, user)

if orcraft.IsRaftEnabled() {
orcraft.PublishCommand("async-discover", instanceKey)
} else {
go logic.DiscoverInstance(instanceKey)
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Asynchronous discovery initiated for Instance: %+v", instanceKey)})
}
Expand Down
67 changes: 38 additions & 29 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package inst

import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"github.com/go-sql-driver/mysql"
"regexp"
"runtime"
"sort"
Expand All @@ -30,6 +30,8 @@ import (
"sync"
"time"

"github.com/go-sql-driver/mysql"

"github.com/openark/golib/log"
"github.com/openark/golib/math"
"github.com/openark/golib/sqlutils"
Expand Down Expand Up @@ -231,13 +233,13 @@ func unrecoverableError(err error) bool {

// Check if the instance is a MaxScale binlog server (a proxy not a real
// MySQL server) and also update the resolved hostname
func (instance *Instance) checkMaxScale(db *sql.DB, latency *stopwatch.NamedStopwatch) (isMaxScale bool, resolvedHostname string, err error) {
func (instance *Instance) checkMaxScale(ctx context.Context, db *sql.DB, latency *stopwatch.NamedStopwatch) (isMaxScale bool, resolvedHostname string, err error) {
if config.Config.SkipMaxScaleCheck {
return isMaxScale, resolvedHostname, err
}

latency.Start("instance")
err = sqlutils.QueryRowsMap(db, "show variables like 'maxscale%'", func(m sqlutils.RowMap) error {
err = sqlutils.QueryRowsMapContext(ctx, db, "show variables like 'maxscale%'", func(m sqlutils.RowMap) error {
if m.GetString("Variable_name") == "MAXSCALE_VERSION" {
originalVersion := m.GetString("Value")
if originalVersion == "" {
Expand Down Expand Up @@ -311,6 +313,9 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
}
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.Config.MySQLDiscoveryReadTimeoutSeconds)*time.Second)
defer cancel()

var waitGroup sync.WaitGroup
var serverUuidWaitGroup sync.WaitGroup
readingStartTime := time.Now()
Expand Down Expand Up @@ -345,10 +350,14 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
if err != nil {
goto Cleanup
}
if db.Stats().InUse >= config.MySQLTopologyMaxPoolConnections {
err = fmt.Errorf("Database connections exhausted for %v", *instanceKey)
goto Cleanup
}

instance.Key = *instanceKey

if isMaxScale, resolvedHostname, err = instance.checkMaxScale(db, latency); err != nil {
if isMaxScale, resolvedHostname, err = instance.checkMaxScale(ctx, db, latency); err != nil {
// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.session_variables.
// There is a wrong decision making in this design and the migration path to 5.7 will be difficult.
Expand All @@ -369,17 +378,17 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,

// Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted.
// Therefore we (currently) take @@hostname (which is masquerading as master host anyhow)
err = db.QueryRow("select @@hostname").Scan(&maxScaleMasterHostname)
err = db.QueryRowContext(ctx, "select @@hostname").Scan(&maxScaleMasterHostname)
if err != nil {
goto Cleanup
}
}
if isMaxScale110 {
// Only this is supported:
db.QueryRow("select @@server_id").Scan(&instance.ServerID)
db.QueryRowContext(ctx, "select @@server_id").Scan(&instance.ServerID)
} else {
db.QueryRow("select @@global.server_id").Scan(&instance.ServerID)
db.QueryRow("select @@global.server_uuid").Scan(&instance.ServerUUID)
db.QueryRowContext(ctx, "select @@global.server_id").Scan(&instance.ServerID)
db.QueryRowContext(ctx, "select @@global.server_uuid").Scan(&instance.ServerUUID)
}
} else {
// NOT MaxScale
Expand All @@ -391,7 +400,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
defer waitGroup.Done()
var dummy string
// show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema)
err := db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)
err := db.QueryRowContext(ctx, "show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)

if err != nil {
logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err)
Expand All @@ -408,7 +417,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
}

var mysqlHostname, mysqlReportHost string
err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan(
err = db.QueryRowContext(ctx, "select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan(
&mysqlHostname, &mysqlReportHost, &instance.ServerID, &instance.Version, &instance.VersionComment, &instance.ReadOnly, &instance.Binlog_format, &instance.LogBinEnabled, &instance.LogReplicationUpdatesEnabled)
if err != nil {
goto Cleanup
Expand All @@ -433,7 +442,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error {
err := sqlutils.QueryRowsMapContext(ctx, db, "show master status", func(m sqlutils.RowMap) error {
var err error
instance.SelfBinlogCoordinates.LogFile = m.GetString("File")
instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position")
Expand All @@ -449,7 +458,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
defer waitGroup.Done()
semiSyncMasterPluginLoaded := false
semiSyncReplicaPluginLoaded := false
err := sqlutils.QueryRowsMap(db, "show global variables like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error {
err := sqlutils.QueryRowsMapContext(ctx, db, "show global variables like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error {
if m.GetString("Variable_name") == "rpl_semi_sync_master_enabled" {
instance.SemiSyncMasterEnabled = (m.GetString("Value") == "ON")
semiSyncMasterPluginLoaded = true
Expand All @@ -471,7 +480,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, "show global status like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error {
err := sqlutils.QueryRowsMapContext(ctx, db, "show global status like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error {
if m.GetString("Variable_name") == "Rpl_semi_sync_master_status" {
instance.SemiSyncMasterStatus = (m.GetString("Value") == "ON")
} else if m.GetString("Variable_name") == "Rpl_semi_sync_master_clients" {
Expand All @@ -496,14 +505,14 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// ...
// @@gtid_mode is only available in Oracle MySQL >= 5.6
// Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't.
_ = db.QueryRow("select @@global.gtid_mode, @@global.server_uuid, @@global.gtid_executed, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.GTIDMode, &instance.ServerUUID, &instance.ExecutedGtidSet, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage)
_ = db.QueryRowContext(ctx, "select @@global.gtid_mode, @@global.server_uuid, @@global.gtid_executed, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.GTIDMode, &instance.ServerUUID, &instance.ExecutedGtidSet, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage)
if instance.GTIDMode != "" && instance.GTIDMode != "OFF" {
instance.SupportsOracleGTID = true
}
if config.Config.ReplicationCredentialsQuery != "" {
instance.ReplicationCredentialsAvailable = true
} else if masterInfoRepositoryOnTable {
_ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable)
_ = db.QueryRowContext(ctx, "select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable)
}
}()
}
Expand Down Expand Up @@ -549,7 +558,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,

instance.ReplicationIOThreadState = ReplicationThreadStateNoThread
instance.ReplicationSQLThreadState = ReplicationThreadStateNoThread
err = sqlutils.QueryRowsMap(db, "show slave status", func(m sqlutils.RowMap) error {
err = sqlutils.QueryRowsMapContext(ctx, db, "show slave status", func(m sqlutils.RowMap) error {
instance.HasReplicationCredentials = (m.GetString("Master_User") != "")
instance.ReplicationIOThreadState = ReplicationThreadStateFromStatus(m.GetString("Slave_IO_Running"))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromStatus(m.GetString("Slave_SQL_Running"))
Expand Down Expand Up @@ -627,7 +636,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
if err := db.QueryRow(config.Config.ReplicationLagQuery).Scan(&instance.ReplicationLagSeconds); err == nil {
if err := db.QueryRowContext(ctx, config.Config.ReplicationLagQuery).Scan(&instance.ReplicationLagSeconds); err == nil {
if instance.ReplicationLagSeconds.Valid && instance.ReplicationLagSeconds.Int64 < 0 {
log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.ReplicationLagSeconds.Int64)
instance.ReplicationLagSeconds.Int64 = 0
Expand All @@ -649,7 +658,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// Get replicas, either by SHOW SLAVE HOSTS or via PROCESSLIST
// MaxScale does not support PROCESSLIST, so SHOW SLAVE HOSTS is the only option
if config.Config.DiscoverByShowSlaveHosts || isMaxScale {
err := sqlutils.QueryRowsMap(db, `show slave hosts`,
err := sqlutils.QueryRowsMapContext(ctx, db, `show slave hosts`,
func(m sqlutils.RowMap) error {
// MaxScale 1.1 may trigger an error with this command, but
// also we may see issues if anything on the MySQL server locks up.
Expand Down Expand Up @@ -685,7 +694,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, `
err := sqlutils.QueryRowsMapContext(ctx, db, `
select
substring_index(host, ':', 1) as slave_hostname
from
Expand Down Expand Up @@ -714,7 +723,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, `
err := sqlutils.QueryRowsMapContext(ctx, db, `
select
substring(service_URI,9) mysql_host
from
Expand All @@ -740,7 +749,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter)
err := db.QueryRowContext(ctx, config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter)
logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err)
}()
}
Expand All @@ -749,7 +758,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectRegionQuery).Scan(&instance.Region)
err := db.QueryRowContext(ctx, config.Config.DetectRegionQuery).Scan(&instance.Region)
logReadTopologyInstanceError(instanceKey, "DetectRegionQuery", err)
}()
}
Expand All @@ -758,7 +767,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment)
err := db.QueryRowContext(ctx, config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment)
logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err)
}()
}
Expand All @@ -767,7 +776,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias)
err := db.QueryRowContext(ctx, config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias)
logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err)
}()
}
Expand All @@ -776,7 +785,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced)
err := db.QueryRowContext(ctx, config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced)
logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err)
}()
}
Expand Down Expand Up @@ -830,7 +839,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
go func() {
defer waitGroup.Done()
var value string
err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value)
err := db.QueryRowContext(ctx, config.Config.DetectPromotionRuleQuery).Scan(&value)
logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err)
promotionRule, err := ParseCandidatePromotionRule(value)
logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err)
Expand All @@ -851,7 +860,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// Only need to do on masters
if config.Config.DetectClusterAliasQuery != "" {
clusterAlias := ""
if err := db.QueryRow(config.Config.DetectClusterAliasQuery).Scan(&clusterAlias); err != nil {
if err := db.QueryRowContext(ctx, config.Config.DetectClusterAliasQuery).Scan(&clusterAlias); err != nil {
logReadTopologyInstanceError(instanceKey, "DetectClusterAliasQuery", err)
} else {
instance.SuggestedClusterAlias = clusterAlias
Expand All @@ -869,7 +878,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
if instance.ReplicationDepth == 0 && config.Config.DetectClusterDomainQuery != "" && !isMaxScale {
// Only need to do on masters
domainName := ""
if err := db.QueryRow(config.Config.DetectClusterDomainQuery).Scan(&domainName); err != nil {
if err := db.QueryRowContext(ctx, config.Config.DetectClusterDomainQuery).Scan(&domainName); err != nil {
domainName = ""
logReadTopologyInstanceError(instanceKey, "DetectClusterDomainQuery", err)
}
Expand Down Expand Up @@ -929,7 +938,7 @@ Cleanup:
redactedMasterExecutedGtidSet, _ := NewOracleGtidSet(instance.masterExecutedGtidSet)
redactedMasterExecutedGtidSet.RemoveUUID(instance.MasterUUID)

db.QueryRow("select gtid_subtract(?, ?)", redactedExecutedGtidSet.String(), redactedMasterExecutedGtidSet.String()).Scan(&instance.GtidErrant)
db.QueryRowContext(ctx, "select gtid_subtract(?, ?)", redactedExecutedGtidSet.String(), redactedMasterExecutedGtidSet.String()).Scan(&instance.GtidErrant)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion go/inst/instance_topology_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func ExecInstance(instanceKey *InstanceKey, query string, args ...interface{}) (
if err != nil {
return nil, err
}
return sqlutils.ExecNoPrepare(db, query, args...)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.Config.MySQLTopologyReadTimeoutSeconds)*time.Second)
defer cancel()
return sqlutils.ExecNoPrepare(ctx, db, query, args...)
}

// ExecuteOnTopology will execute given function while maintaining concurrency limit
Expand Down
Loading