Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lfittl committed Dec 10, 2024
1 parent c1aee3d commit 47fc20e
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 76 deletions.
174 changes: 115 additions & 59 deletions input/postgres/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/pganalyze/collector/util"
)

func RunExplain(ctx context.Context, server *state.Server, inputs []state.PostgresQuerySample, collectionOpts state.CollectionOpts, logger *util.Logger) (outputs []state.PostgresQuerySample) {
func RunExplainForSamples(ctx context.Context, server *state.Server, inputs []state.PostgresQuerySample, collectionOpts state.CollectionOpts, logger *util.Logger) (outputs []state.PostgresQuerySample) {
var samplesByDb = make(map[string]([]state.PostgresQuerySample))

skip := func(sample state.PostgresQuerySample) bool {
Expand Down Expand Up @@ -84,68 +84,19 @@ func RunExplain(ctx context.Context, server *state.Server, inputs []state.Postgr
func runDbExplain(ctx context.Context, db *sql.DB, inputs []state.PostgresQuerySample, useHelper bool) ([]state.PostgresQuerySample, error) {
var outputs []state.PostgresQuerySample
for _, sample := range inputs {
var isUtil []bool
// To be on the safe side never EXPLAIN a statement that can't be parsed,
// or multiple statements in one (leading to accidental execution)
isUtil, err := util.IsUtilityStmt(sample.Query)
if err == nil && len(isUtil) == 1 && !isUtil[0] {
var explainOutput []byte

isExplainable, explainJSON, explainError, err := runExplainOrExplainAnalyze(ctx, db, sample.Query, sample.Parameters, useHelper)
if err != nil {
return nil, err
}
if isExplainable {
sample.HasExplain = true
sample.ExplainSource = pganalyze_collector.QuerySample_STATEMENT_LOG_EXPLAIN_SOURCE
sample.ExplainFormat = pganalyze_collector.QuerySample_JSON_EXPLAIN_FORMAT

if useHelper {
err = db.QueryRowContext(ctx, QueryMarkerSQL+"SELECT pganalyze.explain($1, $2)", sample.Query, pq.Array(sample.Parameters)).Scan(&explainOutput)
if err != nil {
if ctx.Err() != nil {
return nil, err
}
sample.ExplainError = fmt.Sprintf("%s", err)
}
} else {
if len(sample.Parameters) > 0 {
_, err = db.ExecContext(ctx, QueryMarkerSQL+"PREPARE pganalyze_explain AS "+sample.Query)
if err != nil {
if ctx.Err() != nil {
return nil, err
}
sample.ExplainError = fmt.Sprintf("%s", err)
} else {
paramStr := getQuotedParamsStr(sample.Parameters)
err = db.QueryRowContext(ctx, QueryMarkerSQL+"EXPLAIN (VERBOSE, FORMAT JSON) EXECUTE pganalyze_explain("+paramStr+")").Scan(&explainOutput)
if err != nil {
if ctx.Err() != nil {
return nil, err
}
sample.ExplainError = fmt.Sprintf("%s", err)
}

_, err = db.ExecContext(ctx, QueryMarkerSQL+"DEALLOCATE pganalyze_explain")
if err != nil {
return nil, err
}
}
} else {
err = db.QueryRowContext(ctx, QueryMarkerSQL+"EXPLAIN (VERBOSE, FORMAT JSON) "+sample.Query).Scan(&explainOutput)
if err != nil {
if ctx.Err() != nil {
return nil, err
}
sample.ExplainError = fmt.Sprintf("%s", err)
}
}
if explainJSON != nil {
sample.ExplainOutputJSON = explainJSON
}

if len(explainOutput) > 0 {
var explainOutputJSON []state.ExplainPlanContainer
if err := json.Unmarshal(explainOutput, &explainOutputJSON); err != nil {
sample.ExplainError = fmt.Sprintf("%s", err)
} else if len(explainOutputJSON) != 1 {
sample.ExplainError = fmt.Sprintf("Unexpected plan size: %d (expected 1)", len(explainOutputJSON))
} else {
sample.ExplainOutputJSON = &explainOutputJSON[0]
}
if explainError != "" {
sample.ExplainError = explainError
}
}

Expand All @@ -164,6 +115,111 @@ func contains(strs []string, val string) bool {
return false
}

func RunExplainAnalyzeForQueryRun(ctx context.Context, db *sql.DB, query string, parameters []null.String, useHelper bool) (string, error, error) {
var err error
var firstErr error
var result string

// check for helper here (before calling runExplainOrExplainAnalyze)

sql := "BEGIN; EXPLAIN (ANALYZE, VERBOSE, BUFFERS, FORMAT JSON) " + query + "; ROLLBACK"
err = db.QueryRowContext(ctx, sql).Scan(&result)
firstErr = err

// Run EXPLAIN ANALYZE a second time to get a warm cache result
err = db.QueryRowContext(ctx, sql).Scan(&result)

// If the first run failed and the second run succeeded, run once more to get a warm cache result
if err == nil && firstErr != nil {
err = db.QueryRowContext(ctx, sql).Scan(&result)
}

// If it timed out, capture a non-ANALYZE EXPLAIN instead
if err != nil && strings.Contains(err.Error(), "statement timeout") {
sql = "BEGIN; EXPLAIN (VERBOSE, FORMAT JSON) " + query + "; ROLLBACK"
err = db.QueryRowContext(ctx, sql).Scan(&result)
}

// Run one time with EXPLAIN ANALYZE (could do this one without buffers and timing, since we won't use them)
// Run again with EXPLAIN ANALYZE + buffers and timing
// If either run times out we should instead capture a regular EXPLAIN plan
}

// The helper variants are:
// - have no helper
// - have explain helper
// - have explain analyze helper
// and the actions are:
// - do EXPLAIN
// - do EXPLAIN ANALYZE with timings and buffers
// - do EXPLAIN ANALYZE without timings and buffers

func runExplainOrExplainAnalyze(ctx context.Context, db *sql.DB, query string, parameters []null.String, analyze bool, helperFunction string) (isExplainable bool, explainJSON *state.ExplainPlanContainer, explainError string, _err error) {
var isUtil []bool
// To be on the safe side never EXPLAIN a statement that can't be parsed,
// or multiple statements in one (leading to accidental execution)
isUtil, err := util.IsUtilityStmt(query)
if err == nil && len(isUtil) == 1 && !isUtil[0] {
var explainOutput []byte
isExplainable = true

if useHelper {
err = db.QueryRowContext(ctx, QueryMarkerSQL+"SELECT pganalyze.explain($1, $2)", query, pq.Array(parameters)).Scan(&explainOutput)
if err != nil {
if ctx.Err() != nil {
return false, nil, "", err
}
explainError = fmt.Sprintf("%s", err)
}
} else {
if len(parameters) > 0 {
_, err = db.ExecContext(ctx, QueryMarkerSQL+"PREPARE pganalyze_explain AS "+query)
if err != nil {
if ctx.Err() != nil {
return false, nil, "", err
}
explainError = fmt.Sprintf("%s", err)
} else {
paramStr := getQuotedParamsStr(parameters)
err = db.QueryRowContext(ctx, QueryMarkerSQL+"EXPLAIN (VERBOSE, FORMAT JSON) EXECUTE pganalyze_explain("+paramStr+")").Scan(&explainOutput)
if err != nil {
if ctx.Err() != nil {
return false, nil, "", err
}
explainError = fmt.Sprintf("%s", err)
}

_, err = db.ExecContext(ctx, QueryMarkerSQL+"DEALLOCATE pganalyze_explain")
if err != nil {
return false, nil, "", err
}
}
} else {
err = db.QueryRowContext(ctx, QueryMarkerSQL+"EXPLAIN (VERBOSE, FORMAT JSON) "+query).Scan(&explainOutput)
if err != nil {
if ctx.Err() != nil {
return false, nil, "", err
}
explainError = fmt.Sprintf("%s", err)
}
}
}

if len(explainOutput) > 0 {
var explainOutputJSON []state.ExplainPlanContainer
if err := json.Unmarshal(explainOutput, &explainOutputJSON); err != nil {
explainError = fmt.Sprintf("%s", err)
} else if len(explainOutputJSON) != 1 {
explainError = fmt.Sprintf("Unexpected plan size: %d (expected 1)", len(explainOutputJSON))
} else {
explainJSON = &explainOutputJSON[0]
}
}
}

return
}

func getQuotedParamsStr(parameters []null.String) string {
params := []string{}
for i := 0; i < len(parameters); i++ {
Expand Down
2 changes: 1 addition & 1 deletion runner/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func processLogStream(ctx context.Context, server *state.Server, logLines []stat

func postprocessAndSendLogs(ctx context.Context, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger, transientLogState state.TransientLogState, grant state.Grant) (err error) {
if server.Config.EnableLogExplain && len(transientLogState.QuerySamples) != 0 {
transientLogState.QuerySamples = postgres.RunExplain(ctx, server, transientLogState.QuerySamples, globalCollectionOpts, logger)
transientLogState.QuerySamples = postgres.RunExplainForSamples(ctx, server, transientLogState.QuerySamples, globalCollectionOpts, logger)
}

if server.Config.FilterQuerySample == "all" {
Expand Down
20 changes: 4 additions & 16 deletions runner/query_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/pganalyze/collector/input/postgres"
Expand Down Expand Up @@ -88,22 +87,11 @@ func run(ctx context.Context, server *state.Server, collectionOpts state.Collect
result := ""

if query.Type == pganalyze_collector.QueryRunType_EXPLAIN {
sql := "BEGIN; EXPLAIN (ANALYZE, VERBOSE, BUFFERS, FORMAT JSON) " + comment + query.QueryText + "; ROLLBACK"
err = db.QueryRowContext(ctx, sql).Scan(&result)
firstErr = err
sql := comment + query.QueryText

// Run EXPLAIN ANALYZE a second time to get a warm cache result
err = db.QueryRowContext(ctx, sql).Scan(&result)

// If the first run failed and the second run succeeded, run once more to get a warm cache result
if err == nil && firstErr != nil {
err = db.QueryRowContext(ctx, sql).Scan(&result)
}

// If it timed out, capture a non-ANALYZE EXPLAIN instead
if err != nil && strings.Contains(err.Error(), "statement timeout") {
sql = "BEGIN; EXPLAIN (VERBOSE, FORMAT JSON) " + comment + query.QueryText + "; ROLLBACK"
err = db.QueryRowContext(ctx, sql).Scan(&result)
result, err, firstExecutionErr := postgres.RunExplainAnalyzeForQueryRun(ctx, server, sql, globalCollectionOpts, logger)
if firstExecutionErr != nil {
firstErr = firstExecutionErr
}
} else {
err = errors.New("Unhandled query run type")
Expand Down

0 comments on commit 47fc20e

Please sign in to comment.