From 768c8019ab80d9596326b8e976b7371c7f91fc45 Mon Sep 17 00:00:00 2001 From: Maciek Sakrejda Date: Tue, 20 Aug 2024 08:57:18 -0700 Subject: [PATCH] Add alternate mechanism for parsing log lines (#494) Instead of supporting a list of predefined `log_line_prefix`es, and testing each log line against each prefix, check `log_line_prefix` at startup (and watch for changes during full snapshots), and build a custom regex based on the prefix. This is faster and more flexible, since it can support arbitrary prefixes. Add a `db_log_line_prefix` option for now to fall back to the old behavior in case there are issues with the new mechanism. The old parsing code and this option can be dropped in a future patch. --- config/config.go | 6 + config/read.go | 12 + input/full.go | 3 +- input/system/azure/logs.go | 59 +- input/system/azure/logs_test.go | 30 +- input/system/google_cloudsql/logs.go | 39 +- input/system/heroku/logs.go | 4 +- input/system/selfhosted/logs.go | 5 +- input/system/selfhosted/otel_handler.go | 11 +- input/system/tembo/logs.go | 9 +- logs/legacy_parse.go | 634 ++++++++++++++++++ logs/legacy_parse_test.go | 619 +++++++++++++++++ logs/parse.go | 849 +++++++++--------------- logs/parse_test.go | 200 +++--- logs/replace_test.go | 3 +- main.go | 8 +- state/state.go | 37 +- state/util.go | 21 - 18 files changed, 1838 insertions(+), 711 deletions(-) create mode 100644 logs/legacy_parse.go create mode 100644 logs/legacy_parse_test.go diff --git a/config/config.go b/config/config.go index dc9d5e177..dfdcf5b5e 100644 --- a/config/config.go +++ b/config/config.go @@ -172,6 +172,12 @@ type ServerConfig struct { // function. Used by default for Crunchy Bridge. LogPgReadFile bool `ini:"db_log_pg_read_file"` + // Configure the collector's log parsing mechanism. Can be either 'legacy' (to + // use the old mechanism), or 'auto' (a new mechanism that can support an + // arbitrary log_line_prefix). 
+ // TODO: analyze and mention (hopefully beneficial) performance impact + LogLinePrefix string `ini:"db_log_line_prefix"` + // Specifies a table pattern to ignore - no statistics will be collected for // tables that match the name. This uses Golang's filepath.Match function for // comparison, so you can e.g. use "*" for wildcard matching. diff --git a/config/read.go b/config/read.go index cf36b9d16..6fff1420b 100644 --- a/config/read.go +++ b/config/read.go @@ -56,6 +56,13 @@ func parseConfigDisableCitusSchemaStats(value string) string { return "all" } +func parseConfigLogLinePrefix(value string) string { + if value == "legacy" { + return value + } + return "auto" +} + func getDefaultConfig() *ServerConfig { config := &ServerConfig{ APIBaseURL: DefaultAPIBaseURL, @@ -301,6 +308,9 @@ func getDefaultConfig() *ServerConfig { if logOtelK8SLabels := os.Getenv("LOG_OTEL_K8S_LABELS"); logOtelK8SLabels != "" { config.LogOtelK8SLabels = logOtelK8SLabels } + if logLinePrefix := os.Getenv("LOG_LINE_PREFIX"); logLinePrefix != "" { + config.LogLinePrefix = logLinePrefix + } if alwaysCollectSystemData := os.Getenv("PGA_ALWAYS_COLLECT_SYSTEM_DATA"); alwaysCollectSystemData != "" { config.AlwaysCollectSystemData = parseConfigBool(alwaysCollectSystemData) } @@ -709,6 +719,8 @@ func preprocessConfig(config *ServerConfig) (*ServerConfig, error) { config.DisableCitusSchemaStats = parseConfigDisableCitusSchemaStats(config.DisableCitusSchemaStats) } + config.LogLinePrefix = parseConfigLogLinePrefix(config.LogLinePrefix) + return config, nil } diff --git a/input/full.go b/input/full.go index ff6f24a2a..d1933d172 100644 --- a/input/full.go +++ b/input/full.go @@ -12,6 +12,7 @@ import ( "github.com/lib/pq" "github.com/pganalyze/collector/input/postgres" "github.com/pganalyze/collector/input/system" + "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/scheduler" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" @@ -155,7 +156,7 @@ func 
CollectFull(ctx context.Context, server *state.Server, connection *sql.DB, ps.System = system.GetSystemState(ctx, server, logger, globalCollectionOpts) } - server.SetLogTimezone(ts.Settings) + logs.SyncLogParser(server, ts.Settings) ps.CollectorStats = getCollectorStats() ts.CollectorConfig = getCollectorConfig(server.Config) diff --git a/input/system/azure/logs.go b/input/system/azure/logs.go index 00345fa8f..ebca1c9f5 100644 --- a/input/system/azure/logs.go +++ b/input/system/azure/logs.go @@ -11,7 +11,6 @@ import ( "time" "github.com/pganalyze/collector/config" - "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" @@ -229,17 +228,21 @@ func SetupLogSubscriber(ctx context.Context, wg *sync.WaitGroup, globalCollectio return nil } -// Parses one Azure Event Hub log record into one or two log lines (main + DETAIL) -func ParseRecordToLogLines(in AzurePostgresLogRecord) ([]state.LogLine, string, error) { - var azureDbServerName string - - logLineContent := in.Properties.Message - +func GetServerNameFromRecord(in AzurePostgresLogRecord) string { if in.LogicalServerName == "" { // Flexible Server // For Flexible Server, logical server name is not set, so instead determine it based on the resource ID resourceParts := strings.Split(in.ResourceID, "/") - azureDbServerName = strings.ToLower(resourceParts[len(resourceParts)-1]) + return strings.ToLower(resourceParts[len(resourceParts)-1]) } else { // Single Server + return in.LogicalServerName + } +} + +// Parses one Azure Event Hub log record into one or two log lines (main + DETAIL) +func ParseRecordToLogLines(in AzurePostgresLogRecord, parser state.LogParser) ([]state.LogLine, error) { + logLineContent := in.Properties.Message + + if in.LogicalServerName != "" { // Single Server // Adjust Azure-modified log messages to be standard Postgres log messages if strings.HasPrefix(logLineContent, "connection 
received:") { logLineContent = connectionReceivedRegexp.ReplaceAllString(logLineContent, "$1") @@ -253,13 +256,10 @@ func ParseRecordToLogLines(in AzurePostgresLogRecord) ([]state.LogLine, string, // Add prefix and error level, which are separated from the content on // Single Server (but our parser expects them together) logLineContent = fmt.Sprintf("%s%s: %s", in.Properties.Prefix, in.Properties.ErrorLevel, logLineContent) - - azureDbServerName = in.LogicalServerName } - - logLine, ok := logs.ParseLogLineWithPrefix("", logLineContent, nil) + logLine, ok := parser.ParseLine(logLineContent) if !ok { - return []state.LogLine{}, "", fmt.Errorf("Can't parse log line: \"%s\"", logLineContent) + return []state.LogLine{}, fmt.Errorf("Can't parse log line: \"%s\"", logLineContent) } logLines := []state.LogLine{logLine} @@ -275,7 +275,7 @@ func ParseRecordToLogLines(in AzurePostgresLogRecord) ([]state.LogLine, string, logLines = append(logLines, detailLogLine) } - return logLines, azureDbServerName, nil + return logLines, nil } func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*state.Server, in <-chan AzurePostgresLogRecord, out chan state.ParsedLogStreamItem, globalCollectionOpts state.CollectionOpts, logger *util.Logger) { @@ -296,7 +296,21 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta return } - logLines, azureDbServerName, err := ParseRecordToLogLines(in) + azureDbServerName := GetServerNameFromRecord(in) + var server *state.Server + for _, s := range servers { + if azureDbServerName == s.Config.AzureDbServerName { + server = s + } + } + if server == nil { + if globalCollectionOpts.TestRun { + logger.PrintVerbose("Discarding log line because of unknown server (did you set the correct azure_db_server_name?): %s", azureDbServerName) + } + continue + } + parser := server.GetLogParser() + logLines, err := ParseRecordToLogLines(in, parser) if err != nil { logger.PrintError("%s", err) continue @@ -310,19 +324,8 
@@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta continue } - foundServer := false - for _, server := range servers { - if azureDbServerName == server.Config.AzureDbServerName { - foundServer = true - - for _, logLine := range logLines { - out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} - } - } - } - - if !foundServer && globalCollectionOpts.TestRun { - logger.PrintVerbose("Discarding log line because of unknown server (did you set the correct azure_db_server_name?): %s", azureDbServerName) + for _, logLine := range logLines { + out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} } } } diff --git a/input/system/azure/logs_test.go b/input/system/azure/logs_test.go index d80f89c85..1629256fb 100644 --- a/input/system/azure/logs_test.go +++ b/input/system/azure/logs_test.go @@ -7,6 +7,7 @@ import ( "github.com/kylelemons/godebug/pretty" "github.com/pganalyze/collector/input/system/azure" + "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" ) @@ -277,19 +278,24 @@ var parseRecordTests = []parseRecordTestpair{ } func TestParseRecordToLogLines(t *testing.T) { - for _, pair := range parseRecordTests { + for i, pair := range parseRecordTests { + var prefix string + if i < 3 { + prefix = logs.LogPrefixCustom3 + } else { + prefix = logs.LogPrefixAzure + } + parser := logs.NewLogParser(prefix, nil, false, false) + var record azure.AzurePostgresLogRecord err := json.Unmarshal([]byte(pair.recordIn), &record) if err != nil { t.Errorf("For \"%v\": expected unmarshaling to succeed, but it failed: %s\n", pair.recordIn, err) } - lines, serverName, err := azure.ParseRecordToLogLines(record) + lines, err := azure.ParseRecordToLogLines(record, parser) if pair.errOut != err { t.Errorf("For \"%v\": expected error to be %v, but was %v\n", pair.recordIn, pair.errOut, err) } - if 
pair.serverNameOut != serverName { - t.Errorf("For \"%v\": expected server name to be %v, but was %v\n", pair.recordIn, pair.serverNameOut, serverName) - } cfg := pretty.CompareConfig cfg.SkipZeroFields = true @@ -298,3 +304,17 @@ func TestParseRecordToLogLines(t *testing.T) { } } } + +func TestGetServerNameFromRecord(t *testing.T) { + for _, pair := range parseRecordTests { + var record azure.AzurePostgresLogRecord + err := json.Unmarshal([]byte(pair.recordIn), &record) + if err != nil { + t.Errorf("For \"%v\": expected unmarshaling to succeed, but it failed: %s\n", pair.recordIn, err) + } + serverName := azure.GetServerNameFromRecord(record) + if pair.serverNameOut != serverName { + t.Errorf("For \"%v\": expected server name to be %v, but was %v\n", pair.recordIn, pair.serverNameOut, serverName) + } + } +} diff --git a/input/system/google_cloudsql/logs.go b/input/system/google_cloudsql/logs.go index 472c8b279..b56e3ede5 100644 --- a/input/system/google_cloudsql/logs.go +++ b/input/system/google_cloudsql/logs.go @@ -13,7 +13,6 @@ import ( "google.golang.org/api/option" "github.com/pganalyze/collector/config" - "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" ) @@ -187,12 +186,31 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta return } + var server *state.Server + var isAlloyDBCluster bool + + for _, s := range servers { + if in.GcpProjectID == s.Config.GcpProjectID && in.GcpCloudSQLInstanceID != "" && in.GcpCloudSQLInstanceID == s.Config.GcpCloudSQLInstanceID { + server = s + } + if in.GcpProjectID == s.Config.GcpProjectID && in.GcpAlloyDBClusterID != "" && in.GcpAlloyDBClusterID == s.Config.GcpAlloyDBClusterID && in.GcpAlloyDBInstanceID != "" && in.GcpAlloyDBInstanceID == s.Config.GcpAlloyDBInstanceID { + server = s + isAlloyDBCluster = true + } + } + + if server == nil { + continue + } + + parser := server.GetLogParser() + // We ignore failures here since we 
want the per-backend stitching logic // that runs later on (and any other parsing errors will just be ignored). // Note that we need to restore the original trailing newlines since // AnalyzeStreamInGroups expects them and they are not present in the GCP // log stream. - logLine, _ := logs.ParseLogLineWithPrefix("", in.Content+"\n", nil) + logLine, _ := parser.ParseLine(in.Content + "\n") logLine.OccurredAt = in.OccurredAt // Ignore loglines which are outside our time window @@ -200,19 +218,14 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta continue } - for _, server := range servers { - if in.GcpProjectID == server.Config.GcpProjectID && in.GcpCloudSQLInstanceID != "" && in.GcpCloudSQLInstanceID == server.Config.GcpCloudSQLInstanceID { - out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} - } - if in.GcpProjectID == server.Config.GcpProjectID && in.GcpAlloyDBClusterID != "" && in.GcpAlloyDBClusterID == server.Config.GcpAlloyDBClusterID && in.GcpAlloyDBInstanceID != "" && in.GcpAlloyDBInstanceID == server.Config.GcpAlloyDBInstanceID { - // AlloyDB adds a special [filename:lineno] prefix to all log lines (not part of log_line_prefix) - parts := regexp.MustCompile(`(?s)^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(string(logLine.Content)) - if len(parts) == 2 { - logLine.Content = parts[1] - } - out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} + if isAlloyDBCluster { + // AlloyDB adds a special [filename:lineno] prefix to all log lines (not part of log_line_prefix) + parts := regexp.MustCompile(`(?s)^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(string(logLine.Content)) + if len(parts) == 2 { + logLine.Content = parts[1] } } + out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} } } }() diff --git a/input/system/heroku/logs.go b/input/system/heroku/logs.go index dcf522fde..c0d01e52c 100644 --- a/input/system/heroku/logs.go +++ 
b/input/system/heroku/logs.go @@ -119,6 +119,8 @@ func processSystemMetrics(ctx context.Context, timestamp time.Time, content []by } } +var HerokuLogParser = logs.NewLogParser(logs.LogPrefixHeroku2, nil, false, true) + func logStreamItemToLogLine(ctx context.Context, item HttpSyslogMessage, servers []*state.Server, sourceToServer map[string]*state.Server, now time.Time, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (map[string]*state.Server, *state.LogLine, string) { timestamp, err := time.Parse(time.RFC3339, item.HeaderTimestamp) if err != nil { @@ -156,7 +158,7 @@ func logStreamItemToLogLine(ctx context.Context, item HttpSyslogMessage, servers logLineNumberChunk, _ := strconv.ParseInt(lineParts[3], 10, 32) prefixedContent := lineParts[4] - logLine, _ := logs.ParseLogLineWithPrefix("", prefixedContent+"\n", nil) + logLine, _ := HerokuLogParser.ParseLine(prefixedContent + "\n") sourceToServer = catchIdentifyServerLine(sourceName, logLine.Content, sourceToServer, servers) diff --git a/input/system/selfhosted/logs.go b/input/system/selfhosted/logs.go index b760239b0..023af2696 100644 --- a/input/system/selfhosted/logs.go +++ b/input/system/selfhosted/logs.go @@ -20,7 +20,6 @@ import ( "github.com/fsnotify/fsnotify" "github.com/papertrail/go-tail/follower" "github.com/pganalyze/collector/input/postgres" - "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" ) @@ -387,7 +386,6 @@ func setupDockerTail(ctx context.Context, containerName string, out chan<- SelfH func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger, parsedLogStream chan state.ParsedLogStreamItem) chan<- SelfHostedLogStreamItem { logStream := make(chan SelfHostedLogStreamItem) - tz := server.GetLogTimezone() wg.Add(1) go func() { @@ -406,13 +404,14 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, server 
*state. if !ok { return } + logParser := server.GetLogParser() // We ignore failures here since we want the per-backend stitching logic // that runs later on (and any other parsing errors will just be ignored) // Note that we need to restore the original trailing newlines since // AnalyzeStreamInGroups expects them and they are not present in the tail // log stream. - logLine, _ := logs.ParseLogLineWithPrefix("", item.Line+"\n", tz) + logLine, _ := logParser.ParseLine(item.Line + "\n") if logLine.OccurredAt.IsZero() && !item.OccurredAt.IsZero() { logLine.OccurredAt = item.OccurredAt diff --git a/input/system/selfhosted/otel_handler.go b/input/system/selfhosted/otel_handler.go index 33cae0dcb..bd2919428 100644 --- a/input/system/selfhosted/otel_handler.go +++ b/input/system/selfhosted/otel_handler.go @@ -7,7 +7,6 @@ import ( "strconv" "time" - "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" @@ -25,7 +24,7 @@ import ( // Other variants (e.g. csvlog, or plain messages in a K8s context) are currently // not supported and will be ignored. 
-func logLineFromJsonlog(record *common.KeyValueList, tz *time.Location) (state.LogLine, *state.LogLine) { +func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) (state.LogLine, *state.LogLine) { var logLine state.LogLine // If a DETAIL line is set, we need to create an additional log line @@ -33,7 +32,7 @@ func logLineFromJsonlog(record *common.KeyValueList, tz *time.Location) (state.L for _, rv := range record.Values { if rv.Key == "log_time" { - logLine.OccurredAt = logs.GetOccurredAt(rv.Value.GetStringValue(), tz, false) + logLine.OccurredAt = logParser.GetOccurredAt(rv.Value.GetStringValue()) } if rv.Key == "user_name" { logLine.Username = rv.Value.GetStringValue() @@ -108,7 +107,7 @@ func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, p func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger) error { otelLogServer := server.Config.LogOtelServer - tz := server.GetLogTimezone() + logParser := server.GetLogParser() go func() { http.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { b, err := io.ReadAll(r.Body) @@ -144,7 +143,7 @@ func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream ch // TODO: Support other logger names (this is only tested with CNPG) if logger == "postgres" { // jsonlog wrapped in K8s context (via fluentbit) - logLine, detailLine := logLineFromJsonlog(record, tz) + logLine, detailLine := logLineFromJsonlog(record, logParser) if skipDueToK8sFilter(kubernetes, server, prefixedLogger) { continue } @@ -155,7 +154,7 @@ func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream ch } } else if logger == "" && hasErrorSeverity { // simple jsonlog (Postgres jsonlog has error_severity key) - logLine, detailLine := logLineFromJsonlog(l.Body.GetKvlistValue(), tz) + logLine, detailLine := 
logLineFromJsonlog(l.Body.GetKvlistValue(), logParser) parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} if detailLine != nil { parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} diff --git a/input/system/tembo/logs.go b/input/system/tembo/logs.go index 9a7d243f4..93fb42994 100644 --- a/input/system/tembo/logs.go +++ b/input/system/tembo/logs.go @@ -11,7 +11,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/pganalyze/collector/logs" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" @@ -96,7 +95,7 @@ func SetupWebsocketHandlerLogs(ctx context.Context, wg *sync.WaitGroup, logger * func setupWebsocketForServer(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, server *state.Server, parsedLogStream chan state.ParsedLogStreamItem) { // Only ingest log lines that were written in the last minute before startup linesNewerThan := time.Now().Add(-1 * time.Minute) - tz := server.GetLogTimezone() + logParser := server.GetLogParser() wg.Add(1) go func() { @@ -149,7 +148,7 @@ func setupWebsocketForServer(ctx context.Context, wg *sync.WaitGroup, globalColl } for _, stream := range result.Streams { for _, values := range stream.Values { - logLine, detailLine := logLineFromJsonlog(values[1], tz, logger) + logLine, detailLine := logLineFromJsonlog(values[1], logParser, logger) // Ignore loglines which are outside our time window if !logLine.OccurredAt.IsZero() && logLine.OccurredAt.Before(linesNewerThan) { continue @@ -164,7 +163,7 @@ func setupWebsocketForServer(ctx context.Context, wg *sync.WaitGroup, globalColl }() } -func logLineFromJsonlog(recordIn string, tz *time.Location, logger *util.Logger) (state.LogLine, *state.LogLine) { +func logLineFromJsonlog(recordIn string, logParser state.LogParser, logger *util.Logger) 
(state.LogLine, *state.LogLine) { var event JSONLogEvent var logLine state.LogLine @@ -179,7 +178,7 @@ func logLineFromJsonlog(recordIn string, tz *time.Location, logger *util.Logger) for key, value := range event.Record { if key == "log_time" { - logLine.OccurredAt = logs.GetOccurredAt(value, tz, false) + logLine.OccurredAt = logParser.GetOccurredAt(value) } if key == "user_name" { logLine.Username = value diff --git a/logs/legacy_parse.go b/logs/legacy_parse.go new file mode 100644 index 000000000..28932ae30 --- /dev/null +++ b/logs/legacy_parse.go @@ -0,0 +1,634 @@ +package logs + +import ( + "fmt" + "io" + "regexp" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/pganalyze/collector/output/pganalyze_collector" + "github.com/pganalyze/collector/state" +) + +const LogPrefixAmazonRds string = "%t:%r:%u@%d:[%p]:" +const LogPrefixAzure string = "%t-%c-" +const LogPrefixCustom1 string = "%m [%p][%v] : [%l-1] %q[app=%a] " +const LogPrefixCustom2 string = "%t [%p-%l] %q%u@%d " +const LogPrefixCustom3 string = "%m [%p] %q[user=%u,db=%d,app=%a] " +const LogPrefixCustom4 string = "%m [%p] %q[user=%u,db=%d,app=%a,host=%h] " +const LogPrefixCustom5 string = "%t [%p]: [%l-1] user=%u,db=%d - PG-%e " +const LogPrefixCustom6 string = "%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h " +const LogPrefixCustom7 string = "%t [%p]: [%l-1] [trx_id=%x] user=%u,db=%d " +const LogPrefixCustom8 string = "[%p]: [%l-1] db=%d,user=%u " +const LogPrefixCustom9 string = "%m %r %u %a [%c] [%p] " +const LogPrefixCustom10 string = "%m [%p]: [%l-1] db=%d,user=%u " +const LogPrefixCustom11 string = "pid=%p,user=%u,db=%d,app=%a,client=%h " +const LogPrefixCustom12 string = "user=%u,db=%d,app=%a,client=%h " +const LogPrefixCustom13 string = "%p-%s-%c-%l-%h-%u-%d-%m " +const LogPrefixCustom14 string = "%m [%p][%b][%v][%x] %q[user=%u,db=%d,app=%a] " +const LogPrefixCustom15 string = "%m [%p] %q%u@%d " +const LogPrefixCustom16 string = "%t [%p] %q%u@%d %h " +const 
LogPrefixSimple string = "%m [%p] " +const LogPrefixHeroku1 string = " sql_error_code = %e " +const LogPrefixHeroku2 string = ` sql_error_code = %e time_ms = "%m" pid="%p" proc_start_time="%s" session_id="%c" vtid="%v" tid="%x" log_line="%l" %qdatabase="%d" connection_source="%r" user="%u" application_name="%a" ` + +// Used only to recognize the Heroku hobby tier log_line_prefix to give a warning (logs are not supported +// on hobby tier) and avoid errors during prefix check; logs with this prefix are never actually received +const LogPrefixHerokuHobbyTier string = " database = %d connection_source = %r sql_error_code = %e " +const LogPrefixEmpty string = "" + +var RecommendedPrefixIdx = 4 + +var SupportedPrefixes = []string{ + LogPrefixAmazonRds, LogPrefixAzure, LogPrefixCustom1, LogPrefixCustom2, + LogPrefixCustom3, LogPrefixCustom4, LogPrefixCustom5, LogPrefixCustom6, + LogPrefixCustom7, LogPrefixCustom8, LogPrefixCustom9, LogPrefixCustom10, + LogPrefixCustom11, LogPrefixCustom12, LogPrefixCustom13, LogPrefixCustom14, + LogPrefixCustom15, LogPrefixCustom16, + LogPrefixSimple, LogPrefixHeroku1, LogPrefixHeroku2, LogPrefixEmpty, +} + +// Every one of these regexps should produce exactly one matching group +var TimeRegexp = `(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)? 
[\-+]?\w+)` // %t or %m (or %s) +var HostAndPortRegexp = `(.+(?:\(\d+\))?)?` // %r +var PidRegexp = `(\d+)` // %p +var UserRegexp = `(\S*)` // %u +var DbRegexp = `(\S*)` // %d +var AppBeforeSpaceRegexp = `(\S*)` // %a +var AppBeforeCommaRegexp = `([^,]*)` // %a +var AppBeforeQuoteRegexp = `([^"]*)` // %a +var AppInsideBracketsRegexp = `(\[unknown\]|[^,\]]*)` // %a +var HostRegexp = `(\S*)` // %h +var VirtualTxRegexp = `(\d+/\d+)?` // %v +var LogLineCounterRegexp = `(\d+)` // %l +var SqlstateRegexp = `(\w{5})` // %e +var TransactionIdRegexp = `(\d+)` // %x +var SessionIdRegexp = `(\w+\.\w+)` // %c +var BackendTypeRegexp = `([\w ]+)` // %b +// Missing: +// - %n (unix timestamp) +// - %i (command tag) + +var LevelAndContentRegexp = `(\w+):\s+(.*\n?)$` +var LogPrefixAmazonRdsRegexp = regexp.MustCompile(`(?s)^` + TimeRegexp + `:` + HostAndPortRegexp + `:` + UserRegexp + `@` + DbRegexp + `:\[` + PidRegexp + `\]:` + LevelAndContentRegexp) +var LogPrefixAzureRegexp = regexp.MustCompile(`(?s)^` + TimeRegexp + `-` + SessionIdRegexp + `-` + LevelAndContentRegexp) +var LogPrefixCustom1Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]\[` + VirtualTxRegexp + `\] : \[` + LogLineCounterRegexp + `-1\] (?:\[app=` + AppInsideBracketsRegexp + `\] )?` + LevelAndContentRegexp) +var LogPrefixCustom2Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `-` + LogLineCounterRegexp + `\] ` + `(?:` + UserRegexp + `@` + DbRegexp + ` )?` + LevelAndContentRegexp) +var LogPrefixCustom3Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] (?:\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppInsideBracketsRegexp + `\] )?` + LevelAndContentRegexp) +var LogPrefixCustom4Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] (?:\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,host=` + HostRegexp + `\] )?` + LevelAndContentRegexp) +var LogPrefixCustom5Regexp = 
regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] user=` + UserRegexp + `,db=` + DbRegexp + ` - PG-` + SqlstateRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom6Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,client=` + HostRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom7Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] \[trx_id=` + TransactionIdRegexp + `\] user=` + UserRegexp + `,db=` + DbRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom8Regexp = regexp.MustCompile(`(?s)^\[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] db=` + DbRegexp + `,user=` + UserRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom9Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` ` + HostAndPortRegexp + ` ` + UserRegexp + ` ` + AppBeforeSpaceRegexp + ` \[` + SessionIdRegexp + `\] \[` + PidRegexp + `\] ` + LevelAndContentRegexp) +var LogPrefixCustom10Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] db=` + DbRegexp + `,user=` + UserRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom11Regexp = regexp.MustCompile(`(?s)^pid=` + PidRegexp + `,user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,client=` + HostRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom12Regexp = regexp.MustCompile(`(?s)^user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,client=` + HostRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom13Regexp = regexp.MustCompile(`(?s)^` + PidRegexp + `-` + TimeRegexp + `-` + SessionIdRegexp + `-` + LogLineCounterRegexp + `-` + HostRegexp + `-` + UserRegexp + `-` + DbRegexp + `-` + TimeRegexp + ` ` + LevelAndContentRegexp) +var LogPrefixCustom14Regexp = 
regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]\[` + BackendTypeRegexp + `\]\[` + VirtualTxRegexp + `\]\[` + TransactionIdRegexp + `\] (?:\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppInsideBracketsRegexp + `\] )?` + LevelAndContentRegexp) +var LogPrefixCustom15Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] ` + `(?:` + UserRegexp + `@` + DbRegexp + ` )?` + LevelAndContentRegexp) +var LogPrefixCustom16Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] ` + `(?:` + UserRegexp + `@` + DbRegexp + ` ` + HostRegexp + ` )?` + LevelAndContentRegexp) +var LogPrefixSimpleRegexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] ` + LevelAndContentRegexp) +var LogPrefixNoTimestampUserDatabaseAppRegexp = regexp.MustCompile(`(?s)^\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppInsideBracketsRegexp + `\] ` + LevelAndContentRegexp) +var LogPrefixHeroku1Regexp = regexp.MustCompile(`^ sql_error_code = ` + SqlstateRegexp + " " + LevelAndContentRegexp) +var LogPrefixHeroku2Regexp = regexp.MustCompile(`^ sql_error_code = ` + SqlstateRegexp + ` time_ms = "` + TimeRegexp + `" pid="` + PidRegexp + `" proc_start_time="` + TimeRegexp + `" session_id="` + SessionIdRegexp + `" vtid="` + VirtualTxRegexp + `" tid="` + TransactionIdRegexp + `" log_line="` + LogLineCounterRegexp + `" (?:database="` + DbRegexp + `" connection_source="` + HostAndPortRegexp + `" user="` + UserRegexp + `" application_name="` + AppBeforeQuoteRegexp + `" )?` + LevelAndContentRegexp) + +var SyslogSequenceAndSplitRegexp = `(\[[\d-]+\])?` + +var RsyslogLevelAndContentRegexp = `(?:(\w+):\s+)?(.*\n?)$` +var RsyslogTimeRegexp = `(\w+\s+\d+ \d{2}:\d{2}:\d{2})` +var RsyslogHostnameRegxp = `(\S+)` +var RsyslogProcessNameRegexp = `(\w+)` +var RsyslogRegexp = regexp.MustCompile(`^` + RsyslogTimeRegexp + ` ` + RsyslogHostnameRegxp + ` ` + RsyslogProcessNameRegexp + `\[` + PidRegexp + `\]: ` + SyslogSequenceAndSplitRegexp + ` 
` + RsyslogLevelAndContentRegexp) + +func IsSupportedPrefix(prefix string) bool { + for _, supportedPrefix := range SupportedPrefixes { + if supportedPrefix == prefix { + return true + } + } + return false +} + +func ParseLogLineWithPrefix(prefix string, line string, tz *time.Location) (logLine state.LogLine, ok bool) { + var timePart, userPart, dbPart, appPart, pidPart, logLineNumberPart, levelPart, contentPart string + + rsyslog := false + + // Only read the first 1000 characters of a log line to parse the log_line_prefix + // + // This reduces the overhead for very long loglines, because we don't pass in the + // whole line to the regexp engine (twice). + lineExtra := "" + if len(line) > 1000 { + lineExtra = line[1000:] + line = line[0:1000] + } + + if prefix == "" { + if LogPrefixAmazonRdsRegexp.MatchString(line) { + prefix = LogPrefixAmazonRds + } else if LogPrefixAzureRegexp.MatchString(line) { + prefix = LogPrefixAzure + } else if LogPrefixCustom1Regexp.MatchString(line) { + prefix = LogPrefixCustom1 + } else if LogPrefixCustom2Regexp.MatchString(line) { + prefix = LogPrefixCustom2 + } else if LogPrefixCustom4Regexp.MatchString(line) { // 4 is more specific than 3, so needs to go first + prefix = LogPrefixCustom4 + } else if LogPrefixCustom3Regexp.MatchString(line) { + prefix = LogPrefixCustom3 + } else if LogPrefixCustom5Regexp.MatchString(line) { + prefix = LogPrefixCustom5 + } else if LogPrefixCustom6Regexp.MatchString(line) { + prefix = LogPrefixCustom6 + } else if LogPrefixCustom7Regexp.MatchString(line) { + prefix = LogPrefixCustom7 + } else if LogPrefixCustom8Regexp.MatchString(line) { + prefix = LogPrefixCustom8 + } else if LogPrefixCustom9Regexp.MatchString(line) { + prefix = LogPrefixCustom9 + } else if LogPrefixCustom10Regexp.MatchString(line) { + prefix = LogPrefixCustom10 + } else if LogPrefixCustom11Regexp.MatchString(line) { + prefix = LogPrefixCustom11 + } else if LogPrefixCustom12Regexp.MatchString(line) { + prefix = LogPrefixCustom12 + } 
else if LogPrefixCustom13Regexp.MatchString(line) { + prefix = LogPrefixCustom13 + } else if LogPrefixCustom14Regexp.MatchString(line) { + prefix = LogPrefixCustom14 + } else if LogPrefixCustom15Regexp.MatchString(line) { + prefix = LogPrefixCustom15 + } else if LogPrefixCustom16Regexp.MatchString(line) { + prefix = LogPrefixCustom16 + } else if LogPrefixSimpleRegexp.MatchString(line) { + prefix = LogPrefixSimple + } else if LogPrefixHeroku2Regexp.MatchString(line) { + prefix = LogPrefixHeroku2 + } else if LogPrefixHeroku1Regexp.MatchString(line) { + // LogPrefixHeroku1 is a subset of 2, so it must be matched second + prefix = LogPrefixHeroku1 + } else if RsyslogRegexp.MatchString(line) { + rsyslog = true + } + } + + if rsyslog { + parts := RsyslogRegexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + + timePart = fmt.Sprintf("%d %s", time.Now().Year(), parts[1]) + // ignore syslog hostname + // ignore syslog process name + pidPart = parts[4] + // ignore syslog postgres sequence and split number + levelPart = parts[6] + contentPart = strings.Replace(parts[7], "#011", "\t", -1) + + parts = LogPrefixNoTimestampUserDatabaseAppRegexp.FindStringSubmatch(contentPart) + if len(parts) == 6 { + userPart = parts[1] + dbPart = parts[2] + appPart = parts[3] + levelPart = parts[4] + contentPart = parts[5] + } + } else { + switch prefix { + case LogPrefixAmazonRds: // "%t:%r:%u@%d:[%p]:" + parts := LogPrefixAmazonRdsRegexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + + timePart = parts[1] + // skip %r (ip+port) + userPart = parts[3] + dbPart = parts[4] + pidPart = parts[5] + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixAzure: // "%t-%c-" + parts := LogPrefixAzureRegexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + + timePart = parts[1] + // skip %c (session id) + levelPart = parts[3] + contentPart = parts[4] + case LogPrefixCustom1: // "%m [%p][%v] : [%l-1] %q[app=%a] " + parts := 
LogPrefixCustom1Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + // skip %v (virtual TX) + logLineNumberPart = parts[4] + appPart = parts[5] + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixCustom2: // "%t [%p-1] %q%u@%d " + parts := LogPrefixCustom2Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + logLineNumberPart = parts[3] + userPart = parts[4] + dbPart = parts[5] + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixCustom3: // "%m [%p] %q[user=%u,db=%d,app=%a] "" + parts := LogPrefixCustom3Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + userPart = parts[3] + dbPart = parts[4] + appPart = parts[5] + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixCustom4: // "%m [%p] %q[user=%u,db=%d,app=%a,host=%h] " + parts := LogPrefixCustom4Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + userPart = parts[3] + dbPart = parts[4] + appPart = parts[5] + // skip %h (host) + levelPart = parts[7] + contentPart = parts[8] + case LogPrefixCustom5: // "%t [%p]: [%l-1] user=%u,db=%d - PG-%e " + parts := LogPrefixCustom5Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + logLineNumberPart = parts[3] + userPart = parts[4] + dbPart = parts[5] + // skip %e (SQLSTATE) + levelPart = parts[7] + contentPart = parts[8] + case LogPrefixCustom6: // "%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h " + parts := LogPrefixCustom6Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + logLineNumberPart = parts[3] + userPart = parts[4] + dbPart = parts[5] + // skip %a (application name) + // skip %h (host) + levelPart = parts[8] + contentPart = parts[9] + case LogPrefixCustom7: // "%t [%p]: 
[%l-1] [trx_id=%x] user=%u,db=%d " + parts := LogPrefixCustom7Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + logLineNumberPart = parts[3] + // skip %x (transaction id) + userPart = parts[5] + dbPart = parts[6] + levelPart = parts[7] + contentPart = parts[8] + case LogPrefixCustom8: // "[%p]: [%l-1] db=%d,user=%u " + parts := LogPrefixCustom8Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + pidPart = parts[1] + logLineNumberPart = parts[2] + dbPart = parts[3] + userPart = parts[4] + levelPart = parts[5] + contentPart = parts[6] + case LogPrefixCustom9: // "%m %r %u %a [%c] [%p] " + parts := LogPrefixCustom9Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + // skip %r (ip+port) + userPart = parts[3] + appPart = parts[4] + // skip %c (session id) + pidPart = parts[6] + levelPart = parts[7] + contentPart = parts[8] + case LogPrefixCustom10: // "%t [%p]: [%l-1] db=%d,user=%u " + parts := LogPrefixCustom10Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + logLineNumberPart = parts[3] + dbPart = parts[4] + userPart = parts[5] + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixCustom11: // "pid=%p,user=%u,db=%d,app=%a,client=%h " + parts := LogPrefixCustom11Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + pidPart = parts[1] + userPart = parts[2] + dbPart = parts[3] + // skip %a (application name) + // skip %h (host) + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixCustom12: // "user=%u,db=%d,app=%a,client=%h " + parts := LogPrefixCustom12Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + userPart = parts[1] + dbPart = parts[2] + // skip %a (application name) + // skip %h (host) + levelPart = parts[5] + contentPart = parts[6] + case LogPrefixCustom13: // "%p-%s-%c-%l-%h-%u-%d-%m " + parts := 
LogPrefixCustom13Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + pidPart = parts[1] + // skip %s + // skip %c + logLineNumberPart = parts[4] + // skip %h (host) + userPart = parts[6] + dbPart = parts[7] + timePart = parts[8] + levelPart = parts[9] + contentPart = parts[10] + case LogPrefixCustom14: // "%m [%p][%b][%v][%x] %q[user=%u,db=%d,app=%a] " + parts := LogPrefixCustom14Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + + timePart = parts[1] + pidPart = parts[2] + // skip %b + // skip %v + // skip %x + userPart = parts[6] + dbPart = parts[7] + appPart = parts[8] + levelPart = parts[9] + contentPart = parts[10] + case LogPrefixCustom15: // "%m [%p] %q%u@%d " + parts := LogPrefixCustom15Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + userPart = parts[3] + dbPart = parts[4] + levelPart = parts[5] + contentPart = parts[6] + case LogPrefixCustom16: // "%t [%p] %q%u@%d %h " + parts := LogPrefixCustom16Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + userPart = parts[3] + dbPart = parts[4] + // skip %h (host) + levelPart = parts[6] + contentPart = parts[7] + case LogPrefixSimple: // "%t [%p] " + parts := LogPrefixSimpleRegexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + timePart = parts[1] + pidPart = parts[2] + levelPart = parts[3] + contentPart = parts[4] + case LogPrefixHeroku1: + parts := LogPrefixHeroku1Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + // skip %e + levelPart = parts[2] + contentPart = parts[3] + case LogPrefixHeroku2: + parts := LogPrefixHeroku2Regexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + // skip %e + timePart = parts[2] + pidPart = parts[3] + // skip %s + // skip %c + // skip %v + // skip %x + logLineNumberPart = parts[8] + dbPart = parts[9] + // skip %r + userPart = parts[11] + appPart = parts[12] + levelPart = 
parts[13] + contentPart = parts[14] + default: + // Some callers use the content of unparsed lines to stitch multi-line logs together + logLine.Content = line + lineExtra + return + } + } + + if timePart != "" { + occurredAt := GetOccurredAt(timePart, tz, rsyslog) + if occurredAt.IsZero() { + return + } + logLine.OccurredAt = occurredAt + } + + if userPart != "[unknown]" { + logLine.Username = userPart + } + if dbPart != "[unknown]" { + logLine.Database = dbPart + } + if appPart != "[unknown]" { + logLine.Application = appPart + } + if logLineNumberPart != "" { + logLineNumber, _ := strconv.ParseInt(logLineNumberPart, 10, 32) + logLine.LogLineNumber = int32(logLineNumber) + } + + backendPid, _ := strconv.ParseInt(pidPart, 10, 32) + logLine.BackendPid = int32(backendPid) + logLine.Content = contentPart + lineExtra + + // This is actually a continuation of a previous line + if levelPart == "" { + return + } + + logLine.LogLevel = pganalyze_collector.LogLineInformation_LogLevel(pganalyze_collector.LogLineInformation_LogLevel_value[levelPart]) + ok = true + + return +} + +func GetOccurredAt(timePart string, tz *time.Location, rsyslog bool) time.Time { + if tz != nil && !rsyslog { + lastSpaceIdx := strings.LastIndex(timePart, " ") + if lastSpaceIdx == -1 { + return time.Time{} + } + timePartNoTz := timePart[0:lastSpaceIdx] + result, err := time.ParseInLocation("2006-01-02 15:04:05", timePartNoTz, tz) + if err != nil { + return time.Time{} + } + + return result + } + + // Assume Postgres time format unless overriden by the prefix (e.g. 
syslog) + var timeFormat, timeFormatAlt string + if rsyslog { + timeFormat = "2006 Jan 2 15:04:05" + timeFormatAlt = "" + } else { + timeFormat = "2006-01-02 15:04:05 -0700" + timeFormatAlt = "2006-01-02 15:04:05 MST" + } + + ts, err := time.Parse(timeFormat, timePart) + if err != nil { + if timeFormatAlt != "" { + // Ensure we have the correct format remembered for ParseInLocation call that may happen later + timeFormat = timeFormatAlt + ts, err = time.Parse(timeFormat, timePart) + } + if err != nil { + return time.Time{} + } + } + + // Handle non-UTC timezones in systems that have log_timezone set to a different + // timezone value than their system timezone. This is necessary because Go otherwise + // only reads the timezone name but does not set the timezone offset, see + // https://pkg.go.dev/time#Parse + zone, offset := ts.Zone() + if offset == 0 && zone != "UTC" && zone != "" { + var zoneLocation *time.Location + zoneNum, err := strconv.Atoi(zone) + if err == nil { + zoneLocation = time.FixedZone(zone, zoneNum*3600) + } else { + zoneLocation, err = time.LoadLocation(zone) + if err != nil { + // We don't know which timezone this is (and a timezone name is present), so we can't process this log line + return time.Time{} + } + } + ts, err = time.ParseInLocation(timeFormat, timePart, zoneLocation) + if err != nil { + // Technically this should not occur (as we should have already failed previously in time.Parse) + return time.Time{} + } + } + return ts +} + +type LineReader interface { + ReadString(delim byte) (string, error) +} + +func ParseAndAnalyzeBuffer(logStream LineReader, linesNewerThan time.Time, server *state.Server) ([]state.LogLine, []state.PostgresQuerySample) { + var logLines []state.LogLine + var currentByteStart int64 = 0 + parser := server.GetLogParser() + + for { + line, err := logStream.ReadString('\n') + byteStart := currentByteStart + currentByteStart += int64(len(line)) + + // This is intentionally after updating currentByteStart, since we 
consume the + // data in the file even if an error is returned + if err != nil { + if err != io.EOF { + fmt.Printf("Log Read ERROR: %s", err) + } + break + } + + logLine, ok := parser.ParseLine(line) + if !ok { + // Assume that a parsing error in a follow-on line means that we actually + // got additional data for the previous line + if len(logLines) > 0 && logLine.Content != "" { + logLines[len(logLines)-1].Content += logLine.Content + logLines[len(logLines)-1].ByteEnd += int64(len(logLine.Content)) + } + continue + } + + // Ignore loglines which are outside our time window + if logLine.OccurredAt.Before(linesNewerThan) { + continue + } + + // Ignore loglines that are ignored server-wide (e.g. because they are + // log_statement=all/log_duration=on lines). Note this intentionally + // runs after multi-line log lines have been stitched together. + if server.IgnoreLogLine(logLine.Content) { + continue + } + + logLine.ByteStart = byteStart + logLine.ByteContentStart = byteStart + int64(len(line)-len(logLine.Content)) + logLine.ByteEnd = byteStart + int64(len(line)) + + // Generate unique ID that can be used to reference this line + logLine.UUID, err = uuid.NewV7() + if err != nil { + fmt.Printf("Failed to generate log line UUID: %s", err) + continue + } + + logLines = append(logLines, logLine) + } + + newLogLines, newSamples := AnalyzeLogLines(logLines) + return newLogLines, newSamples +} diff --git a/logs/legacy_parse_test.go b/logs/legacy_parse_test.go new file mode 100644 index 000000000..6e5d66cb0 --- /dev/null +++ b/logs/legacy_parse_test.go @@ -0,0 +1,619 @@ +package logs_test + +import ( + "testing" + "time" + + "github.com/kylelemons/godebug/pretty" + "github.com/pganalyze/collector/logs" + "github.com/pganalyze/collector/output/pganalyze_collector" + "github.com/pganalyze/collector/state" +) + +type parseTestpair struct { + prefixIn string + lineIn string + lineInTz *time.Location + lineOut state.LogLine + lineOutOk bool +} + +func mustTimeLocation(tzStr 
string) *time.Location { + tz, err := time.LoadLocation(tzStr) + if err != nil { + panic(err) + } + return tz +} + +var BSTTimeLocation = mustTimeLocation("Europe/London") + +var parseTests = []parseTestpair{ + // rsyslog format + { + "", + "Feb 1 21:48:31 ip-172-31-14-41 postgres[9076]: [3-1] LOG: database system is ready to accept connections", + nil, + state.LogLine{ + OccurredAt: time.Date(time.Now().Year(), time.February, 1, 21, 48, 31, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 9076, + Content: "database system is ready to accept connections", + }, + true, + }, + { + "", + "Feb 1 21:48:31 ip-172-31-14-41 postgres[9076]: [3-2] #011 something", + nil, + state.LogLine{ + OccurredAt: time.Date(time.Now().Year(), time.February, 1, 21, 48, 31, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_UNKNOWN, + BackendPid: 9076, + Content: "\t something", + }, + false, + }, + { + "", + "Feb 1 21:48:31 ip-172-31-14-41 postgres[123]: [8-1] [user=postgres,db=postgres,app=[unknown]] LOG: connection received: host=[local]", + nil, + state.LogLine{ + OccurredAt: time.Date(time.Now().Year(), time.February, 1, 21, 48, 31, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 123, + Username: "postgres", + Database: "postgres", + Content: "connection received: host=[local]", + }, + true, + }, + // Amazon RDS format + { + "", + "2018-08-22 16:00:04 UTC:ec2-1-1-1-1.compute-1.amazonaws.com(48808):myuser@mydb:[18762]:LOG: duration: 3668.685 ms execute : SELECT 1", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.August, 22, 16, 0, 4, 0, time.UTC), + Username: "myuser", + Database: "mydb", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 18762, + Content: "duration: 3668.685 ms execute : SELECT 1", + }, + true, + }, + { + "", + "2018-08-22 16:00:03 UTC:127.0.0.1(36404):myuser@mydb:[21495]:LOG: duration: 1630.946 ms execute 3: SELECT 1", + nil, + state.LogLine{ + 
OccurredAt: time.Date(2018, time.August, 22, 16, 0, 3, 0, time.UTC), + Username: "myuser", + Database: "mydb", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 21495, + Content: "duration: 1630.946 ms execute 3: SELECT 1", + }, + true, + }, + { + "", + "2018-08-22 16:00:03 UTC:[local]:myuser@mydb:[21495]:LOG: duration: 1630.946 ms execute 3: SELECT 1", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.August, 22, 16, 0, 3, 0, time.UTC), + Username: "myuser", + Database: "mydb", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 21495, + Content: "duration: 1630.946 ms execute 3: SELECT 1", + }, + true, + }, + // Azure format + { + "", + "2020-06-21 22:37:10 UTC-5eefe116.22f4-LOG: could not receive data from client: An existing connection was forcibly closed by the remote host.", + nil, + state.LogLine{ + OccurredAt: time.Date(2020, time.June, 21, 22, 37, 10, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "could not receive data from client: An existing connection was forcibly closed by the remote host.", + }, + true, + }, + // Custom 1 format + { + "", + "2018-09-27 06:57:01.030 EST [20194][] : [1-1] [app=pganalyze_collector] LOG: connection received: host=[local]", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 1, 30*1000*1000, time.FixedZone("EST", -5*3600)), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + LogLineNumber: 1, + Application: "pganalyze_collector", + Content: "connection received: host=[local]", + }, + true, + }, + // Custom 3 format + { + "", + "2018-09-27 06:57:01.030 UTC [20194] [user=[unknown],db=[unknown],app=[unknown]] LOG: connection received: host=[local]", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 1, 30*1000*1000, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + Content: "connection received: host=[local]", + }, + 
true, + }, + { + "", + "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=psql] ERROR: canceling statement due to user request", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 2, 779*1000*1000, time.UTC), + Username: "postgres", + Database: "postgres", + Application: "psql", + LogLevel: pganalyze_collector.LogLineInformation_ERROR, + BackendPid: 20194, + Content: "canceling statement due to user request", + }, + true, + }, + { + "", + "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=psql] LOG: duration: 3000.019 ms statement: SELECT pg_sleep(3\n);", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 2, 779*1000*1000, time.UTC), + Username: "postgres", + Database: "postgres", + Application: "psql", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + Content: "duration: 3000.019 ms statement: SELECT pg_sleep(3\n);", + }, + true, + }, + // Custom 4 format + { + "", + "2018-09-27 06:57:01.030 UTC [20194] [user=[unknown],db=[unknown],app=[unknown],host=[local]] LOG: connection received: host=[local]", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 1, 30*1000*1000, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + Content: "connection received: host=[local]", + }, + true, + }, + { + "", + "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=psql,host=127.0.0.1] ERROR: canceling statement due to user request", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 2, 779*1000*1000, time.UTC), + Username: "postgres", + Database: "postgres", + Application: "psql", + LogLevel: pganalyze_collector.LogLineInformation_ERROR, + BackendPid: 20194, + Content: "canceling statement due to user request", + }, + true, + }, + // Custom 5 format + { + "", + "2018-09-28 07:37:59 UTC [331]: [1-1] user=[unknown],db=[unknown] - PG-00000 LOG: connection 
received: host=127.0.0.1 port=49738", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 28, 7, 37, 59, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 331, + LogLineNumber: 1, + Content: "connection received: host=127.0.0.1 port=49738", + }, + true, + }, + { + "", + "2018-09-28 07:39:48 UTC [347]: [3-1] user=postgres,db=postgres - PG-57014 ERROR: canceling statement due to user request", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 28, 7, 39, 48, 0, time.UTC), + Username: "postgres", + Database: "postgres", + LogLevel: pganalyze_collector.LogLineInformation_ERROR, + BackendPid: 347, + LogLineNumber: 3, + Content: "canceling statement due to user request", + }, + true, + }, + // Custom 6 format + { + "", + "2018-10-16 01:25:58 UTC [93897]: [4-1] user=,db=,app=,client= LOG: database system is ready to accept connections", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.October, 16, 1, 25, 58, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 93897, + LogLineNumber: 4, + Content: "database system is ready to accept connections", + }, + true, + }, + { + "", + "2018-10-16 01:26:09 UTC [93907]: [1-1] user=[unknown],db=[unknown],app=[unknown],client=::1 LOG: connection received: host=::1 port=61349", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.October, 16, 1, 26, 9, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 93907, + LogLineNumber: 1, + Content: "connection received: host=::1 port=61349", + }, + true, + }, + { + "", + "2018-10-16 01:26:33 UTC [93911]: [3-1] user=postgres,db=postgres,app=psql,client=::1 ERROR: canceling statement due to user request", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.October, 16, 1, 26, 33, 0, time.UTC), + Username: "postgres", + Database: "postgres", + LogLevel: pganalyze_collector.LogLineInformation_ERROR, + BackendPid: 93911, + LogLineNumber: 3, + 
Content: "canceling statement due to user request", + }, + true, + }, + // Custom 7 format + { + "", + "2019-01-01 01:59:42 UTC [1]: [4-1] [trx_id=0] user=,db= LOG: database system is ready to accept connections", + nil, + state.LogLine{ + OccurredAt: time.Date(2019, time.January, 1, 1, 59, 42, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 1, + LogLineNumber: 4, + Content: "database system is ready to accept connections", + }, + true, + }, + { + "", + "2019-01-01 02:00:28 UTC [35]: [1-1] [trx_id=0] user=[unknown],db=[unknown] LOG: connection received: host=::1 port=38842", + nil, + state.LogLine{ + OccurredAt: time.Date(2019, time.January, 1, 2, 0, 28, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 35, + LogLineNumber: 1, + Content: "connection received: host=::1 port=38842", + }, + true, + }, + { + "", + "2019-01-01 02:00:28 UTC [34]: [3-1] [trx_id=120950] user=postgres,db=postgres ERROR: canceling statement due to user request", + nil, + state.LogLine{ + OccurredAt: time.Date(2019, time.January, 1, 2, 0, 28, 0, time.UTC), + Username: "postgres", + Database: "postgres", + LogLevel: pganalyze_collector.LogLineInformation_ERROR, + BackendPid: 34, + LogLineNumber: 3, + Content: "canceling statement due to user request", + }, + true, + }, + // Custom 8 format + { + "", + "[1127]: [8-1] db=postgres,user=pganalyze LOG: duration: 2001.842 ms statement: SELECT pg_sleep(2);", + nil, + state.LogLine{ + OccurredAt: time.Time{}, + Username: "pganalyze", + Database: "postgres", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 1127, + LogLineNumber: 8, + Content: "duration: 2001.842 ms statement: SELECT pg_sleep(2);", + }, + true, + }, + // Custom 9 format + { + "", + "2020-05-21 17:53:05.307 UTC [5ec6bfff.1] [1] LOG: database system is ready to accept connections", + nil, + state.LogLine{ + OccurredAt: time.Date(2020, time.May, 21, 17, 53, 05, 307*1000*1000, time.UTC), + LogLevel: 
pganalyze_collector.LogLineInformation_LOG, + BackendPid: 1, + Content: "database system is ready to accept connections", + }, + true, + }, + { + "", + "2020-05-21 17:54:35.256 UTC 172.18.0.1(56402) pgaweb [unknown] [5ec6c05b.22] [34] LOG: connection authorized: user=pgaweb database=pgaweb application_name=psql", + nil, + state.LogLine{ + OccurredAt: time.Date(2020, time.May, 21, 17, 54, 35, 256*1000*1000, time.UTC), + Username: "pgaweb", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 34, + Content: "connection authorized: user=pgaweb database=pgaweb application_name=psql", + }, + true, + }, + { + "", + "2020-05-21 17:54:43.808 UTC 172.18.0.1(56402) pgaweb psql [5ec6c05b.22] [34] LOG: disconnection: session time: 0:00:08.574 user=pgaweb database=pgaweb host=172.18.0.1 port=56402", + nil, + state.LogLine{ + OccurredAt: time.Date(2020, time.May, 21, 17, 54, 43, 808*1000*1000, time.UTC), + Username: "pgaweb", + Application: "psql", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 34, + Content: "disconnection: session time: 0:00:08.574 user=pgaweb database=pgaweb host=172.18.0.1 port=56402", + }, + true, + }, + // Custom 10 format + { + "", + "2020-09-04 16:03:11.375 UTC [417880]: [1-1] db=mydb,user=myuser LOG: pganalyze-collector-identify: myserver", + nil, + state.LogLine{ + OccurredAt: time.Date(2020, time.September, 4, 16, 3, 11, 375*1000*1000, time.UTC), + Username: "myuser", + Database: "mydb", + BackendPid: 417880, + LogLineNumber: 1, + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "pganalyze-collector-identify: myserver", + }, + true, + }, + // Custom 11 format + { + "", + "pid=8284,user=[unknown],db=[unknown],app=[unknown],client=[local] LOG: connection received: host=[local]", + nil, + state.LogLine{ + BackendPid: 8284, + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "connection received: host=[local]", + }, + true, + }, + { + "", + "pid=8284,user=[unknown],db=[unknown],app=why 
would you[] name your application this,client=[local] LOG: connection received: host=[local]", + nil, + state.LogLine{ + BackendPid: 8284, + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "connection received: host=[local]", + }, + true, + }, + // Custom 12 format + { + "", + "user=[unknown],db=[unknown],app=[unknown],client=[local] LOG: connection received: host=[local]", + nil, + state.LogLine{ + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "connection received: host=[local]", + }, + true, + }, + // Custom 13 format + { + "", + "27-2021-11-17 19:06:14 UTC-619552a6.1b-1----2021-11-17 19:06:14.946 UTC LOG: database system was shut down at 2021-11-17 19:01:42 UTC", + nil, + state.LogLine{ + OccurredAt: time.Date(2021, time.November, 17, 19, 6, 14, 946*1000*1000, time.UTC), + BackendPid: 27, + LogLineNumber: 1, + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "database system was shut down at 2021-11-17 19:01:42 UTC", + }, + true, + }, + { + "", + "51-2021-11-17 19:11:13 UTC-619553d1.33-2-172.20.0.1-pgaweb-pgaweb-2021-11-17 19:11:13.562 UTC LOG: connection authorized: user=pgaweb database=pgaweb application_name=puma: cluster worker 2: 18544 [pganalyze]", + nil, + state.LogLine{ + OccurredAt: time.Date(2021, time.November, 17, 19, 11, 13, 562*1000*1000, time.UTC), + Username: "pgaweb", + Database: "pgaweb", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 51, + LogLineNumber: 2, + Content: "connection authorized: user=pgaweb database=pgaweb application_name=puma: cluster worker 2: 18544 [pganalyze]", + }, + true, + }, + // Custom 14 format + { + "", + "2021-11-17 19:06:53.897 UTC [34][autovacuum worker][3/5][22996] LOG: automatic analyze of table \"mydb.pg_catalog.pg_class\" system usage: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.01 s", + nil, + state.LogLine{ + OccurredAt: time.Date(2021, time.November, 17, 19, 6, 53, 897*1000*1000, time.UTC), + LogLevel: 
pganalyze_collector.LogLineInformation_LOG, + BackendPid: 34, + Content: "automatic analyze of table \"mydb.pg_catalog.pg_class\" system usage: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.01 s", + }, + true, + }, + // Custom 15 format + { + "", + "2022-07-22 06:13:13.389 UTC [1] LOG: database system is ready to accept connections", + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.July, 22, 6, 13, 13, 389*1000*1000, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 1, + Content: "database system is ready to accept connections", + }, + true, + }, + { + "", + "2022-07-22 06:13:45.781 UTC [75] myuser@mydb LOG: connection authorized: user=myuser database=mydb application_name=psql", + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.July, 22, 6, 13, 45, 781*1000*1000, time.UTC), + Username: "myuser", + Database: "mydb", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 75, + Content: "connection authorized: user=myuser database=mydb application_name=psql", + }, + true, + }, + // Custom 16 format + { + "", + "2022-07-22 06:13:12 UTC [1] LOG: starting PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit", + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.July, 22, 6, 13, 12, 0, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 1, + Content: "starting PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit", + }, + true, + }, + { + "", + "2022-07-22 06:14:23 UTC [76] my-user@my-db 1.2.3.4 LOG: disconnection: session time: 0:00:01.667 user=my-user database=my-db host=1.2.3.4 port=5678", + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.July, 22, 6, 14, 23, 0, time.UTC), + Username: "my-user", + Database: "my-db", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 76, + Content: "disconnection: session 
time: 0:00:01.667 user=my-user database=my-db host=1.2.3.4 port=5678", + }, + true, + }, + // Simple format + { + "", + "2018-05-04 03:06:18.360 UTC [3184] LOG: pganalyze-collector-identify: server1", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.May, 4, 3, 6, 18, 360*1000*1000, time.UTC), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 3184, + Content: "pganalyze-collector-identify: server1", + }, + true, + }, + { + "", + "2018-05-04 03:06:18.360 +0100 [3184] LOG: pganalyze-collector-identify: server1", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.May, 4, 3, 6, 18, 360*1000*1000, time.FixedZone("+0100", 3600)), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 3184, + Content: "pganalyze-collector-identify: server1", + }, + true, + }, + { + "", + ` sql_error_code = 28000 FATAL: no pg_hba.conf entry for host "127.0.0.1", user "postgres", database "postgres", SSL off`, + nil, + state.LogLine{ + LogLevel: pganalyze_collector.LogLineInformation_FATAL, + Content: "no pg_hba.conf entry for host \"127.0.0.1\", user \"postgres\", database \"postgres\", SSL off", + }, + true, + }, + { + "", + ` sql_error_code = 28000 time_ms = "2022-06-02 22:48:20.807 UTC" pid="11666" proc_start_time="2022-06-02 22:48:20 UTC" session_id="62993e34.2d92" vtid="6/17007" tid="0" log_line="1" database="postgres" connection_source="127.0.0.1(36532)" user="postgres" application_name="[unknown]" FATAL: no pg_hba.conf entry for host "127.0.0.1", user "postgres", database "postgres", SSL off`, + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.June, 2, 22, 48, 20, 807*1000*1000, time.UTC), + Username: "postgres", + Database: "postgres", + LogLevel: pganalyze_collector.LogLineInformation_FATAL, + BackendPid: 11666, + LogLineNumber: 1, + Content: "no pg_hba.conf entry for host \"127.0.0.1\", user \"postgres\", database \"postgres\", SSL off", + }, + true, + }, + { + "", + "2022-12-23 09:53:43.862 -03 [790081] LOG: 
pganalyze-collector-identify: server1", + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.December, 23, 9, 53, 43, 862*1000*1000, time.FixedZone("-03", -3*3600)), + LogLevel: 6, + BackendPid: 790081, + Content: "pganalyze-collector-identify: server1", + }, + true, + }, + // Custom 3 format with explicit time zone + { + "", + "2018-09-27 06:57:01.030 BST [20194] [user=[unknown],db=[unknown],app=[unknown]] LOG: connection received: host=[local]", + BSTTimeLocation, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 1, 30*1000*1000, BSTTimeLocation), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + Content: "connection received: host=[local]", + }, + true, + }, +} + +func TestParseLogLineWithPrefix(t *testing.T) { + for _, pair := range parseTests { + l, lOk := logs.ParseLogLineWithPrefix(pair.prefixIn, pair.lineIn, pair.lineInTz) + + cfg := pretty.CompareConfig + cfg.SkipZeroFields = true + + if pair.lineOutOk != lOk { + t.Errorf("For \"%v\": expected parsing ok? 
to be %v, but was %v\n", pair.lineIn, pair.lineOutOk, lOk) + } + + if diff := cfg.Compare(l, pair.lineOut); diff != "" { + t.Errorf("For \"%v\": log line diff: (-got +want)\n%s", pair.lineIn, diff) + } + } +} diff --git a/logs/parse.go b/logs/parse.go index 432798e06..85d3e5cd9 100644 --- a/logs/parse.go +++ b/logs/parse.go @@ -1,519 +1,251 @@ package logs import ( + "errors" "fmt" - "io" "regexp" "strconv" "strings" "time" + "unicode/utf8" - "github.com/google/uuid" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" ) -const LogPrefixAmazonRds string = "%t:%r:%u@%d:[%p]:" -const LogPrefixAzure string = "%t-%c-" -const LogPrefixCustom1 string = "%m [%p][%v] : [%l-1] %q[app=%a] " -const LogPrefixCustom2 string = "%t [%p-%l] %q%u@%d " -const LogPrefixCustom3 string = "%m [%p] %q[user=%u,db=%d,app=%a] " -const LogPrefixCustom4 string = "%m [%p] %q[user=%u,db=%d,app=%a,host=%h] " -const LogPrefixCustom5 string = "%t [%p]: [%l-1] user=%u,db=%d - PG-%e " -const LogPrefixCustom6 string = "%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h " -const LogPrefixCustom7 string = "%t [%p]: [%l-1] [trx_id=%x] user=%u,db=%d " -const LogPrefixCustom8 string = "[%p]: [%l-1] db=%d,user=%u " -const LogPrefixCustom9 string = "%m %r %u %a [%c] [%p] " -const LogPrefixCustom10 string = "%m [%p]: [%l-1] db=%d,user=%u " -const LogPrefixCustom11 string = "pid=%p,user=%u,db=%d,app=%a,client=%h " -const LogPrefixCustom12 string = "user=%u,db=%d,app=%a,client=%h " -const LogPrefixCustom13 string = "%p-%s-%c-%l-%h-%u-%d-%m " -const LogPrefixCustom14 string = "%m [%p][%b][%v][%x] %q[user=%u,db=%d,app=%a] " -const LogPrefixCustom15 string = "%m [%p] %q%u@%d " -const LogPrefixCustom16 string = "%t [%p] %q%u@%d %h " -const LogPrefixSimple string = "%m [%p] " -const LogPrefixHeroku1 string = " sql_error_code = %e " -const LogPrefixHeroku2 string = ` sql_error_code = %e time_ms = "%m" pid="%p" proc_start_time="%s" session_id="%c" vtid="%v" tid="%x" 
log_line="%l" %qdatabase="%d" connection_source="%r" user="%u" application_name="%a" ` - -// Used only to recognize the Heroku hobby tier log_line_prefix to give a warning (logs are not supported -// on hobby tier) and avoid errors during prefix check; logs with this prefix are never actually received -const LogPrefixHerokuHobbyTier string = " database = %d connection_source = %r sql_error_code = %e " -const LogPrefixEmpty string = "" - -var RecommendedPrefixIdx = 4 - -var SupportedPrefixes = []string{ - LogPrefixAmazonRds, LogPrefixAzure, LogPrefixCustom1, LogPrefixCustom2, - LogPrefixCustom3, LogPrefixCustom4, LogPrefixCustom5, LogPrefixCustom6, - LogPrefixCustom7, LogPrefixCustom8, LogPrefixCustom9, LogPrefixCustom10, - LogPrefixCustom11, LogPrefixCustom12, LogPrefixCustom13, LogPrefixCustom14, - LogPrefixCustom15, LogPrefixCustom16, - LogPrefixSimple, LogPrefixHeroku1, LogPrefixHeroku2, LogPrefixEmpty, +type PrefixEscape struct { + Regexp string + ApplyValue func(value string, logLine *state.LogLine, tz *time.Location) + // Indicates a value may not always be present for this escape (e.g., when logging from a non-backend process) + Optional bool } -// Every one of these regexps should produce exactly one matching group -var TimeRegexp = `(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)? 
[\-+]?\w+)` // %t or %m (or %s) -var HostAndPortRegexp = `(.+(?:\(\d+\))?)?` // %r -var PidRegexp = `(\d+)` // %p -var UserRegexp = `(\S*)` // %u -var DbRegexp = `(\S*)` // %d -var AppBeforeSpaceRegexp = `(\S*)` // %a -var AppBeforeCommaRegexp = `([^,]*)` // %a -var AppBeforeQuoteRegexp = `([^"]*)` // %a -var AppInsideBracketsRegexp = `(\[unknown\]|[^,\]]*)` // %a -var HostRegexp = `(\S*)` // %h -var VirtualTxRegexp = `(\d+/\d+)?` // %v -var LogLineCounterRegexp = `(\d+)` // %l -var SqlstateRegexp = `(\w{5})` // %e -var TransactionIdRegexp = `(\d+)` // %x -var SessionIdRegexp = `(\w+\.\w+)` // %c -var BackendTypeRegexp = `([\w ]+)` // %b -// Missing: -// - %n (unix timestamp) -// - %i (command tag) - -var LevelAndContentRegexp = `(\w+):\s+(.*\n?)$` -var LogPrefixAmazonRdsRegexp = regexp.MustCompile(`(?s)^` + TimeRegexp + `:` + HostAndPortRegexp + `:` + UserRegexp + `@` + DbRegexp + `:\[` + PidRegexp + `\]:` + LevelAndContentRegexp) -var LogPrefixAzureRegexp = regexp.MustCompile(`(?s)^` + TimeRegexp + `-` + SessionIdRegexp + `-` + LevelAndContentRegexp) -var LogPrefixCustom1Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]\[` + VirtualTxRegexp + `\] : \[` + LogLineCounterRegexp + `-1\] (?:\[app=` + AppInsideBracketsRegexp + `\] )?` + LevelAndContentRegexp) -var LogPrefixCustom2Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `-` + LogLineCounterRegexp + `\] ` + `(?:` + UserRegexp + `@` + DbRegexp + ` )?` + LevelAndContentRegexp) -var LogPrefixCustom3Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] (?:\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppInsideBracketsRegexp + `\] )?` + LevelAndContentRegexp) -var LogPrefixCustom4Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] (?:\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,host=` + HostRegexp + `\] )?` + LevelAndContentRegexp) -var LogPrefixCustom5Regexp = 
regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] user=` + UserRegexp + `,db=` + DbRegexp + ` - PG-` + SqlstateRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom6Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,client=` + HostRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom7Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] \[trx_id=` + TransactionIdRegexp + `\] user=` + UserRegexp + `,db=` + DbRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom8Regexp = regexp.MustCompile(`(?s)^\[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] db=` + DbRegexp + `,user=` + UserRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom9Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` ` + HostAndPortRegexp + ` ` + UserRegexp + ` ` + AppBeforeSpaceRegexp + ` \[` + SessionIdRegexp + `\] \[` + PidRegexp + `\] ` + LevelAndContentRegexp) -var LogPrefixCustom10Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]: \[` + LogLineCounterRegexp + `-1\] db=` + DbRegexp + `,user=` + UserRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom11Regexp = regexp.MustCompile(`(?s)^pid=` + PidRegexp + `,user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,client=` + HostRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom12Regexp = regexp.MustCompile(`(?s)^user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppBeforeCommaRegexp + `,client=` + HostRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom13Regexp = regexp.MustCompile(`(?s)^` + PidRegexp + `-` + TimeRegexp + `-` + SessionIdRegexp + `-` + LogLineCounterRegexp + `-` + HostRegexp + `-` + UserRegexp + `-` + DbRegexp + `-` + TimeRegexp + ` ` + LevelAndContentRegexp) -var LogPrefixCustom14Regexp = 
regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\]\[` + BackendTypeRegexp + `\]\[` + VirtualTxRegexp + `\]\[` + TransactionIdRegexp + `\] (?:\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppInsideBracketsRegexp + `\] )?` + LevelAndContentRegexp) -var LogPrefixCustom15Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] ` + `(?:` + UserRegexp + `@` + DbRegexp + ` )?` + LevelAndContentRegexp) -var LogPrefixCustom16Regexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] ` + `(?:` + UserRegexp + `@` + DbRegexp + ` ` + HostRegexp + ` )?` + LevelAndContentRegexp) -var LogPrefixSimpleRegexp = regexp.MustCompile(`(?s)^` + TimeRegexp + ` \[` + PidRegexp + `\] ` + LevelAndContentRegexp) -var LogPrefixNoTimestampUserDatabaseAppRegexp = regexp.MustCompile(`(?s)^\[user=` + UserRegexp + `,db=` + DbRegexp + `,app=` + AppInsideBracketsRegexp + `\] ` + LevelAndContentRegexp) -var LogPrefixHeroku1Regexp = regexp.MustCompile(`^ sql_error_code = ` + SqlstateRegexp + " " + LevelAndContentRegexp) -var LogPrefixHeroku2Regexp = regexp.MustCompile(`^ sql_error_code = ` + SqlstateRegexp + ` time_ms = "` + TimeRegexp + `" pid="` + PidRegexp + `" proc_start_time="` + TimeRegexp + `" session_id="` + SessionIdRegexp + `" vtid="` + VirtualTxRegexp + `" tid="` + TransactionIdRegexp + `" log_line="` + LogLineCounterRegexp + `" (?:database="` + DbRegexp + `" connection_source="` + HostAndPortRegexp + `" user="` + UserRegexp + `" application_name="` + AppBeforeQuoteRegexp + `" )?` + LevelAndContentRegexp) - -var SyslogSequenceAndSplitRegexp = `(\[[\d-]+\])?` - -var RsyslogLevelAndContentRegexp = `(?:(\w+):\s+)?(.*\n?)$` -var RsyslogTimeRegexp = `(\w+\s+\d+ \d{2}:\d{2}:\d{2})` -var RsyslogHostnameRegxp = `(\S+)` -var RsyslogProcessNameRegexp = `(\w+)` -var RsyslogRegexp = regexp.MustCompile(`^` + RsyslogTimeRegexp + ` ` + RsyslogHostnameRegxp + ` ` + RsyslogProcessNameRegexp + `\[` + PidRegexp + `\]: ` + SyslogSequenceAndSplitRegexp + ` 
` + RsyslogLevelAndContentRegexp) - -func IsSupportedPrefix(prefix string) bool { - for _, supportedPrefix := range SupportedPrefixes { - if supportedPrefix == prefix { - return true - } - } - return false +// This is a map of the various log_line_prefix format strings; see +// https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-LOG-LINE-PREFIX +// not included: %q and %%, which are easier to handle by special-casing +var EscapeMatchers = map[rune]PrefixEscape{ + // Application name + 'a': { + Regexp: `.+?`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + if value == "[unknown]" { + return + } + logLine.Application = value + }, + Optional: true, + }, + // User name + 'u': { + Regexp: `.+?`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + if value == "[unknown]" { + return + } + logLine.Username = value + }, + Optional: true, + }, + // Database name + 'd': { + Regexp: `.+?`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + if value == "[unknown]" { + return + } + logLine.Database = value + }, + Optional: true, + }, + // Remote host name or IP address, and remote port + 'r': { + Regexp: `[a-zA-Z0-9:.-]+\(\d{1,5}\)|\[local\]`, + Optional: true, + }, + // Remote host name or IP address + 'h': { + Regexp: `[a-zA-Z0-9:.-]+|\[local\]`, + Optional: true, + }, + // Backend type + 'b': { + Regexp: `[a-z ]+`, + }, + // Process ID + 'p': { + Regexp: `\d+`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + intVal, _ := strconv.ParseInt(value, 10, 32) + logLine.BackendPid = int32(intVal) + }, + }, + // Process ID of the parallel group leader, if this process is a parallel query worker + 'P': { + Regexp: `\d+`, + Optional: true, + }, + // Time stamp without milliseconds + 't': { + Regexp: `\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} (?:[A-Z]{1,4}|[+-]\d+)`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + 
logLine.OccurredAt = GetOccurredAt(value, tz, false) + }, + }, + // Time stamp with milliseconds + 'm': { + Regexp: `\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} (?:[A-Z]{1,4}|[+-]\d+)`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + logLine.OccurredAt = GetOccurredAt(value, tz, false) + }, + }, + // Time stamp with milliseconds (as a Unix epoch) + 'n': { + Regexp: `\d+\.\d+`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + tsparts := strings.SplitN(value, ".", 2) + seconds, _ := strconv.ParseInt(tsparts[0], 10, 64) + millis, _ := strconv.ParseInt(tsparts[1], 10, 64) + logLine.OccurredAt = time.Unix(seconds, millis*1_000_000) + }, + }, + // Command tag: type of session's current command + 'i': { + Regexp: `[A-Z_ ]+`, + Optional: true, + }, + // SQLSTATE error code + 'e': { + Regexp: `[0-9A-Z]{5}`, + }, + // Session ID: see below + 'c': { + Regexp: `[0-9a-f]{1,8}\.[0-9a-f]{1,8}`, + Optional: true, + }, + // Number of the log line for each session or process, starting at 1 + 'l': { + Regexp: `\d+`, + ApplyValue: func(value string, logLine *state.LogLine, tz *time.Location) { + intVal, _ := strconv.ParseInt(value, 10, 32) + logLine.LogLineNumber = int32(intVal) + }, + }, + // Process start time stamp + 's': { + Regexp: `\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} (?:[A-Z]{1,4}|[+-]\d+)`, + }, + // Virtual transaction ID (backendID/localXID); see Section 74.1 + 'v': { + Regexp: `\d+\/\d+`, + Optional: true, + }, + // Transaction ID (0 if none is assigned); see Section 74.1 + 'x': { + Regexp: `\d+`, + Optional: true, + }, + // Query identifier of the current query. Query identifiers are not computed by default, so this field will be zero unless compute_query_id parameter is enabled or a third-party module that computes query identifiers is configured. 
+ 'Q': { + Regexp: `-?\d+`, + }, } -func ParseLogLineWithPrefix(prefix string, line string, tz *time.Location) (logLine state.LogLine, ok bool) { - var timePart, userPart, dbPart, appPart, pidPart, logLineNumberPart, levelPart, contentPart string +type LogParser struct { + prefix string + tz *time.Location + isSyslog bool - rsyslog := false + useLegacyFallback bool - // Only read the first 1000 characters of a log line to parse the log_line_prefix - // - // This reduces the overhead for very long loglines, because we don't pass in the - // whole line to the regexp engine (twice). - lineExtra := "" - if len(line) > 1000 { - lineExtra = line[1000:] - line = line[0:1000] - } + lineRegexp *regexp.Regexp + prefixElements []PrefixEscape +} - if prefix == "" { - if LogPrefixAmazonRdsRegexp.MatchString(line) { - prefix = LogPrefixAmazonRds - } else if LogPrefixAzureRegexp.MatchString(line) { - prefix = LogPrefixAzure - } else if LogPrefixCustom1Regexp.MatchString(line) { - prefix = LogPrefixCustom1 - } else if LogPrefixCustom2Regexp.MatchString(line) { - prefix = LogPrefixCustom2 - } else if LogPrefixCustom4Regexp.MatchString(line) { // 4 is more specific than 3, so needs to go first - prefix = LogPrefixCustom4 - } else if LogPrefixCustom3Regexp.MatchString(line) { - prefix = LogPrefixCustom3 - } else if LogPrefixCustom5Regexp.MatchString(line) { - prefix = LogPrefixCustom5 - } else if LogPrefixCustom6Regexp.MatchString(line) { - prefix = LogPrefixCustom6 - } else if LogPrefixCustom7Regexp.MatchString(line) { - prefix = LogPrefixCustom7 - } else if LogPrefixCustom8Regexp.MatchString(line) { - prefix = LogPrefixCustom8 - } else if LogPrefixCustom9Regexp.MatchString(line) { - prefix = LogPrefixCustom9 - } else if LogPrefixCustom10Regexp.MatchString(line) { - prefix = LogPrefixCustom10 - } else if LogPrefixCustom11Regexp.MatchString(line) { - prefix = LogPrefixCustom11 - } else if LogPrefixCustom12Regexp.MatchString(line) { - prefix = LogPrefixCustom12 - } else if 
LogPrefixCustom13Regexp.MatchString(line) { - prefix = LogPrefixCustom13 - } else if LogPrefixCustom14Regexp.MatchString(line) { - prefix = LogPrefixCustom14 - } else if LogPrefixCustom15Regexp.MatchString(line) { - prefix = LogPrefixCustom15 - } else if LogPrefixCustom16Regexp.MatchString(line) { - prefix = LogPrefixCustom16 - } else if LogPrefixSimpleRegexp.MatchString(line) { - prefix = LogPrefixSimple - } else if LogPrefixHeroku2Regexp.MatchString(line) { - prefix = LogPrefixHeroku2 - } else if LogPrefixHeroku1Regexp.MatchString(line) { - // LogPrefixHeroku1 is a subset of 2, so it must be matched second - prefix = LogPrefixHeroku1 - } else if RsyslogRegexp.MatchString(line) { - rsyslog = true - } +func NewLogParser(prefix string, tz *time.Location, isSyslog bool, useLegacyFallback bool) *LogParser { + prefixRegexp, prefixElements := parsePrefix(prefix) + lineRegexp := regexp.MustCompile("(?ms)^" + prefixRegexp + `(\w+):\s+(.*\n?)$`) + return &LogParser{ + prefix: prefix, + tz: tz, + isSyslog: isSyslog, + + useLegacyFallback: useLegacyFallback, + + lineRegexp: lineRegexp, + prefixElements: prefixElements, } +} - if rsyslog { - parts := RsyslogRegexp.FindStringSubmatch(line) - if len(parts) == 0 { - return +func getLogConfigFromSettings(settings []state.PostgresSetting) (tz *time.Location, prefix string) { + for _, setting := range settings { + if !setting.ResetValue.Valid { + continue } - timePart = fmt.Sprintf("%d %s", time.Now().Year(), parts[1]) - // ignore syslog hostname - // ignore syslog process name - pidPart = parts[4] - // ignore syslog postgres sequence and split number - levelPart = parts[6] - contentPart = strings.Replace(parts[7], "#011", "\t", -1) - - parts = LogPrefixNoTimestampUserDatabaseAppRegexp.FindStringSubmatch(contentPart) - if len(parts) == 6 { - userPart = parts[1] - dbPart = parts[2] - appPart = parts[3] - levelPart = parts[4] - contentPart = parts[5] - } - } else { - switch prefix { - case LogPrefixAmazonRds: // "%t:%r:%u@%d:[%p]:" - 
parts := LogPrefixAmazonRdsRegexp.FindStringSubmatch(line) - if len(parts) == 0 { - return + if setting.Name == "log_timezone" { + zoneStr := setting.ResetValue.String + zone, err := time.LoadLocation(zoneStr) + if err == nil { + tz = zone } + } else if setting.Name == "log_line_prefix" { + prefix = setting.ResetValue.String + } + } + return +} - timePart = parts[1] - // skip %r (ip+port) - userPart = parts[3] - dbPart = parts[4] - pidPart = parts[5] - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixAzure: // "%t-%c-" - parts := LogPrefixAzureRegexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } +func SyncLogParser(server *state.Server, settings []state.PostgresSetting) { + server.LogParseMutex.RLock() - timePart = parts[1] - // skip %c (session id) - levelPart = parts[3] - contentPart = parts[4] - case LogPrefixCustom1: // "%m [%p][%v] : [%l-1] %q[app=%a] " - parts := LogPrefixCustom1Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - // skip %v (virtual TX) - logLineNumberPart = parts[4] - appPart = parts[5] - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixCustom2: // "%t [%p-1] %q%u@%d " - parts := LogPrefixCustom2Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - logLineNumberPart = parts[3] - userPart = parts[4] - dbPart = parts[5] - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixCustom3: // "%m [%p] %q[user=%u,db=%d,app=%a] "" - parts := LogPrefixCustom3Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - userPart = parts[3] - dbPart = parts[4] - appPart = parts[5] - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixCustom4: // "%m [%p] %q[user=%u,db=%d,app=%a,host=%h] " - parts := LogPrefixCustom4Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - 
userPart = parts[3] - dbPart = parts[4] - appPart = parts[5] - // skip %h (host) - levelPart = parts[7] - contentPart = parts[8] - case LogPrefixCustom5: // "%t [%p]: [%l-1] user=%u,db=%d - PG-%e " - parts := LogPrefixCustom5Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - logLineNumberPart = parts[3] - userPart = parts[4] - dbPart = parts[5] - // skip %e (SQLSTATE) - levelPart = parts[7] - contentPart = parts[8] - case LogPrefixCustom6: // "%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h " - parts := LogPrefixCustom6Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - logLineNumberPart = parts[3] - userPart = parts[4] - dbPart = parts[5] - // skip %a (application name) - // skip %h (host) - levelPart = parts[8] - contentPart = parts[9] - case LogPrefixCustom7: // "%t [%p]: [%l-1] [trx_id=%x] user=%u,db=%d " - parts := LogPrefixCustom7Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - logLineNumberPart = parts[3] - // skip %x (transaction id) - userPart = parts[5] - dbPart = parts[6] - levelPart = parts[7] - contentPart = parts[8] - case LogPrefixCustom8: // "[%p]: [%l-1] db=%d,user=%u " - parts := LogPrefixCustom8Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - pidPart = parts[1] - logLineNumberPart = parts[2] - dbPart = parts[3] - userPart = parts[4] - levelPart = parts[5] - contentPart = parts[6] - case LogPrefixCustom9: // "%m %r %u %a [%c] [%p] " - parts := LogPrefixCustom9Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - // skip %r (ip+port) - userPart = parts[3] - appPart = parts[4] - // skip %c (session id) - pidPart = parts[6] - levelPart = parts[7] - contentPart = parts[8] - case LogPrefixCustom10: // "%t [%p]: [%l-1] db=%d,user=%u " - parts := LogPrefixCustom10Regexp.FindStringSubmatch(line) - if len(parts) == 
0 { - return - } - timePart = parts[1] - pidPart = parts[2] - logLineNumberPart = parts[3] - dbPart = parts[4] - userPart = parts[5] - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixCustom11: // "pid=%p,user=%u,db=%d,app=%a,client=%h " - parts := LogPrefixCustom11Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - pidPart = parts[1] - userPart = parts[2] - dbPart = parts[3] - // skip %a (application name) - // skip %h (host) - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixCustom12: // "user=%u,db=%d,app=%a,client=%h " - parts := LogPrefixCustom12Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - userPart = parts[1] - dbPart = parts[2] - // skip %a (application name) - // skip %h (host) - levelPart = parts[5] - contentPart = parts[6] - case LogPrefixCustom13: // "%p-%s-%c-%l-%h-%u-%d-%m " - parts := LogPrefixCustom13Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - pidPart = parts[1] - // skip %s - // skip %c - logLineNumberPart = parts[4] - // skip %h (host) - userPart = parts[6] - dbPart = parts[7] - timePart = parts[8] - levelPart = parts[9] - contentPart = parts[10] - case LogPrefixCustom14: // "%m [%p][%b][%v][%x] %q[user=%u,db=%d,app=%a] " - parts := LogPrefixCustom14Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } + tz, prefix := getLogConfigFromSettings(settings) + isSyslog := server.Config.LogSyslogServer != "" + useLegacyFallback := server.Config.LogLinePrefix == "legacy" + parserInSync := server.LogParser != nil && server.LogParser.Matches(prefix, tz, isSyslog) + server.LogParseMutex.RUnlock() - timePart = parts[1] - pidPart = parts[2] - // skip %b - // skip %v - // skip %x - userPart = parts[6] - dbPart = parts[7] - appPart = parts[8] - levelPart = parts[9] - contentPart = parts[10] - case LogPrefixCustom15: // "%m [%p] %q%u@%d " - parts := LogPrefixCustom15Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - 
pidPart = parts[2] - userPart = parts[3] - dbPart = parts[4] - levelPart = parts[5] - contentPart = parts[6] - case LogPrefixCustom16: // "%t [%p] %q%u@%d %h " - parts := LogPrefixCustom16Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - userPart = parts[3] - dbPart = parts[4] - // skip %h (host) - levelPart = parts[6] - contentPart = parts[7] - case LogPrefixSimple: // "%t [%p] " - parts := LogPrefixSimpleRegexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - timePart = parts[1] - pidPart = parts[2] - levelPart = parts[3] - contentPart = parts[4] - case LogPrefixHeroku1: - parts := LogPrefixHeroku1Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - // skip %e - levelPart = parts[2] - contentPart = parts[3] - case LogPrefixHeroku2: - parts := LogPrefixHeroku2Regexp.FindStringSubmatch(line) - if len(parts) == 0 { - return - } - // skip %e - timePart = parts[2] - pidPart = parts[3] - // skip %s - // skip %c - // skip %v - // skip %x - logLineNumberPart = parts[8] - dbPart = parts[9] - // skip %r - userPart = parts[11] - appPart = parts[12] - levelPart = parts[13] - contentPart = parts[14] - default: - // Some callers use the content of unparsed lines to stitch multi-line logs together - logLine.Content = line + lineExtra - return - } + if parserInSync { + return } - if timePart != "" { - occurredAt := GetOccurredAt(timePart, tz, rsyslog) - if occurredAt.IsZero() { - return - } - logLine.OccurredAt = occurredAt - } + server.LogParseMutex.Lock() + defer server.LogParseMutex.Unlock() - if userPart != "[unknown]" { - logLine.Username = userPart - } - if dbPart != "[unknown]" { - logLine.Database = dbPart - } - if appPart != "[unknown]" { - logLine.Application = appPart + server.LogParser = NewLogParser(prefix, tz, isSyslog, useLegacyFallback) +} + +func (lp *LogParser) ValidatePrefix() error { + dbInPrefix, err := regexp.MatchString("(?:^|[^%])%d", lp.prefix) + if err != nil { 
+ return fmt.Errorf("could not check: %s", err) } - if logLineNumberPart != "" { - logLineNumber, _ := strconv.ParseInt(logLineNumberPart, 10, 32) - logLine.LogLineNumber = int32(logLineNumber) + userInPrefix, err := regexp.MatchString("(?:^|[^%])%u", lp.prefix) + if err != nil { + return fmt.Errorf("could not check: %s", err) } - - backendPid, _ := strconv.ParseInt(pidPart, 10, 32) - logLine.BackendPid = int32(backendPid) - logLine.Content = contentPart + lineExtra - - // This is actually a continuation of a previous line - if levelPart == "" { - return + if !dbInPrefix && !userInPrefix { + return errors.New("database (%d) and user (%u) not found: pganalyze will not be able to correctly classify some log lines") + } else if !dbInPrefix { + return errors.New("database (%d) not found: pganalyze will not be able to correctly classify some log lines") + } else if !userInPrefix { + return errors.New("user (%u) not found: pganalyze will not be able to correctly classify some log lines") + } else { + return nil } +} - logLine.LogLevel = pganalyze_collector.LogLineInformation_LogLevel(pganalyze_collector.LogLineInformation_LogLevel_value[levelPart]) - ok = true - - return +func (lp *LogParser) Matches(prefix string, tz *time.Location, isSyslog bool) bool { + return lp.prefix == prefix && tz.String() == lp.tz.String() && lp.isSyslog == isSyslog } -func GetOccurredAt(timePart string, tz *time.Location, rsyslog bool) time.Time { - if tz != nil && !rsyslog { +func (lp *LogParser) GetOccurredAt(timePart string) time.Time { + if lp.tz != nil && !lp.isSyslog { lastSpaceIdx := strings.LastIndex(timePart, " ") if lastSpaceIdx == -1 { return time.Time{} } timePartNoTz := timePart[0:lastSpaceIdx] - result, err := time.ParseInLocation("2006-01-02 15:04:05", timePartNoTz, tz) + result, err := time.ParseInLocation("2006-01-02 15:04:05", timePartNoTz, lp.tz) if err != nil { return time.Time{} } @@ -523,7 +255,7 @@ func GetOccurredAt(timePart string, tz *time.Location, rsyslog bool) 
time.Time { // Assume Postgres time format unless overriden by the prefix (e.g. syslog) var timeFormat, timeFormatAlt string - if rsyslog { + if lp.isSyslog { timeFormat = "2006 Jan 2 15:04:05" timeFormatAlt = "" } else { @@ -569,66 +301,149 @@ func GetOccurredAt(timePart string, tz *time.Location, rsyslog bool) time.Time { return ts } -type LineReader interface { - ReadString(delim byte) (string, error) +func parseSyslogLine(line string, tz *time.Location) (logLine state.LogLine, ok bool) { + parts := RsyslogRegexp.FindStringSubmatch(line) + if len(parts) == 0 { + return + } + + timePart := fmt.Sprintf("%d %s", time.Now().Year(), parts[1]) + // ignore syslog hostname + // ignore syslog process name + pidPart := parts[4] + // ignore syslog postgres sequence and split number + levelPart := parts[6] + contentPart := strings.Replace(parts[7], "#011", "\t", -1) + + parts = LogPrefixNoTimestampUserDatabaseAppRegexp.FindStringSubmatch(contentPart) + if len(parts) == 6 { + userPart := parts[1] + dbPart := parts[2] + appPart := parts[3] + levelPart = parts[4] + contentPart = parts[5] + + if userPart != "[unknown]" { + logLine.Username = userPart + } + if dbPart != "[unknown]" { + logLine.Database = dbPart + } + if appPart != "[unknown]" { + logLine.Application = appPart + } + } + + occurredAt := GetOccurredAt(timePart, tz, true) + if occurredAt.IsZero() { + return + } + + logLine.OccurredAt = occurredAt + + backendPid, _ := strconv.ParseInt(pidPart, 10, 32) + logLine.BackendPid = int32(backendPid) + logLine.Content = contentPart + + // This is actually a continuation of a previous line + if levelPart == "" { + return + } + + logLine.LogLevel = pganalyze_collector.LogLineInformation_LogLevel(pganalyze_collector.LogLineInformation_LogLevel_value[levelPart]) + ok = true + + return } -func ParseAndAnalyzeBuffer(logStream LineReader, linesNewerThan time.Time, server *state.Server) ([]state.LogLine, []state.PostgresQuerySample) { - var logLines []state.LogLine - var 
currentByteStart int64 = 0 - var tz = server.GetLogTimezone() +func (lp *LogParser) ParseLine(line string) (logLine state.LogLine, ok bool) { + if lp.useLegacyFallback { + return ParseLogLineWithPrefix("", line, lp.tz) + } + if lp.isSyslog { + return parseSyslogLine(line, lp.tz) + } - for { - line, err := logStream.ReadString('\n') - byteStart := currentByteStart - currentByteStart += int64(len(line)) + if lp.prefix == "" { + return logLine, false + } - // This is intentionally after updating currentByteStart, since we consume the - // data in the file even if an error is returned - if err != nil { - if err != io.EOF { - fmt.Printf("Log Read ERROR: %s", err) - } - break + lineValues := lp.lineRegexp.FindStringSubmatch(line) + + if lineValues == nil { + // If this is an unprefixed line, it may be a continuation of a previous line + logLine.Content = line + return logLine, false + } + + for i, elem := range lp.prefixElements { + if elem.ApplyValue != nil { + value := lineValues[i+1] + elem.ApplyValue(value, &logLine, lp.tz) } + } - logLine, ok := ParseLogLineWithPrefix("", line, tz) - if !ok { - // Assume that a parsing error in a follow-on line means that we actually - // got additional data for the previous line - if len(logLines) > 0 && logLine.Content != "" { - logLines[len(logLines)-1].Content += logLine.Content - logLines[len(logLines)-1].ByteEnd += int64(len(logLine.Content)) - } + levelPart := lineValues[len(lineValues)-2] + logLine.Content = lineValues[len(lineValues)-1] + logLine.LogLevel = pganalyze_collector.LogLineInformation_LogLevel(pganalyze_collector.LogLineInformation_LogLevel_value[levelPart]) + + return logLine, true +} + +func parsePrefix(prefix string) (string, []PrefixEscape) { + var escapes []PrefixEscape + var resultRegexp strings.Builder + // for when %q is used + var pastq = false + + prefixLen := len(prefix) + var runeValue rune + for byteIdx, width := 0, 0; byteIdx < prefixLen; byteIdx += width { + runeValue, width = 
utf8.DecodeRuneInString(prefix[byteIdx:]) + if runeValue != '%' || byteIdx == prefixLen-1 { + // keep in regexp to match as a literal, but ignore + resultRegexp.WriteString(regexp.QuoteMeta(string(runeValue))) continue } - // Ignore loglines which are outside our time window - if logLine.OccurredAt.Before(linesNewerThan) { + // at this point we have an escape to handle; check the actual escape code + // value first + byteIdx += width + runeValue, width = utf8.DecodeRuneInString(prefix[byteIdx:]) + if runeValue == '%' { + // if we see another %, it's escaped so we should expect it in the string + resultRegexp.WriteRune('%') continue } - // Ignore loglines that are ignored server-wide (e.g. because they are - // log_statement=all/log_duration=on lines). Note this intentionally - // runs after multi-line log lines have been stitched together. - if server.IgnoreLogLine(logLine.Content) { + // flag %q if necessary: we wrap the rest of the expression until the end of + // the log_line_prefix in an optional non-capturing group + if !pastq && runeValue == 'q' { + pastq = true + resultRegexp.WriteString("(?:") continue } - logLine.ByteStart = byteStart - logLine.ByteContentStart = byteStart + int64(len(line)-len(logLine.Content)) - logLine.ByteEnd = byteStart + int64(len(line)) - - // Generate unique ID that can be used to reference this line - logLine.UUID, err = uuid.NewV7() - if err != nil { - fmt.Printf("Failed to generate log line UUID: %s", err) + escape, ok := EscapeMatchers[runeValue] + if !ok { + // escapes that don't correspond to known escape codes are ignored continue } - logLines = append(logLines, logLine) + escapes = append(escapes, escape) + resultRegexp.WriteString("(") + resultRegexp.WriteString(escape.Regexp) + resultRegexp.WriteString(")") + if escape.Optional { + resultRegexp.WriteString("?") + } + // TODO: some groups may be empty for some backend types; add a '?' to the + // regexp for those cases? 
+ } + + if pastq { + resultRegexp.WriteString(")?") } - newLogLines, newSamples := AnalyzeLogLines(logLines) - return newLogLines, newSamples + return resultRegexp.String(), escapes } diff --git a/logs/parse_test.go b/logs/parse_test.go index 6e5d66cb0..4cf183d26 100644 --- a/logs/parse_test.go +++ b/logs/parse_test.go @@ -10,25 +10,7 @@ import ( "github.com/pganalyze/collector/state" ) -type parseTestpair struct { - prefixIn string - lineIn string - lineInTz *time.Location - lineOut state.LogLine - lineOutOk bool -} - -func mustTimeLocation(tzStr string) *time.Location { - tz, err := time.LoadLocation(tzStr) - if err != nil { - panic(err) - } - return tz -} - -var BSTTimeLocation = mustTimeLocation("Europe/London") - -var parseTests = []parseTestpair{ +var parse2Tests = []parseTestpair{ // rsyslog format { "", @@ -70,7 +52,7 @@ var parseTests = []parseTestpair{ }, // Amazon RDS format { - "", + logs.LogPrefixAmazonRds, "2018-08-22 16:00:04 UTC:ec2-1-1-1-1.compute-1.amazonaws.com(48808):myuser@mydb:[18762]:LOG: duration: 3668.685 ms execute : SELECT 1", nil, state.LogLine{ @@ -84,7 +66,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixAmazonRds, "2018-08-22 16:00:03 UTC:127.0.0.1(36404):myuser@mydb:[21495]:LOG: duration: 1630.946 ms execute 3: SELECT 1", nil, state.LogLine{ @@ -98,7 +80,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixAmazonRds, "2018-08-22 16:00:03 UTC:[local]:myuser@mydb:[21495]:LOG: duration: 1630.946 ms execute 3: SELECT 1", nil, state.LogLine{ @@ -113,7 +95,7 @@ var parseTests = []parseTestpair{ }, // Azure format { - "", + logs.LogPrefixAzure, "2020-06-21 22:37:10 UTC-5eefe116.22f4-LOG: could not receive data from client: An existing connection was forcibly closed by the remote host.", nil, state.LogLine{ @@ -125,7 +107,7 @@ var parseTests = []parseTestpair{ }, // Custom 1 format { - "", + logs.LogPrefixCustom1, "2018-09-27 06:57:01.030 EST [20194][] : [1-1] [app=pganalyze_collector] LOG: 
connection received: host=[local]", nil, state.LogLine{ @@ -138,9 +120,26 @@ var parseTests = []parseTestpair{ }, true, }, + // Custom 2 format + { + logs.LogPrefixCustom2, + "2018-09-28 07:37:59 UTC [331-1] postgres@postgres LOG: connection received: host=[local]", + nil, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 28, 7, 37, 59, 0, time.UTC), + Username: "postgres", + Database: "postgres", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 331, + LogLineNumber: 1, + Application: "", + Content: "connection received: host=[local]", + }, + true, + }, // Custom 3 format { - "", + logs.LogPrefixCustom3, "2018-09-27 06:57:01.030 UTC [20194] [user=[unknown],db=[unknown],app=[unknown]] LOG: connection received: host=[local]", nil, state.LogLine{ @@ -152,7 +151,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom3, "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=psql] ERROR: canceling statement due to user request", nil, state.LogLine{ @@ -167,7 +166,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom3, "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=psql] LOG: duration: 3000.019 ms statement: SELECT pg_sleep(3\n);", nil, state.LogLine{ @@ -181,9 +180,36 @@ var parseTests = []parseTestpair{ }, true, }, + { + logs.LogPrefixCustom3, + "2018-09-27 06:57:01.030 BST [20194] [user=[unknown],db=[unknown],app=[unknown]] LOG: connection received: host=[local]", + BSTTimeLocation, + state.LogLine{ + OccurredAt: time.Date(2018, time.September, 27, 6, 57, 1, 30*1000*1000, BSTTimeLocation), + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + Content: "connection received: host=[local]", + }, + true, + }, + { + logs.LogPrefixCustom3, + "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=sidekiq 1.2.3 queues:something[0 of 50 busy]] LOG: duration: 3000.019 ms statement: SELECT pg_sleep(3);", + nil, + state.LogLine{ 
+ OccurredAt: time.Date(2018, time.September, 27, 6, 57, 2, 779*1000*1000, time.UTC), + Username: "postgres", + Database: "postgres", + Application: "sidekiq 1.2.3 queues:something[0 of 50 busy]", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + BackendPid: 20194, + Content: "duration: 3000.019 ms statement: SELECT pg_sleep(3);", + }, + true, + }, // Custom 4 format { - "", + logs.LogPrefixCustom4, "2018-09-27 06:57:01.030 UTC [20194] [user=[unknown],db=[unknown],app=[unknown],host=[local]] LOG: connection received: host=[local]", nil, state.LogLine{ @@ -195,7 +221,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom4, "2018-09-27 06:57:02.779 UTC [20194] [user=postgres,db=postgres,app=psql,host=127.0.0.1] ERROR: canceling statement due to user request", nil, state.LogLine{ @@ -211,7 +237,7 @@ var parseTests = []parseTestpair{ }, // Custom 5 format { - "", + logs.LogPrefixCustom5, "2018-09-28 07:37:59 UTC [331]: [1-1] user=[unknown],db=[unknown] - PG-00000 LOG: connection received: host=127.0.0.1 port=49738", nil, state.LogLine{ @@ -224,7 +250,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom5, "2018-09-28 07:39:48 UTC [347]: [3-1] user=postgres,db=postgres - PG-57014 ERROR: canceling statement due to user request", nil, state.LogLine{ @@ -240,7 +266,7 @@ var parseTests = []parseTestpair{ }, // Custom 6 format { - "", + logs.LogPrefixCustom6, "2018-10-16 01:25:58 UTC [93897]: [4-1] user=,db=,app=,client= LOG: database system is ready to accept connections", nil, state.LogLine{ @@ -253,7 +279,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom6, "2018-10-16 01:26:09 UTC [93907]: [1-1] user=[unknown],db=[unknown],app=[unknown],client=::1 LOG: connection received: host=::1 port=61349", nil, state.LogLine{ @@ -266,13 +292,17 @@ var parseTests = []parseTestpair{ true, }, { - "", + // Updating this test to reflect that %a (application name) is now captured. 
+ // In the original parsing logic (see parse.go:299), we explicitly skip + // setting %a even though it's captured by the prefix. + logs.LogPrefixCustom6, "2018-10-16 01:26:33 UTC [93911]: [3-1] user=postgres,db=postgres,app=psql,client=::1 ERROR: canceling statement due to user request", nil, state.LogLine{ OccurredAt: time.Date(2018, time.October, 16, 1, 26, 33, 0, time.UTC), Username: "postgres", Database: "postgres", + Application: "psql", LogLevel: pganalyze_collector.LogLineInformation_ERROR, BackendPid: 93911, LogLineNumber: 3, @@ -282,7 +312,7 @@ var parseTests = []parseTestpair{ }, // Custom 7 format { - "", + logs.LogPrefixCustom7, "2019-01-01 01:59:42 UTC [1]: [4-1] [trx_id=0] user=,db= LOG: database system is ready to accept connections", nil, state.LogLine{ @@ -295,7 +325,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom7, "2019-01-01 02:00:28 UTC [35]: [1-1] [trx_id=0] user=[unknown],db=[unknown] LOG: connection received: host=::1 port=38842", nil, state.LogLine{ @@ -308,7 +338,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom7, "2019-01-01 02:00:28 UTC [34]: [3-1] [trx_id=120950] user=postgres,db=postgres ERROR: canceling statement due to user request", nil, state.LogLine{ @@ -324,7 +354,7 @@ var parseTests = []parseTestpair{ }, // Custom 8 format { - "", + logs.LogPrefixCustom8, "[1127]: [8-1] db=postgres,user=pganalyze LOG: duration: 2001.842 ms statement: SELECT pg_sleep(2);", nil, state.LogLine{ @@ -340,7 +370,7 @@ var parseTests = []parseTestpair{ }, // Custom 9 format { - "", + logs.LogPrefixCustom9, "2020-05-21 17:53:05.307 UTC [5ec6bfff.1] [1] LOG: database system is ready to accept connections", nil, state.LogLine{ @@ -352,7 +382,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom9, "2020-05-21 17:54:35.256 UTC 172.18.0.1(56402) pgaweb [unknown] [5ec6c05b.22] [34] LOG: connection authorized: user=pgaweb database=pgaweb application_name=psql", nil, 
state.LogLine{ @@ -365,7 +395,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom9, "2020-05-21 17:54:43.808 UTC 172.18.0.1(56402) pgaweb psql [5ec6c05b.22] [34] LOG: disconnection: session time: 0:00:08.574 user=pgaweb database=pgaweb host=172.18.0.1 port=56402", nil, state.LogLine{ @@ -380,7 +410,7 @@ var parseTests = []parseTestpair{ }, // Custom 10 format { - "", + logs.LogPrefixCustom10, "2020-09-04 16:03:11.375 UTC [417880]: [1-1] db=mydb,user=myuser LOG: pganalyze-collector-identify: myserver", nil, state.LogLine{ @@ -396,7 +426,7 @@ var parseTests = []parseTestpair{ }, // Custom 11 format { - "", + logs.LogPrefixCustom11, "pid=8284,user=[unknown],db=[unknown],app=[unknown],client=[local] LOG: connection received: host=[local]", nil, state.LogLine{ @@ -407,19 +437,22 @@ var parseTests = []parseTestpair{ true, }, { - "", + // Updating this to reflect that we can now capture the unusual application + // name (previously Application was omitted in the expected output struct) + logs.LogPrefixCustom11, "pid=8284,user=[unknown],db=[unknown],app=why would you[] name your application this,client=[local] LOG: connection received: host=[local]", nil, state.LogLine{ - BackendPid: 8284, - LogLevel: pganalyze_collector.LogLineInformation_LOG, - Content: "connection received: host=[local]", + Application: "why would you[] name your application this", + BackendPid: 8284, + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Content: "connection received: host=[local]", }, true, }, // Custom 12 format { - "", + logs.LogPrefixCustom12, "user=[unknown],db=[unknown],app=[unknown],client=[local] LOG: connection received: host=[local]", nil, state.LogLine{ @@ -430,7 +463,7 @@ var parseTests = []parseTestpair{ }, // Custom 13 format { - "", + logs.LogPrefixCustom13, "27-2021-11-17 19:06:14 UTC-619552a6.1b-1----2021-11-17 19:06:14.946 UTC LOG: database system was shut down at 2021-11-17 19:01:42 UTC", nil, state.LogLine{ @@ -443,7 +476,7 @@ var 
parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom13, "51-2021-11-17 19:11:13 UTC-619553d1.33-2-172.20.0.1-pgaweb-pgaweb-2021-11-17 19:11:13.562 UTC LOG: connection authorized: user=pgaweb database=pgaweb application_name=puma: cluster worker 2: 18544 [pganalyze]", nil, state.LogLine{ @@ -459,7 +492,7 @@ var parseTests = []parseTestpair{ }, // Custom 14 format { - "", + logs.LogPrefixCustom14, "2021-11-17 19:06:53.897 UTC [34][autovacuum worker][3/5][22996] LOG: automatic analyze of table \"mydb.pg_catalog.pg_class\" system usage: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.01 s", nil, state.LogLine{ @@ -472,7 +505,7 @@ var parseTests = []parseTestpair{ }, // Custom 15 format { - "", + logs.LogPrefixCustom15, "2022-07-22 06:13:13.389 UTC [1] LOG: database system is ready to accept connections", nil, state.LogLine{ @@ -484,7 +517,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom15, "2022-07-22 06:13:45.781 UTC [75] myuser@mydb LOG: connection authorized: user=myuser database=mydb application_name=psql", nil, state.LogLine{ @@ -499,7 +532,7 @@ var parseTests = []parseTestpair{ }, // Custom 16 format { - "", + logs.LogPrefixCustom16, "2022-07-22 06:13:12 UTC [1] LOG: starting PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit", nil, state.LogLine{ @@ -511,7 +544,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixCustom16, "2022-07-22 06:14:23 UTC [76] my-user@my-db 1.2.3.4 LOG: disconnection: session time: 0:00:01.667 user=my-user database=my-db host=1.2.3.4 port=5678", nil, state.LogLine{ @@ -526,7 +559,7 @@ var parseTests = []parseTestpair{ }, // Simple format { - "", + logs.LogPrefixSimple, "2018-05-04 03:06:18.360 UTC [3184] LOG: pganalyze-collector-identify: server1", nil, state.LogLine{ @@ -538,7 +571,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixSimple, "2018-05-04 03:06:18.360 +0100 [3184] 
LOG: pganalyze-collector-identify: server1", nil, state.LogLine{ @@ -550,7 +583,20 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixSimple, + "2022-12-23 09:53:43.862 -03 [790081] LOG: pganalyze-collector-identify: server1", + nil, + state.LogLine{ + OccurredAt: time.Date(2022, time.December, 23, 9, 53, 43, 862*1000*1000, time.FixedZone("-03", -3*3600)), + LogLevel: 6, + BackendPid: 790081, + Content: "pganalyze-collector-identify: server1", + }, + true, + }, + // Heroku format + { + logs.LogPrefixHeroku1, ` sql_error_code = 28000 FATAL: no pg_hba.conf entry for host "127.0.0.1", user "postgres", database "postgres", SSL off`, nil, state.LogLine{ @@ -560,7 +606,7 @@ var parseTests = []parseTestpair{ true, }, { - "", + logs.LogPrefixHeroku2, ` sql_error_code = 28000 time_ms = "2022-06-02 22:48:20.807 UTC" pid="11666" proc_start_time="2022-06-02 22:48:20 UTC" session_id="62993e34.2d92" vtid="6/17007" tid="0" log_line="1" database="postgres" connection_source="127.0.0.1(36532)" user="postgres" application_name="[unknown]" FATAL: no pg_hba.conf entry for host "127.0.0.1", user "postgres", database "postgres", SSL off`, nil, state.LogLine{ @@ -574,36 +620,16 @@ var parseTests = []parseTestpair{ }, true, }, - { - "", - "2022-12-23 09:53:43.862 -03 [790081] LOG: pganalyze-collector-identify: server1", - nil, - state.LogLine{ - OccurredAt: time.Date(2022, time.December, 23, 9, 53, 43, 862*1000*1000, time.FixedZone("-03", -3*3600)), - LogLevel: 6, - BackendPid: 790081, - Content: "pganalyze-collector-identify: server1", - }, - true, - }, - // Custom 3 format with explicit time zone - { - "", - "2018-09-27 06:57:01.030 BST [20194] [user=[unknown],db=[unknown],app=[unknown]] LOG: connection received: host=[local]", - BSTTimeLocation, - state.LogLine{ - OccurredAt: time.Date(2018, time.September, 27, 6, 57, 1, 30*1000*1000, BSTTimeLocation), - LogLevel: pganalyze_collector.LogLineInformation_LOG, - BackendPid: 20194, - Content: "connection received: 
host=[local]", - }, - true, - }, } -func TestParseLogLineWithPrefix(t *testing.T) { - for _, pair := range parseTests { - l, lOk := logs.ParseLogLineWithPrefix(pair.prefixIn, pair.lineIn, pair.lineInTz) +func TestLogParser(t *testing.T) { + for _, pair := range parse2Tests { + // Syslog format has a separate, fixed prefix, so the prefix argument is + // ignored by the parser in that case. We use an empty string to indicate + // that this is a syslog test case. + isSyslog := pair.prefixIn == "" + parser := logs.NewLogParser(pair.prefixIn, pair.lineInTz, isSyslog, false) + l, lOk := parser.ParseLine(pair.lineIn) cfg := pretty.CompareConfig cfg.SkipZeroFields = true diff --git a/logs/replace_test.go b/logs/replace_test.go index 08e47cc10..ee43caa98 100644 --- a/logs/replace_test.go +++ b/logs/replace_test.go @@ -64,7 +64,8 @@ var replaceTests = []replaceTestpair{ func TestReplaceSecrets(t *testing.T) { for _, pair := range replaceTests { reader := bufio.NewReader(strings.NewReader(pair.input)) - logLines, _ := logs.ParseAndAnalyzeBuffer(reader, time.Time{}, &state.Server{LogTimezoneMutex: &sync.Mutex{}}) + server := &state.Server{LogParseMutex: &sync.RWMutex{}, LogParser: logs.NewLogParser(logs.LogPrefixAmazonRds, nil, false, false)} + logLines, _ := logs.ParseAndAnalyzeBuffer(reader, time.Time{}, server) output := logs.ReplaceSecrets([]byte(pair.input), logLines, state.ParseFilterLogSecret(pair.filterLogSecret)) cfg := pretty.CompareConfig diff --git a/main.go b/main.go index 41d1079ee..505b50852 100644 --- a/main.go +++ b/main.go @@ -619,9 +619,11 @@ func checkOneInitialCollectionStatus(ctx context.Context, server *state.Server, logger.PrintInfo("Log statement lines will be ignored for this server: %s", logsDisabledReason) } - server.SetLogTimezone(settings) - if server.LogTimezone == nil { - logger.PrintWarning("Could not determine log timezone for this server: %s") + logs.SyncLogParser(server, settings) + parser := server.GetLogParser() + prefixErr := 
parser.ValidatePrefix() + if prefixErr != nil { + logger.PrintWarning("Checking log_line_prefix: %s", prefixErr) } server.CollectionStatusMutex.Lock() diff --git a/state/state.go b/state/state.go index 1498bb265..c74634f42 100644 --- a/state/state.go +++ b/state/state.go @@ -278,10 +278,11 @@ type Server struct { SelfTest *SelfTestResult - // The time zone that logs are parsed in, synced from the setting log_timezone - // The StateMutex should be held while updating this - LogTimezone *time.Location - LogTimezoneMutex *sync.Mutex + // The LogParser for this server, updated as necessary whenever relevant + // settings (log_line_prefix and log_timezone) change + // The LogParseMutex should be held while updating this + LogParser LogParser + LogParseMutex *sync.RWMutex // Boolean flags for which log lines should be ignored for processing // @@ -302,7 +303,7 @@ func MakeServer(config config.ServerConfig, testRun bool) *Server { LogStateMutex: &sync.Mutex{}, ActivityStateMutex: &sync.Mutex{}, CollectionStatusMutex: &sync.Mutex{}, - LogTimezoneMutex: &sync.Mutex{}, + LogParseMutex: &sync.RWMutex{}, } if testRun { server.SelfTest = MakeSelfTest() @@ -315,6 +316,13 @@ const ( LOG_IGNORE_DURATION ) +type LogParser interface { + Matches(prefix string, tz *time.Location, isSyslog bool) bool + GetOccurredAt(timePart string) time.Time + ParseLine(line string) (logLine LogLine, ok bool) + ValidatePrefix() error +} + func (s *Server) SetLogIgnoreFlags(ignoreStatement bool, ignoreDuration bool) { var newFlags uint32 if ignoreStatement { @@ -326,22 +334,11 @@ func (s *Server) SetLogIgnoreFlags(ignoreStatement bool, ignoreDuration bool) { atomic.StoreUint32(&s.LogIgnoreFlags, newFlags) } -func (s *Server) SetLogTimezone(settings []PostgresSetting) { - tz := getTimeZoneFromSettings(settings) + func (s *Server) GetLogParser() LogParser { + s.LogParseMutex.RLock() + defer s.LogParseMutex.RUnlock() - s.LogTimezoneMutex.Lock() - defer s.LogTimezoneMutex.Unlock() - s.LogTimezone = tz -}
- -func (s *Server) GetLogTimezone() *time.Location { - s.LogTimezoneMutex.Lock() - defer s.LogTimezoneMutex.Unlock() - if s.LogTimezone == nil { - return nil - } - tz := *s.LogTimezone - return &tz + return s.LogParser } // IgnoreLogLine - helper function that lets callers determine whether a log diff --git a/state/util.go b/state/util.go index 76faa4ae6..0f9f34b29 100644 --- a/state/util.go +++ b/state/util.go @@ -1,7 +1,5 @@ package state -import "time" - type OidToIdxMap map[Oid](map[Oid]int32) func MakeOidToIdxMap() OidToIdxMap { @@ -40,22 +38,3 @@ func XidToXid8(xid Xid, currentXactId Xid8) Xid8 { xidEpoch := int32((currentXactId - Xid8(xid)) >> 32) return Xid8(xidEpoch)<<32 | Xid8(xid) } - -func getTimeZoneFromSettings(settings []PostgresSetting) *time.Location { - for _, setting := range settings { - if setting.Name != "log_timezone" { - continue - } - if !setting.ResetValue.Valid { - return nil - } - - zoneStr := setting.ResetValue.String - zone, err := time.LoadLocation(zoneStr) - if err != nil { - return nil - } - return zone - } - return nil -}