Track Postgres buffer cache usage (#633)
seanlinsley authored Nov 19, 2024
1 parent bf809ca commit 867e346
Showing 16 changed files with 1,156 additions and 926 deletions.
4 changes: 4 additions & 0 deletions config/config.go
@@ -212,6 +212,10 @@ type ServerConfig struct {
// once the server is promoted
SkipIfReplica bool `ini:"skip_if_replica"`

// The maximum shared_buffers size in gigabytes up to which the collector will
// monitor pg_buffercache. Defaults to 200 GB.
MaxBufferCacheMonitoringGB int `ini:"max_buffer_cache_monitoring_gb"`

// Configuration for PII filtering
FilterLogSecret string `ini:"filter_log_secret"` // none/all/credential/parsing_error/statement_text/statement_parameter/table_data/ops/unidentified (comma separated)
FilterQuerySample string `ini:"filter_query_sample"` // none/normalize/all (defaults to "none")
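For reference, here is a minimal sketch of how the new setting might appear in the collector's INI configuration file. The key name comes from the ini tag above; the file path, section name, and connection settings are illustrative assumptions:

# /etc/pganalyze-collector.conf (path is an assumption)
[mydb]
db_host = 10.0.0.1
db_name = mydb
# Raise the cutoff so buffer cache stats are still collected when
# shared_buffers exceeds the 200 GB default limit
max_buffer_cache_monitoring_gb = 500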
11 changes: 6 additions & 5 deletions config/read.go
@@ -57,11 +57,12 @@ func parseConfigDisableCitusSchemaStats(value string) string {

func getDefaultConfig() *ServerConfig {
config := &ServerConfig{
APIBaseURL: DefaultAPIBaseURL,
SectionName: "default",
QueryStatsInterval: 60,
MaxCollectorConnections: 10,
OtelServiceName: DefaultOtelServiceName,
APIBaseURL: DefaultAPIBaseURL,
SectionName: "default",
QueryStatsInterval: 60,
MaxCollectorConnections: 10,
MaxBufferCacheMonitoringGB: 200,
OtelServiceName: DefaultOtelServiceName,
}

// The environment variables are the default way to configure when running inside a Docker container.
1 change: 1 addition & 0 deletions input/collector.go
@@ -128,6 +128,7 @@ func getCollectorConfig(c config.ServerConfig) state.CollectorConfig {
QueryStatsInterval: int32(c.QueryStatsInterval),
MaxCollectorConnections: int32(c.MaxCollectorConnections),
SkipIfReplica: c.SkipIfReplica,
MaxBufferCacheMonitoringGB: int32(c.MaxBufferCacheMonitoringGB),
FilterLogSecret: c.FilterLogSecret,
FilterQuerySample: c.FilterQuerySample,
FilterQueryText: c.FilterQueryText,
11 changes: 11 additions & 0 deletions input/full.go
@@ -50,6 +50,11 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
return
}

bufferCacheReady := make(chan state.BufferCache)
go func() {
postgres.GetBufferCache(ctx, server, globalCollectionOpts, logger, ts.Version, bufferCacheReady)
}()

ps.LastStatementStatsAt = time.Now()
err = postgres.SetQueryTextStatementTimeout(ctx, connection, logger, server)
if err != nil {
@@ -137,6 +142,12 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
return
}

// Wait for the buffer cache data here: CollectAllSchemas matches it against
// relation filenode OIDs before that data is discarded
select {
case <-ctx.Done():
case ts.BufferCache = <-bufferCacheReady:
}

ps, ts, err = postgres.CollectAllSchemas(ctx, server, globalCollectionOpts, logger, ps, ts)
if err != nil {
logger.PrintError("Error collecting schema information: %s", err)
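The channel handoff above follows a common Go pattern: start the slow collection step in a goroutine, do other work, then block on either the result or context cancellation. A minimal self-contained sketch of that pattern (the names and payload here are illustrative, not the collector's API):

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchStats stands in for a slow background step such as GetBufferCache:
// it always sends exactly one result (possibly empty), so a receiver that
// reaches the select below is never left waiting for a value that won't come.
func fetchStats(ctx context.Context, ready chan<- map[string]int64) {
	time.Sleep(50 * time.Millisecond) // simulate query latency
	ready <- map[string]int64{"cached_bytes": 8192}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	ready := make(chan map[string]int64)
	go fetchStats(ctx, ready)

	// ...other collection work would run here concurrently...

	// Wait for the background result, but give up if the run is canceled.
	select {
	case <-ctx.Done():
		fmt.Println("canceled before stats arrived:", ctx.Err())
	case stats := <-ready:
		fmt.Println("got stats:", stats)
	}
}

One design note: with an unbuffered channel, if the context is canceled first the sending goroutine stays blocked; buffering the channel (make(chan T, 1)) would let it exit on its own. Whether that matters in the collector depends on how a run tears down.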
95 changes: 95 additions & 0 deletions input/postgres/buffer_cache.go
@@ -0,0 +1,95 @@
package postgres

import (
"context"
"fmt"
"time"

"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
)

const bufferCacheExtensionSQL string = `
SELECT COALESCE((
SELECT nspname
FROM pg_catalog.pg_extension
INNER JOIN pg_catalog.pg_namespace n ON extnamespace = n.oid
WHERE extname = 'pg_buffercache'
), '')
`

const bufferCacheSizeSQL string = `
SELECT pg_catalog.pg_size_bytes(unit) * pg_catalog.pg_size_bytes(setting) / 1024 / 1024 / 1024
FROM pg_catalog.pg_settings
WHERE name = 'shared_buffers'
`

// https://www.postgresql.org/docs/current/pgbuffercache.html
const bufferCacheSQL string = `
SELECT reldatabase, relfilenode, count(*) * current_setting('block_size')::int
FROM %s.pg_buffercache
GROUP BY 1, 2
`

func GetBufferCache(ctx context.Context, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger, postgresVersion state.PostgresVersion, channel chan state.BufferCache) {
start := time.Now()
bufferCache := make(state.BufferCache)
db, err := EstablishConnection(ctx, server, logger, globalCollectionOpts, "")
if err != nil {
logger.PrintError("GetBufferCache: %s", err)
channel <- bufferCache
return
}

schemaName := ""
db.QueryRowContext(ctx, QueryMarkerSQL+bufferCacheExtensionSQL).Scan(&schemaName)
if schemaName == "" {
channel <- bufferCache
return
}

sizeGB := 0
db.QueryRowContext(ctx, QueryMarkerSQL+bufferCacheSizeSQL).Scan(&sizeGB)
if sizeGB > server.Config.MaxBufferCacheMonitoringGB {
if globalCollectionOpts.TestRun {
logger.PrintWarning("GetBufferCache: skipping collection. To enable, set max_buffer_cache_monitoring_gb to a value over %d", sizeGB)
}
channel <- bufferCache
return
}

stmt, err := db.PrepareContext(ctx, QueryMarkerSQL+fmt.Sprintf(bufferCacheSQL, schemaName))
if err != nil {
logger.PrintError("GetBufferCache: %s", err)
channel <- bufferCache
return
}
defer stmt.Close()
rows, err := stmt.QueryContext(ctx)
if err != nil {
logger.PrintError("GetBufferCache: %s", err)
channel <- bufferCache
return
}
defer rows.Close()
for rows.Next() {
var reldatabase state.Oid
var relfilenode state.Oid
var bytes int64
err = rows.Scan(&reldatabase, &relfilenode, &bytes)
if err != nil {
logger.PrintError("GetBufferCache: %s", err)
channel <- bufferCache
return
}
db, ok := bufferCache[reldatabase]
if ok {
db[relfilenode] = bytes
} else {
bufferCache[reldatabase] = map[state.Oid]int64{relfilenode: bytes}
}
}

logger.PrintVerbose("GetBufferCache: finished after %s", time.Since(start))
channel <- bufferCache
}
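One detail worth spelling out from bufferCacheSizeSQL above: pg_settings reports shared_buffers as a block count (setting) plus a unit (typically '8kB'), and the query multiplies the two and integer-divides down to whole gigabytes. The same arithmetic as a small Go sketch, with illustrative values:

package main

import "fmt"

func main() {
	const unitBytes = 8192  // pg_size_bytes('8kB'), the usual block size
	const blocks = 26214400 // shared_buffers setting for a 200 GB cache
	sizeGB := unitBytes * blocks / 1024 / 1024 / 1024
	fmt.Println(sizeGB) // 200
	// Integer division truncates: the 128 MB default (16384 blocks) yields
	// 0 GB, which always passes the MaxBufferCacheMonitoringGB check.
}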
35 changes: 29 additions & 6 deletions input/postgres/relations.go
@@ -36,7 +36,9 @@ const relationsSQL string = `
(SELECT p.partattrs FROM pg_partitioned_table p WHERE p.partrelid = c.oid) AS partition_columns,
COALESCE(pg_catalog.pg_get_partkeydef(c.oid), '') AS partition_expr,
locked_relids.relid IS NOT NULL AS exclusively_locked,
COALESCE(toast.relname, '') AS toast_table
COALESCE(toast.relname, '') AS toast_table,
COALESCE(pg_relation_filenode(c.oid), 0) AS data_filenode,
COALESCE(pg_relation_filenode(c.reltoastrelid), 0) AS toast_filenode
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON (n.oid = c.relnamespace)
LEFT JOIN locked_relids ON (c.oid = locked_relids.relid)
@@ -96,7 +98,8 @@ SELECT c.oid,
pg_catalog.pg_get_constraintdef(con.oid, FALSE),
c2.reloptions,
(SELECT a.amname FROM pg_catalog.pg_am a JOIN pg_catalog.pg_opclass o ON (a.oid = o.opcmethod) WHERE o.oid = i.indclass[0]),
false AS exclusively_locked
false AS exclusively_locked,
COALESCE(pg_relation_filenode(i.indexrelid), 0)
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON (n.oid = c.relnamespace)
JOIN pg_catalog.pg_index i ON (c.oid = i.indrelid)
@@ -123,7 +126,8 @@ SELECT c.oid,
NULL,
NULL,
'',
true AS exclusively_locked
true,
0
FROM locked_relids
`

@@ -184,7 +188,7 @@ SELECT relid,
FROM locked_relids
`

func GetRelations(ctx context.Context, db *sql.DB, postgresVersion state.PostgresVersion, currentDatabaseOid state.Oid, ignoreRegexp string) ([]state.PostgresRelation, error) {
func GetRelations(ctx context.Context, db *sql.DB, postgresVersion state.PostgresVersion, currentDatabaseOid state.Oid, ignoreRegexp string, ts state.TransientState) ([]state.PostgresRelation, error) {
relations := make(map[state.Oid]state.PostgresRelation, 0)

var systemCatalogFilter string
@@ -214,12 +218,14 @@ func GetRelations(ctx context.Context, db *sql.DB, postgresVersion state.Postgre
var row state.PostgresRelation
var options null.String
var partCols null.String
var dataFilenode state.Oid
var toastFilenode state.Oid

err = rows.Scan(&row.Oid, &row.SchemaName, &row.RelationName, &row.RelationType,
&options, &row.HasOids, &row.PersistenceType, &row.HasInheritanceChildren,
&row.HasToast, &row.FrozenXID, &row.MinimumMultixactXID, &row.ParentTableOid,
&row.PartitionBoundary, &row.PartitionStrategy, &partCols, &row.PartitionedBy,
&row.ExclusivelyLocked, &row.ToastName)
&row.ExclusivelyLocked, &row.ToastName, &dataFilenode, &toastFilenode)
if err != nil {
err = fmt.Errorf("Relations/Scan: %s", err)
return nil, err
Expand All @@ -242,6 +248,15 @@ func GetRelations(ctx context.Context, db *sql.DB, postgresVersion state.Postgre

row.DatabaseOid = currentDatabaseOid

bufferCache, ok := ts.BufferCache[currentDatabaseOid]
if ok {
row.CachedDataBytes = bufferCache[dataFilenode]
row.CachedToastBytes = bufferCache[toastFilenode]
// Any non-zero values are later summed up in DatabaseStatistic.UntrackedCacheBytes
bufferCache[dataFilenode] = 0
bufferCache[toastFilenode] = 0
}

relations[row.Oid] = row
}

@@ -303,10 +318,11 @@ func GetRelations(ctx context.Context, db *sql.DB, postgresVersion state.Postgre
var columns string
var options null.String
var exclusivelyLocked bool
var filenode state.Oid

err = rows.Scan(&row.RelationOid, &row.IndexOid, &columns, &row.Name, &row.IsPrimary,
&row.IsUnique, &row.IsValid, &row.IndexDef, &row.ConstraintDef, &options, &row.IndexType,
&exclusivelyLocked)
&exclusivelyLocked, &filenode)
if err != nil {
err = fmt.Errorf("Indices/Scan: %s", err)
return nil, err
Expand All @@ -325,6 +341,13 @@ func GetRelations(ctx context.Context, db *sql.DB, postgresVersion state.Postgre
}
}

bufferCache, ok := ts.BufferCache[currentDatabaseOid]
if ok {
row.CachedBytes = bufferCache[filenode]
// Any non-zero values are later summed up in DatabaseStatistic.UntrackedCacheBytes
bufferCache[filenode] = 0
}

relation, ok := relations[row.RelationOid]
if !ok {
continue
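The zeroing pattern in the two blocks above implies a follow-up pass: once GetRelations has claimed the filenodes it recognizes, whatever remains in each per-database map is cache usage that belongs to no known table or index. That summation happens outside this excerpt (the comments name DatabaseStatistic.UntrackedCacheBytes), so the sketch below is an assumption about its shape, not the commit's actual code:

package main

import "fmt"

type Oid uint32

// sumUntracked adds up buffer cache bytes that were not attributed to any
// relation; entries GetRelations recognized were already set to 0 above,
// so they contribute nothing to the total.
func sumUntracked(perDatabase map[Oid]int64) int64 {
	var total int64
	for _, bytes := range perDatabase {
		total += bytes
	}
	return total
}

func main() {
	// 16384 and 16390 were matched to relations; 99999 is an unknown filenode.
	cache := map[Oid]int64{16384: 0, 16390: 0, 99999: 81920}
	fmt.Println("untracked cache bytes:", sumUntracked(cache)) // 81920
}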
2 changes: 1 addition & 1 deletion input/postgres/schema.go
@@ -128,7 +128,7 @@ func collectOneSchema(ctx context.Context, server *state.Server, collectionOpts

func collectSchemaData(ctx context.Context, collectionOpts state.CollectionOpts, logger *util.Logger, db *sql.DB, ps state.PersistedState, ts state.TransientState, databaseOid state.Oid, postgresVersion state.PostgresVersion, server *state.Server, systemType string, dbName string) (state.PersistedState, state.TransientState, error) {
if collectionOpts.CollectPostgresRelations {
newRelations, err := GetRelations(ctx, db, postgresVersion, databaseOid, server.Config.IgnoreSchemaRegexp)
newRelations, err := GetRelations(ctx, db, postgresVersion, databaseOid, server.Config.IgnoreSchemaRegexp, ts)
if err != nil {
return ps, ts, fmt.Errorf("error collecting table/index metadata: %s", err)
}