Add alternate mechanism for parsing log lines (#494)
Instead of supporting a list of predefined `log_line_prefix` values and testing
each log line against each of them, check `log_line_prefix` at startup (and
watch for changes during full snapshots) and build a custom regex based on
the prefix. This is faster and more flexible, since it can support arbitrary
prefixes.

Add a `db_log_line_prefix` option for now to fall back to the old behavior
in case there are issues with the new mechanism. The old parsing code
and this option can be dropped in a future patch.
msakrejda authored Aug 20, 2024
1 parent 1d1e4da commit 768c801
Showing 18 changed files with 1,838 additions and 711 deletions.
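To illustrate the approach, a `log_line_prefix` such as `'%m [%p] %q%u@%d '` can be compiled into a single regex once and then matched against every line. The sketch below is hypothetical — the function name, the named capture groups, and the handful of supported escapes are all assumptions for illustration; the collector's actual implementation (in the `logs` package, not shown in this excerpt) covers the full escape set:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// prefixToRegexp translates a log_line_prefix into a regex with named capture
// groups. Only %m, %p, %u, %d and %q are handled in this sketch.
func prefixToRegexp(prefix string) (*regexp.Regexp, error) {
	escapes := map[byte]string{
		'm': `(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} \S+)`,
		'p': `(?P<pid>\d+)`,
		'u': `(?P<user>\S*)`,
		'd': `(?P<db>\S*)`,
	}
	var out strings.Builder
	out.WriteString("(?s)^")
	sawQ := false
	for i := 0; i < len(prefix); i++ {
		if prefix[i] == '%' && i+1 < len(prefix) {
			i++
			if re, ok := escapes[prefix[i]]; ok {
				out.WriteString(re)
			} else if prefix[i] == 'q' {
				// %q: the rest of the prefix only appears for session
				// processes, so make the remainder optional
				out.WriteString("(?:")
				sawQ = true
			}
			continue
		}
		// Literal prefix characters (spaces, brackets, etc.) are escaped
		out.WriteString(regexp.QuoteMeta(string(prefix[i])))
	}
	if sawQ {
		out.WriteString(")?")
	}
	// Postgres follows the prefix with the severity and two spaces
	out.WriteString(`(?P<level>[A-Z0-9]+):  (?P<content>.*)`)
	return regexp.Compile(out.String())
}

func main() {
	re, err := prefixToRegexp("%m [%p] %q%u@%d ")
	if err != nil {
		panic(err)
	}
	line := "2024-08-20 06:18:01.123 UTC [4242] alice@postgres LOG:  connection received"
	m := re.FindStringSubmatch(line)
	fmt.Println(m[re.SubexpIndex("pid")], m[re.SubexpIndex("level")]) // 4242 LOG
}
```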
6 changes: 6 additions & 0 deletions config/config.go
@@ -172,6 +172,12 @@ type ServerConfig struct {
// function. Used by default for Crunchy Bridge.
LogPgReadFile bool `ini:"db_log_pg_read_file"`

// Configure the collector's log parsing mechanism. Can be either 'legacy' (to
// use the old mechanism) or 'auto' (a new mechanism that can support an
// arbitrary log_line_prefix).
// TODO: analyze and mention (hopefully beneficial) performance impact
LogLinePrefix string `ini:"db_log_line_prefix"`

// Specifies a table pattern to ignore - no statistics will be collected for
// tables that match the name. This uses Golang's filepath.Match function for
// comparison, so you can e.g. use "*" for wildcard matching.
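In practice this means an operator can opt out of the new parser by setting `db_log_line_prefix = legacy` in the per-server collector settings (or `LOG_LINE_PREFIX=legacy` in the environment, per the read.go change below); any other value resolves to 'auto'.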
12 changes: 12 additions & 0 deletions config/read.go
@@ -56,6 +56,13 @@ func parseConfigDisableCitusSchemaStats(value string) string {
return "all"
}

func parseConfigLogLinePrefix(value string) string {
if value == "legacy" {
return value
}
return "auto"
}

func getDefaultConfig() *ServerConfig {
config := &ServerConfig{
APIBaseURL: DefaultAPIBaseURL,
@@ -301,6 +308,9 @@ func getDefaultConfig() *ServerConfig {
if logOtelK8SLabels := os.Getenv("LOG_OTEL_K8S_LABELS"); logOtelK8SLabels != "" {
config.LogOtelK8SLabels = logOtelK8SLabels
}
if logLinePrefix := os.Getenv("LOG_LINE_PREFIX"); logLinePrefix != "" {
config.LogLinePrefix = logLinePrefix
}
if alwaysCollectSystemData := os.Getenv("PGA_ALWAYS_COLLECT_SYSTEM_DATA"); alwaysCollectSystemData != "" {
config.AlwaysCollectSystemData = parseConfigBool(alwaysCollectSystemData)
}
@@ -709,6 +719,8 @@ func preprocessConfig(config *ServerConfig) (*ServerConfig, error) {
config.DisableCitusSchemaStats = parseConfigDisableCitusSchemaStats(config.DisableCitusSchemaStats)
}

config.LogLinePrefix = parseConfigLogLinePrefix(config.LogLinePrefix)

return config, nil
}

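The normalization keeps the setting forgiving — only an explicit `legacy` opts out of the new mechanism:

```go
parseConfigLogLinePrefix("legacy") // "legacy": keep the old parsing code path
parseConfigLogLinePrefix("")       // "auto": the default
parseConfigLogLinePrefix("oops")   // "auto": unrecognized values also get the new mechanism
```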
3 changes: 2 additions & 1 deletion input/full.go
@@ -12,6 +12,7 @@ import (
"github.com/lib/pq"
"github.com/pganalyze/collector/input/postgres"
"github.com/pganalyze/collector/input/system"
"github.com/pganalyze/collector/logs"
"github.com/pganalyze/collector/scheduler"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
@@ -155,7 +156,7 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
ps.System = system.GetSystemState(ctx, server, logger, globalCollectionOpts)
}

server.SetLogTimezone(ts.Settings)
logs.SyncLogParser(server, ts.Settings)

ps.CollectorStats = getCollectorStats()
ts.CollectorConfig = getCollectorConfig(server.Config)
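`logs.SyncLogParser` replaces the old `server.SetLogTimezone(ts.Settings)` call. Its body is not part of this excerpt, but per the commit message it re-checks `log_line_prefix` (alongside the log timezone it previously synced) on every full snapshot and rebuilds the per-server parser when either changed — this is the "watch for changes during full snapshots" half of the new mechanism. The resulting parser is then consumed via `server.GetLogParser()` in the log inputs below.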
59 changes: 31 additions & 28 deletions input/system/azure/logs.go
@@ -11,7 +11,6 @@ import (
"time"

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/logs"
"github.com/pganalyze/collector/output/pganalyze_collector"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
@@ -229,17 +228,21 @@ func SetupLogSubscriber(ctx context.Context, wg *sync.WaitGroup, globalCollectio
return nil
}

// Parses one Azure Event Hub log record into one or two log lines (main + DETAIL)
func ParseRecordToLogLines(in AzurePostgresLogRecord) ([]state.LogLine, string, error) {
var azureDbServerName string

logLineContent := in.Properties.Message

func GetServerNameFromRecord(in AzurePostgresLogRecord) string {
if in.LogicalServerName == "" { // Flexible Server
// For Flexible Server, logical server name is not set, so instead determine it based on the resource ID
resourceParts := strings.Split(in.ResourceID, "/")
azureDbServerName = strings.ToLower(resourceParts[len(resourceParts)-1])
return strings.ToLower(resourceParts[len(resourceParts)-1])
} else { // Single Server
return in.LogicalServerName
}
}

// Parses one Azure Event Hub log record into one or two log lines (main + DETAIL)
func ParseRecordToLogLines(in AzurePostgresLogRecord, parser state.LogParser) ([]state.LogLine, error) {
logLineContent := in.Properties.Message

if in.LogicalServerName != "" { // Single Server
// Adjust Azure-modified log messages to be standard Postgres log messages
if strings.HasPrefix(logLineContent, "connection received:") {
logLineContent = connectionReceivedRegexp.ReplaceAllString(logLineContent, "$1")
@@ -253,13 +256,10 @@
// Add prefix and error level, which are separated from the content on
// Single Server (but our parser expects them together)
logLineContent = fmt.Sprintf("%s%s: %s", in.Properties.Prefix, in.Properties.ErrorLevel, logLineContent)

azureDbServerName = in.LogicalServerName
}

logLine, ok := logs.ParseLogLineWithPrefix("", logLineContent, nil)
logLine, ok := parser.ParseLine(logLineContent)
if !ok {
return []state.LogLine{}, "", fmt.Errorf("Can't parse log line: \"%s\"", logLineContent)
return []state.LogLine{}, fmt.Errorf("Can't parse log line: \"%s\"", logLineContent)
}

logLines := []state.LogLine{logLine}
@@ -275,7 +275,7 @@
logLines = append(logLines, detailLogLine)
}

return logLines, azureDbServerName, nil
return logLines, nil
}

func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*state.Server, in <-chan AzurePostgresLogRecord, out chan state.ParsedLogStreamItem, globalCollectionOpts state.CollectionOpts, logger *util.Logger) {
@@ -296,7 +296,21 @@
return
}

logLines, azureDbServerName, err := ParseRecordToLogLines(in)
azureDbServerName := GetServerNameFromRecord(in)
var server *state.Server
for _, s := range servers {
if azureDbServerName == s.Config.AzureDbServerName {
server = s
}
}
if server == nil {
if globalCollectionOpts.TestRun {
logger.PrintVerbose("Discarding log line because of unknown server (did you set the correct azure_db_server_name?): %s", azureDbServerName)
}
continue
}
parser := server.GetLogParser()
logLines, err := ParseRecordToLogLines(in, parser)
if err != nil {
logger.PrintError("%s", err)
continue
@@ -310,19 +324,8 @@
continue
}

foundServer := false
for _, server := range servers {
if azureDbServerName == server.Config.AzureDbServerName {
foundServer = true

for _, logLine := range logLines {
out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
}
}
}

if !foundServer && globalCollectionOpts.TestRun {
logger.PrintVerbose("Discarding log line because of unknown server (did you set the correct azure_db_server_name?): %s", azureDbServerName)
for _, logLine := range logLines {
out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
}
}
}
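Two things changed together here: the server lookup now happens before parsing (the parser is per-server, so the right server has to be resolved first), and the Flexible Server name extraction got its own helper. For Flexible Server records, `LogicalServerName` is empty and the name is the last segment of the Azure resource ID — roughly as follows (the resource ID below is a made-up example):

```go
record := azure.AzurePostgresLogRecord{
	// Flexible Server: LogicalServerName is left empty
	ResourceID: "/SUBSCRIPTIONS/EXAMPLE-SUB/RESOURCEGROUPS/EXAMPLE-RG/PROVIDERS/MICROSOFT.DBFORPOSTGRESQL/FLEXIBLESERVERS/MY-SERVER",
}
fmt.Println(azure.GetServerNameFromRecord(record)) // "my-server"
```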
30 changes: 25 additions & 5 deletions input/system/azure/logs_test.go
@@ -7,6 +7,7 @@ import (

"github.com/kylelemons/godebug/pretty"
"github.com/pganalyze/collector/input/system/azure"
"github.com/pganalyze/collector/logs"
"github.com/pganalyze/collector/output/pganalyze_collector"
"github.com/pganalyze/collector/state"
)
@@ -277,19 +278,24 @@ var parseRecordTests = []parseRecordTestpair{
}

func TestParseRecordToLogLines(t *testing.T) {
for _, pair := range parseRecordTests {
for i, pair := range parseRecordTests {
var prefix string
if i < 3 {
prefix = logs.LogPrefixCustom3
} else {
prefix = logs.LogPrefixAzure
}
parser := logs.NewLogParser(prefix, nil, false, false)

var record azure.AzurePostgresLogRecord
err := json.Unmarshal([]byte(pair.recordIn), &record)
if err != nil {
t.Errorf("For \"%v\": expected unmarshaling to succeed, but it failed: %s\n", pair.recordIn, err)
}
lines, serverName, err := azure.ParseRecordToLogLines(record)
lines, err := azure.ParseRecordToLogLines(record, parser)
if pair.errOut != err {
t.Errorf("For \"%v\": expected error to be %v, but was %v\n", pair.recordIn, pair.errOut, err)
}
if pair.serverNameOut != serverName {
t.Errorf("For \"%v\": expected server name to be %v, but was %v\n", pair.recordIn, pair.serverNameOut, serverName)
}

cfg := pretty.CompareConfig
cfg.SkipZeroFields = true
@@ -298,3 +304,17 @@ func TestParseRecordToLogLines(t *testing.T) {
}
}
}

func TestGetServerNameFromRecord(t *testing.T) {
for _, pair := range parseRecordTests {
var record azure.AzurePostgresLogRecord
err := json.Unmarshal([]byte(pair.recordIn), &record)
if err != nil {
t.Errorf("For \"%v\": expected unmarshaling to succeed, but it failed: %s\n", pair.recordIn, err)
}
serverName := azure.GetServerNameFromRecord(record)
if pair.serverNameOut != serverName {
t.Errorf("For \"%v\": expected server name to be %v, but was %v\n", pair.recordIn, pair.serverNameOut, serverName)
}
}
}
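The `i < 3` split in the updated test presumably tracks the fixture order in `parseRecordTests`: the first three records carry a standard `log_line_prefix` (hence `logs.LogPrefixCustom3`), while the remaining fixtures are Single Server records using Azure's own prefix (`logs.LogPrefixAzure`).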
39 changes: 26 additions & 13 deletions input/system/google_cloudsql/logs.go
@@ -13,7 +13,6 @@ import (
"google.golang.org/api/option"

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/logs"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
)
@@ -187,32 +186,46 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta
return
}

var server *state.Server
var isAlloyDBCluster bool

for _, s := range servers {
if in.GcpProjectID == s.Config.GcpProjectID && in.GcpCloudSQLInstanceID != "" && in.GcpCloudSQLInstanceID == s.Config.GcpCloudSQLInstanceID {
server = s
}
if in.GcpProjectID == s.Config.GcpProjectID && in.GcpAlloyDBClusterID != "" && in.GcpAlloyDBClusterID == s.Config.GcpAlloyDBClusterID && in.GcpAlloyDBInstanceID != "" && in.GcpAlloyDBInstanceID == s.Config.GcpAlloyDBInstanceID {
server = s
isAlloyDBCluster = true
}
}

if server == nil {
continue
}

parser := server.GetLogParser()

// We ignore failures here since we want the per-backend stitching logic
// that runs later on (and any other parsing errors will just be ignored).
// Note that we need to restore the original trailing newlines since
// AnalyzeStreamInGroups expects them and they are not present in the GCP
// log stream.
logLine, _ := logs.ParseLogLineWithPrefix("", in.Content+"\n", nil)
logLine, _ := parser.ParseLine(in.Content + "\n")
logLine.OccurredAt = in.OccurredAt

// Ignore loglines which are outside our time window
if !logLine.OccurredAt.IsZero() && logLine.OccurredAt.Before(linesNewerThan) {
continue
}

for _, server := range servers {
if in.GcpProjectID == server.Config.GcpProjectID && in.GcpCloudSQLInstanceID != "" && in.GcpCloudSQLInstanceID == server.Config.GcpCloudSQLInstanceID {
out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
}
if in.GcpProjectID == server.Config.GcpProjectID && in.GcpAlloyDBClusterID != "" && in.GcpAlloyDBClusterID == server.Config.GcpAlloyDBClusterID && in.GcpAlloyDBInstanceID != "" && in.GcpAlloyDBInstanceID == server.Config.GcpAlloyDBInstanceID {
// AlloyDB adds a special [filename:lineno] prefix to all log lines (not part of log_line_prefix)
parts := regexp.MustCompile(`(?s)^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(string(logLine.Content))
if len(parts) == 2 {
logLine.Content = parts[1]
}
out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
if isAlloyDBCluster {
// AlloyDB adds a special [filename:lineno] prefix to all log lines (not part of log_line_prefix)
parts := regexp.MustCompile(`(?s)^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(string(logLine.Content))
if len(parts) == 2 {
logLine.Content = parts[1]
}
}
out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
}
}
}()
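The AlloyDB stripping runs on `logLine.Content`, i.e. after `log_line_prefix` and severity have already been split off by the parser. The regex can be sanity-checked in isolation (the content string below is a made-up example):

```go
re := regexp.MustCompile(`(?s)^\[[\w.-]+:\d+\] (.*)`)
parts := re.FindStringSubmatch("[postmaster.c:1024] database system is ready to accept connections\n")
if len(parts) == 2 {
	// (?s) lets the capture keep the trailing newline
	fmt.Printf("%q\n", parts[1]) // "database system is ready to accept connections\n"
}
```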
4 changes: 3 additions & 1 deletion input/system/heroku/logs.go
@@ -119,6 +119,8 @@ func processSystemMetrics(ctx context.Context, timestamp time.Time, content []by
}
}

var HerokuLogParser = logs.NewLogParser(logs.LogPrefixHeroku2, nil, false, true)

func logStreamItemToLogLine(ctx context.Context, item HttpSyslogMessage, servers []*state.Server, sourceToServer map[string]*state.Server, now time.Time, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (map[string]*state.Server, *state.LogLine, string) {
timestamp, err := time.Parse(time.RFC3339, item.HeaderTimestamp)
if err != nil {
@@ -156,7 +158,7 @@ func logStreamItemToLogLine(ctx context.Context, item HttpSyslogMessage, servers
logLineNumberChunk, _ := strconv.ParseInt(lineParts[3], 10, 32)
prefixedContent := lineParts[4]

logLine, _ := logs.ParseLogLineWithPrefix("", prefixedContent+"\n", nil)
logLine, _ := HerokuLogParser.ParseLine(prefixedContent + "\n")

sourceToServer = catchIdentifyServerLine(sourceName, logLine.Content, sourceToServer, servers)

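Unlike the per-server parsers above, Heroku's log stream arrives with a known, fixed prefix, so a single package-level `HerokuLogParser` built once from `logs.LogPrefixHeroku2` suffices — there is no per-server `log_line_prefix` to sync here.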
5 changes: 2 additions & 3 deletions input/system/selfhosted/logs.go
@@ -20,7 +20,6 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/papertrail/go-tail/follower"
"github.com/pganalyze/collector/input/postgres"
"github.com/pganalyze/collector/logs"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
)
@@ -387,7 +386,6 @@ func setupDockerTail(ctx context.Context, containerName string, out chan<- SelfH

func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger, parsedLogStream chan state.ParsedLogStreamItem) chan<- SelfHostedLogStreamItem {
logStream := make(chan SelfHostedLogStreamItem)
tz := server.GetLogTimezone()

wg.Add(1)
go func() {
@@ -406,13 +404,14 @@
if !ok {
return
}
logParser := server.GetLogParser()

// We ignore failures here since we want the per-backend stitching logic
// that runs later on (and any other parsing errors will just be ignored)
// Note that we need to restore the original trailing newlines since
// AnalyzeStreamInGroups expects them and they are not present in the tail
// log stream.
logLine, _ := logs.ParseLogLineWithPrefix("", item.Line+"\n", tz)
logLine, _ := logParser.ParseLine(item.Line + "\n")

if logLine.OccurredAt.IsZero() && !item.OccurredAt.IsZero() {
logLine.OccurredAt = item.OccurredAt
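Note that the parser is now fetched via `server.GetLogParser()` inside the receive loop, whereas the timezone used to be captured once at setup: a parser rebuilt by `logs.SyncLogParser` during a later full snapshot is therefore picked up without restarting the log tail.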
11 changes: 5 additions & 6 deletions input/system/selfhosted/otel_handler.go
@@ -7,7 +7,6 @@ import (
"strconv"
"time"

"github.com/pganalyze/collector/logs"
"github.com/pganalyze/collector/output/pganalyze_collector"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
@@ -25,15 +24,15 @@ import (
// Other variants (e.g. csvlog, or plain messages in a K8s context) are currently
// not supported and will be ignored.

func logLineFromJsonlog(record *common.KeyValueList, tz *time.Location) (state.LogLine, *state.LogLine) {
func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) (state.LogLine, *state.LogLine) {
var logLine state.LogLine

// If a DETAIL line is set, we need to create an additional log line
detailLineContent := ""

for _, rv := range record.Values {
if rv.Key == "log_time" {
logLine.OccurredAt = logs.GetOccurredAt(rv.Value.GetStringValue(), tz, false)
logLine.OccurredAt = logParser.GetOccurredAt(rv.Value.GetStringValue())
}
if rv.Key == "user_name" {
logLine.Username = rv.Value.GetStringValue()
@@ -108,7 +107,7 @@ func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, p

func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger) error {
otelLogServer := server.Config.LogOtelServer
tz := server.GetLogTimezone()
logParser := server.GetLogParser()
go func() {
http.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
@@ -144,7 +143,7 @@
// TODO: Support other logger names (this is only tested with CNPG)
if logger == "postgres" {
// jsonlog wrapped in K8s context (via fluentbit)
logLine, detailLine := logLineFromJsonlog(record, tz)
logLine, detailLine := logLineFromJsonlog(record, logParser)
if skipDueToK8sFilter(kubernetes, server, prefixedLogger) {
continue
}
@@ -155,7 +154,7 @@
}
} else if logger == "" && hasErrorSeverity {
// simple jsonlog (Postgres jsonlog has error_severity key)
logLine, detailLine := logLineFromJsonlog(l.Body.GetKvlistValue(), tz)
logLine, detailLine := logLineFromJsonlog(l.Body.GetKvlistValue(), logParser)
parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
if detailLine != nil {
parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine}
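Here the parser stands in for the old `*time.Location`: `logLineFromJsonlog` only uses it for `GetOccurredAt`, which replaces the former `logs.GetOccurredAt(value, tz, false)` call — the timezone (and presumably the old boolean argument's behavior) now lives inside the parser, so it no longer has to be threaded through every call site.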
(The remaining 9 changed files in this commit are not shown here.)
