diff --git a/Makefile b/Makefile index 57894791f..f3ede136e 100644 --- a/Makefile +++ b/Makefile @@ -32,8 +32,13 @@ vendor: modvendor -copy="**/*.c **/*.h **/*.proto" -v test: build - go test -coverprofile=coverage.out ./... - # go tool cover -html=coverage.out + go test -race -coverprofile=coverage.out ./... + +coverage: test + go tool cover -html=coverage.out + +run: build + go run -race . integration_test: make -C integration_test diff --git a/input/full.go b/input/full.go index ff6f24a2a..7cb5fbb00 100644 --- a/input/full.go +++ b/input/full.go @@ -83,7 +83,7 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB, } ps.StatementResetCounter = server.PrevState.StatementResetCounter + 1 - if server.Grant.Config.Features.StatementResetFrequency != 0 && ps.StatementResetCounter >= server.Grant.Config.Features.StatementResetFrequency { + if server.Grant.Config.Features.StatementResetFrequency != 0 && ps.StatementResetCounter >= int(server.Grant.Config.Features.StatementResetFrequency) { ps.StatementResetCounter = 0 err = postgres.ResetStatements(ctx, logger, connection, systemType) if err != nil { diff --git a/input/postgres/schema.go b/input/postgres/schema.go index ee1e7c7c4..14e851264 100644 --- a/input/postgres/schema.go +++ b/input/postgres/schema.go @@ -81,7 +81,7 @@ func CollectAllSchemas(ctx context.Context, server *state.Server, collectionOpts ts.DatabaseOidsWithLocalCatalog = append(ts.DatabaseOidsWithLocalCatalog, databaseOid) server.SelfTest.MarkDbCollectionAspectOk(dbName, state.CollectionAspectSchema) } - schemaTableLimit := server.Grant.Config.SchemaTableLimit + schemaTableLimit := int(server.Grant.Config.SchemaTableLimit) if schemaTableLimit == 0 { schemaTableLimit = defaultSchemaTableLimit } diff --git a/logs/querysample/tracing.go b/logs/querysample/tracing.go index 794568f60..50bb14bdf 100644 --- a/logs/querysample/tracing.go +++ b/logs/querysample/tracing.go @@ -25,7 +25,7 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg return fmt.Sprintf( "%s/databases/%s/queries/%s/samples/%d?role=%s", - grant.Config.ServerURL, + grant.Config.ServerUrl, sample.Database, hex.EncodeToString(fpBin), sample.OccurredAt.Unix(), diff --git a/main.go b/main.go index 41d1079ee..295346e45 100644 --- a/main.go +++ b/main.go @@ -255,6 +255,8 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col wg.Done() }, logger, "high frequency query statistics of all servers", schedulerGroups["stats"]) + runner.SetupWebsocketForAllServers(ctx, servers, globalCollectionOpts, logger) + keepRunning = true return } diff --git a/output/compact.go b/output/compact.go index c804b32a0..351051cba 100644 --- a/output/compact.go +++ b/output/compact.go @@ -58,6 +58,11 @@ func uploadAndSubmitCompactSnapshot(ctx context.Context, s pganalyze_collector.C return nil } + if server.WebSocket != nil { + server.SnapshotStream <- compressedData.Bytes() + return nil + } + s3Location, err := uploadSnapshot(ctx, server.Config.HTTPClientWithRetry, grant, logger, compressedData, snapshotUUID.String()) if err != nil { logger.PrintError("Error uploading snapshot: %s", err) diff --git a/output/full.go b/output/full.go index 5835c5af9..dbbc9a9cb 100644 --- a/output/full.go +++ b/output/full.go @@ -82,6 +82,11 @@ func submitFull(ctx context.Context, s snapshot.FullSnapshot, server *state.Serv return nil } + if server.WebSocket != nil { + server.SnapshotStream <- compressedData.Bytes() + return nil + } + s3Location, err := uploadSnapshot(ctx, server.Config.HTTPClientWithRetry, server.Grant, logger, compressedData, snapshotUUID.String()) if err != nil { logger.PrintError("Error uploading snapshot: %s", err) diff --git a/output/pganalyze_collector/server_message.pb.go b/output/pganalyze_collector/server_message.pb.go new file mode 100644 index 000000000..d9637467b --- /dev/null +++ b/output/pganalyze_collector/server_message.pb.go @@ -0,0 +1,604 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v3.14.0 +// source: server_message.proto + +package pganalyze_collector + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ServerMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Message: + // + // *ServerMessage_Config_ + // *ServerMessage_Pause_ + // *ServerMessage_ExplainRun_ + Message isServerMessage_Message `protobuf_oneof:"message"` +} + +func (x *ServerMessage) Reset() { + *x = ServerMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_server_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage) ProtoMessage() {} + +func (x *ServerMessage) ProtoReflect() protoreflect.Message { + mi := &file_server_message_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage.ProtoReflect.Descriptor instead. +func (*ServerMessage) Descriptor() ([]byte, []int) { + return file_server_message_proto_rawDescGZIP(), []int{0} +} + +func (m *ServerMessage) GetMessage() isServerMessage_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *ServerMessage) GetConfig() *ServerMessage_Config { + if x, ok := x.GetMessage().(*ServerMessage_Config_); ok { + return x.Config + } + return nil +} + +func (x *ServerMessage) GetPause() *ServerMessage_Pause { + if x, ok := x.GetMessage().(*ServerMessage_Pause_); ok { + return x.Pause + } + return nil +} + +func (x *ServerMessage) GetExplainRun() *ServerMessage_ExplainRun { + if x, ok := x.GetMessage().(*ServerMessage_ExplainRun_); ok { + return x.ExplainRun + } + return nil +} + +type isServerMessage_Message interface { + isServerMessage_Message() +} + +type ServerMessage_Config_ struct { + // Collector configuration + Config *ServerMessage_Config `protobuf:"bytes,1,opt,name=config,proto3,oneof"` +} + +type ServerMessage_Pause_ struct { + // Server request to pause collection of data (e.g. in case of duplicate collectors) + Pause *ServerMessage_Pause `protobuf:"bytes,2,opt,name=pause,proto3,oneof"` +} + +type ServerMessage_ExplainRun_ struct { + // Request for the collector to generate an explain plan for a query + ExplainRun *ServerMessage_ExplainRun `protobuf:"bytes,3,opt,name=explain_run,json=explainRun,proto3,oneof"` +} + +func (*ServerMessage_Config_) isServerMessage_Message() {} + +func (*ServerMessage_Pause_) isServerMessage_Message() {} + +func (*ServerMessage_ExplainRun_) isServerMessage_Message() {} + +type ServerMessage_Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ServerId string `protobuf:"bytes,1,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"` + ServerUrl string `protobuf:"bytes,2,opt,name=server_url,json=serverUrl,proto3" json:"server_url,omitempty"` + SentryDsn string `protobuf:"bytes,3,opt,name=sentry_dsn,json=sentryDsn,proto3" json:"sentry_dsn,omitempty"` + Features *ServerMessage_Features `protobuf:"bytes,4,opt,name=features,proto3" json:"features,omitempty"` + EnableActivity bool `protobuf:"varint,5,opt,name=enable_activity,json=enableActivity,proto3" json:"enable_activity,omitempty"` + EnableLogs bool `protobuf:"varint,6,opt,name=enable_logs,json=enableLogs,proto3" json:"enable_logs,omitempty"` + // Maximum number of tables that can be monitored per server + SchemaTableLimit int32 `protobuf:"varint,7,opt,name=schema_table_limit,json=schemaTableLimit,proto3" json:"schema_table_limit,omitempty"` +} + +func (x *ServerMessage_Config) Reset() { + *x = ServerMessage_Config{} + if protoimpl.UnsafeEnabled { + mi := &file_server_message_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerMessage_Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage_Config) ProtoMessage() {} + +func (x *ServerMessage_Config) ProtoReflect() protoreflect.Message { + mi := &file_server_message_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage_Config.ProtoReflect.Descriptor instead. +func (*ServerMessage_Config) Descriptor() ([]byte, []int) { + return file_server_message_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *ServerMessage_Config) GetServerId() string { + if x != nil { + return x.ServerId + } + return "" +} + +func (x *ServerMessage_Config) GetServerUrl() string { + if x != nil { + return x.ServerUrl + } + return "" +} + +func (x *ServerMessage_Config) GetSentryDsn() string { + if x != nil { + return x.SentryDsn + } + return "" +} + +func (x *ServerMessage_Config) GetFeatures() *ServerMessage_Features { + if x != nil { + return x.Features + } + return nil +} + +func (x *ServerMessage_Config) GetEnableActivity() bool { + if x != nil { + return x.EnableActivity + } + return false +} + +func (x *ServerMessage_Config) GetEnableLogs() bool { + if x != nil { + return x.EnableLogs + } + return false +} + +func (x *ServerMessage_Config) GetSchemaTableLimit() int32 { + if x != nil { + return x.SchemaTableLimit + } + return 0 +} + +type ServerMessage_Features struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // How often the collector should run pg_stat_statements_reset() + StatementResetFrequency int32 `protobuf:"varint,1,opt,name=statement_reset_frequency,json=statementResetFrequency,proto3" json:"statement_reset_frequency,omitempty"` + // Statement timeout for all SQL statements sent to the database (defaults to 30s) + StatementTimeoutMs int32 `protobuf:"varint,2,opt,name=statement_timeout_ms,json=statementTimeoutMs,proto3" json:"statement_timeout_ms,omitempty"` + // Statement timeout for pg_stat_statements query text requests (defaults to 120s) + StatementTimeoutMsQueryText int32 `protobuf:"varint,3,opt,name=statement_timeout_ms_query_text,json=statementTimeoutMsQueryText,proto3" json:"statement_timeout_ms_query_text,omitempty"` +} + +func (x *ServerMessage_Features) Reset() { + *x = ServerMessage_Features{} + if protoimpl.UnsafeEnabled { + mi := &file_server_message_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerMessage_Features) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage_Features) ProtoMessage() {} + +func (x *ServerMessage_Features) ProtoReflect() protoreflect.Message { + mi := &file_server_message_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage_Features.ProtoReflect.Descriptor instead. +func (*ServerMessage_Features) Descriptor() ([]byte, []int) { + return file_server_message_proto_rawDescGZIP(), []int{0, 1} +} + +func (x *ServerMessage_Features) GetStatementResetFrequency() int32 { + if x != nil { + return x.StatementResetFrequency + } + return 0 +} + +func (x *ServerMessage_Features) GetStatementTimeoutMs() int32 { + if x != nil { + return x.StatementTimeoutMs + } + return 0 +} + +func (x *ServerMessage_Features) GetStatementTimeoutMsQueryText() int32 { + if x != nil { + return x.StatementTimeoutMsQueryText + } + return 0 +} + +type ServerMessage_Pause struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pause bool `protobuf:"varint,1,opt,name=pause,proto3" json:"pause,omitempty"` + Reason string `protobuf:"bytes,2,opt,name=reason,proto3" json:"reason,omitempty"` +} + +func (x *ServerMessage_Pause) Reset() { + *x = ServerMessage_Pause{} + if protoimpl.UnsafeEnabled { + mi := &file_server_message_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerMessage_Pause) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage_Pause) ProtoMessage() {} + +func (x *ServerMessage_Pause) ProtoReflect() protoreflect.Message { + mi := &file_server_message_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage_Pause.ProtoReflect.Descriptor instead. +func (*ServerMessage_Pause) Descriptor() ([]byte, []int) { + return file_server_message_proto_rawDescGZIP(), []int{0, 2} +} + +func (x *ServerMessage_Pause) GetPause() bool { + if x != nil { + return x.Pause + } + return false +} + +func (x *ServerMessage_Pause) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +type ServerMessage_ExplainRun struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ExplainQueryId int64 `protobuf:"varint,1,opt,name=explain_query_id,json=explainQueryId,proto3" json:"explain_query_id,omitempty"` + ExplainParameterSetId int64 `protobuf:"varint,2,opt,name=explain_parameter_set_id,json=explainParameterSetId,proto3" json:"explain_parameter_set_id,omitempty"` + DatabaseName string `protobuf:"bytes,3,opt,name=database_name,json=databaseName,proto3" json:"database_name,omitempty"` + QueryText string `protobuf:"bytes,4,opt,name=query_text,json=queryText,proto3" json:"query_text,omitempty"` +} + +func (x *ServerMessage_ExplainRun) Reset() { + *x = ServerMessage_ExplainRun{} + if protoimpl.UnsafeEnabled { + mi := &file_server_message_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerMessage_ExplainRun) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage_ExplainRun) ProtoMessage() {} + +func (x *ServerMessage_ExplainRun) ProtoReflect() protoreflect.Message { + mi := &file_server_message_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage_ExplainRun.ProtoReflect.Descriptor instead. +func (*ServerMessage_ExplainRun) Descriptor() ([]byte, []int) { + return file_server_message_proto_rawDescGZIP(), []int{0, 3} +} + +func (x *ServerMessage_ExplainRun) GetExplainQueryId() int64 { + if x != nil { + return x.ExplainQueryId + } + return 0 +} + +func (x *ServerMessage_ExplainRun) GetExplainParameterSetId() int64 { + if x != nil { + return x.ExplainParameterSetId + } + return 0 +} + +func (x *ServerMessage_ExplainRun) GetDatabaseName() string { + if x != nil { + return x.DatabaseName + } + return "" +} + +func (x *ServerMessage_ExplainRun) GetQueryText() string { + if x != nil { + return x.QueryText + } + return "" +} + +var File_server_message_proto protoreflect.FileDescriptor + +var file_server_message_proto_rawDesc = []byte{ + 0x0a, 0x14, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x13, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, + 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0xc8, 0x07, 0x0a, 0x0d, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x43, 0x0a, + 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, + 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x12, 0x40, 0x0a, 0x05, 0x70, 0x61, 0x75, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x28, 0x2e, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, + 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x50, 0x61, 0x75, 0x73, 0x65, 0x48, 0x00, 0x52, 0x05, 0x70, + 0x61, 0x75, 0x73, 0x65, 0x12, 0x50, 0x0a, 0x0b, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x5f, + 0x72, 0x75, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x70, 0x67, 0x61, 0x6e, + 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x45, 0x78, + 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x52, 0x75, 0x6e, 0x48, 0x00, 0x52, 0x0a, 0x65, 0x78, 0x70, 0x6c, + 0x61, 0x69, 0x6e, 0x52, 0x75, 0x6e, 0x1a, 0xa4, 0x02, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1d, + 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x12, 0x1d, 0x0a, + 0x0a, 0x73, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x73, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x73, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x44, 0x73, 0x6e, 0x12, 0x47, 0x0a, 0x08, + 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, + 0x2e, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, + 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x2e, 0x46, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x52, 0x08, 0x66, 0x65, 0x61, + 0x74, 0x75, 0x72, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, + 0x61, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, + 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x12, 0x1f, + 0x0a, 0x0b, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6c, 0x6f, 0x67, 0x73, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x0a, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x6f, 0x67, 0x73, 0x12, + 0x2c, 0x0a, 0x12, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, + 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x73, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x1a, 0xbe, 0x01, + 0x0a, 0x08, 0x46, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x12, 0x3a, 0x0a, 0x19, 0x73, 0x74, + 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x73, 0x65, 0x74, 0x5f, 0x66, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x17, 0x73, + 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x65, 0x74, 0x46, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x30, 0x0a, 0x14, 0x73, 0x74, 0x61, 0x74, 0x65, 0x6d, + 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x6d, 0x73, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x12, 0x73, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x54, + 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x73, 0x12, 0x44, 0x0a, 0x1f, 0x73, 0x74, 0x61, 0x74, + 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x6d, 0x73, + 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x74, 0x65, 0x78, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x1b, 0x73, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x4d, 0x73, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x65, 0x78, 0x74, 0x1a, 0x35, + 0x0a, 0x05, 0x50, 0x61, 0x75, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x75, 0x73, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x70, 0x61, 0x75, 0x73, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, + 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x1a, 0xb3, 0x01, 0x0a, 0x0a, 0x45, 0x78, 0x70, 0x6c, 0x61, 0x69, + 0x6e, 0x52, 0x75, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x5f, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, + 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x49, 0x64, 0x12, 0x37, + 0x0a, 0x18, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x5f, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, + 0x74, 0x65, 0x72, 0x5f, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x15, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, + 0x65, 0x72, 0x53, 0x65, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x61, 0x73, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1d, 0x0a, 0x0a, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x74, 0x65, 0x78, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x71, 0x75, 0x65, 0x72, 0x79, 0x54, 0x65, 0x78, 0x74, 0x42, 0x09, 0x0a, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2f, 0x63, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x2f, + 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_server_message_proto_rawDescOnce sync.Once + file_server_message_proto_rawDescData = file_server_message_proto_rawDesc +) + +func file_server_message_proto_rawDescGZIP() []byte { + file_server_message_proto_rawDescOnce.Do(func() { + file_server_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_server_message_proto_rawDescData) + }) + return file_server_message_proto_rawDescData +} + +var file_server_message_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_server_message_proto_goTypes = []interface{}{ + (*ServerMessage)(nil), // 0: pganalyze.collector.ServerMessage + (*ServerMessage_Config)(nil), // 1: pganalyze.collector.ServerMessage.Config + (*ServerMessage_Features)(nil), // 2: pganalyze.collector.ServerMessage.Features + (*ServerMessage_Pause)(nil), // 3: pganalyze.collector.ServerMessage.Pause + (*ServerMessage_ExplainRun)(nil), // 4: pganalyze.collector.ServerMessage.ExplainRun +} +var file_server_message_proto_depIdxs = []int32{ + 1, // 0: pganalyze.collector.ServerMessage.config:type_name -> pganalyze.collector.ServerMessage.Config + 3, // 1: pganalyze.collector.ServerMessage.pause:type_name -> pganalyze.collector.ServerMessage.Pause + 4, // 2: pganalyze.collector.ServerMessage.explain_run:type_name -> pganalyze.collector.ServerMessage.ExplainRun + 2, // 3: pganalyze.collector.ServerMessage.Config.features:type_name -> pganalyze.collector.ServerMessage.Features + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_server_message_proto_init() } +func file_server_message_proto_init() { + if File_server_message_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_server_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerMessage_Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_message_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerMessage_Features); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_message_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerMessage_Pause); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_message_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerMessage_ExplainRun); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_server_message_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*ServerMessage_Config_)(nil), + (*ServerMessage_Pause_)(nil), + (*ServerMessage_ExplainRun_)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_server_message_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_server_message_proto_goTypes, + DependencyIndexes: file_server_message_proto_depIdxs, + MessageInfos: file_server_message_proto_msgTypes, + }.Build() + File_server_message_proto = out.File + file_server_message_proto_rawDesc = nil + file_server_message_proto_goTypes = nil + file_server_message_proto_depIdxs = nil +} diff --git a/protobuf/server_message.proto b/protobuf/server_message.proto new file mode 100644 index 000000000..08488e45f --- /dev/null +++ b/protobuf/server_message.proto @@ -0,0 +1,47 @@ +syntax = "proto3"; + +package pganalyze.collector; +option go_package = "github.com/pganalyze/collector/output/pganalyze_collector"; + +message ServerMessage { + oneof message { + // Collector configuration + Config config = 1; + // Server request to pause collection of data (e.g. in case of duplicate collectors) + Pause pause = 2; + // Request for the collector to generate an explain plan for a query + ExplainRun explain_run = 3; + } + + message Config { + string server_id = 1; + string server_url = 2; + string sentry_dsn = 3; + Features features = 4; + bool enable_activity = 5; + bool enable_logs = 6; + // Maximum number of tables that can be monitored per server + int32 schema_table_limit = 7; + } + + message Features { + // How often the collector should run pg_stat_statements_reset() + int32 statement_reset_frequency = 1; + // Statement timeout for all SQL statements sent to the database (defaults to 30s) + int32 statement_timeout_ms = 2; + // Statement timeout for pg_stat_statements query text requests (defaults to 120s) + int32 statement_timeout_ms_query_text = 3; + } + + message Pause { + bool pause = 1; + string reason = 2; + } + + message ExplainRun { + int64 explain_query_id = 1; + int64 explain_parameter_set_id = 2; + string database_name = 3; + string query_text = 4; + } +} diff --git a/runner/activity.go b/runner/activity.go index c8d5ed3ca..39c5cf3c6 100644 --- a/runner/activity.go +++ b/runner/activity.go @@ -25,6 +25,11 @@ func processActivityForServer(ctx context.Context, server *state.Server, globalC newState := server.ActivityPrevState + if server.Pause.Pause == true { + logger.PrintWarning("Snapshot processing disabled by pganalyze server: %s", server.Pause.Reason) + return newState, false, nil + } + if server.Config.SkipIfReplica { connection, err = postgres.EstablishConnection(ctx, server, logger, globalCollectionOpts, "") if err != nil { @@ -41,7 +46,9 @@ func processActivityForServer(ctx context.Context, server *state.Server, globalC } } - if !globalCollectionOpts.ForceEmptyGrant { + if server.WebSocket != nil { + newGrant = server.Grant + } else if !globalCollectionOpts.ForceEmptyGrant { newGrant, err = grant.GetDefaultGrant(ctx, server, globalCollectionOpts, logger) if err != nil { return newState, false, errors.Wrap(err, "could not get default grant for activity snapshot") diff --git a/runner/full.go b/runner/full.go index 1b0cb489f..67923a977 100644 --- a/runner/full.go +++ b/runner/full.go @@ -89,13 +89,19 @@ func processServer(ctx context.Context, server *state.Server, globalCollectionOp var collectionStatus state.CollectionStatus var err error + if server.Pause.Pause == true { + logger.PrintWarning("Snapshot processing disabled by pganalyze server: %s", server.Pause.Reason) + return newState, newGrant, collectionStatus, nil + } + err = checkReplicaCollectionDisabled(ctx, server, globalCollectionOpts, logger) if err != nil { return state.PersistedState{}, state.Grant{}, state.CollectionStatus{}, err } - if !globalCollectionOpts.ForceEmptyGrant { - // Note: In case of server errors, we should reuse the old grant if its still recent (i.e. less than 50 minutes ago) + if server.WebSocket != nil { + newGrant = server.Grant + } else if !globalCollectionOpts.ForceEmptyGrant { newGrant, err = grant.GetDefaultGrant(ctx, server, globalCollectionOpts, logger) if err != nil { if server.Grant.Valid { @@ -110,7 +116,7 @@ func processServer(ctx context.Context, server *state.Server, globalCollectionOp var sentryClient *raven.Client if server.Grant.Config.SentryDsn != "" { - sentryClient, err = raven.NewWithTags(server.Grant.Config.SentryDsn, map[string]string{"server_id": server.Grant.Config.ServerID}) + sentryClient, err = raven.NewWithTags(server.Grant.Config.SentryDsn, map[string]string{"server_id": server.Grant.Config.ServerId}) if err != nil { logger.PrintVerbose("Failed to setup Sentry client: %s", err) } else { diff --git a/runner/logs.go b/runner/logs.go index 8311f3d0b..dc8389a17 100644 --- a/runner/logs.go +++ b/runner/logs.go @@ -39,7 +39,7 @@ func SetupLogCollection(ctx context.Context, wg *sync.WaitGroup, servers []*stat var hasAnyLogTails bool for _, server := range servers { - if server.Config.DisableLogs { + if server.Config.DisableLogs || server.Pause.Pause == true { continue } if server.Config.LogLocation != "" || server.Config.LogDockerTail != "" || server.Config.LogSyslogServer != "" || server.Config.LogOtelServer != "" { diff --git a/runner/websocket.go b/runner/websocket.go new file mode 100644 index 000000000..662fa095d --- /dev/null +++ b/runner/websocket.go @@ -0,0 +1,103 @@ +package runner + +import ( + "bytes" + "compress/zlib" + "context" + "io" + "net/url" + "time" + + "github.com/gorilla/websocket" + "github.com/pganalyze/collector/output/pganalyze_collector" + "github.com/pganalyze/collector/state" + "github.com/pganalyze/collector/util" + "google.golang.org/protobuf/proto" +) + +func SetupWebsocketForAllServers(ctx context.Context, servers []*state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) { + for idx := range servers { + go func(server *state.Server) { + logger = logger.WithPrefixAndRememberErrors(server.Config.SectionName) + for { + if server.WebSocket == nil { + connect(ctx, server, globalCollectionOpts, logger) + } + time.Sleep(60 * time.Second) // Delay between reconnect attempts + } + }(servers[idx]) + } +} + +func connect(ctx context.Context, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) { + connCtx, cancelConn := context.WithCancel(ctx) + url, _ := url.Parse(server.Config.APIBaseURL + "/v2/websocket") + if url.Scheme == "http" { + url.Scheme = "ws" + } else { + url.Scheme = "wss" + } + headers := make(map[string][]string) + headers["Pganalyze-Api-Key"] = []string{server.Config.APIKey} + headers["Pganalyze-System-Id"] = []string{server.Config.SystemID} + headers["Pganalyze-System-Type"] = []string{server.Config.SystemType} + headers["Pganalyze-System-Scope"] = []string{server.Config.SystemScope} + headers["Pganalyze-System-Id-Fallback"] = []string{server.Config.SystemIDFallback} + headers["Pganalyze-System-Type-Fallback"] = []string{server.Config.SystemTypeFallback} + headers["Pganalyze-System-Scope-Fallback"] = []string{server.Config.SystemScopeFallback} + headers["User-Agent"] = []string{util.CollectorNameAndVersion} + headers["Sec-WebSocket-Protocol"] = []string{"websocket"} + conn, response, err := websocket.DefaultDialer.DialContext(connCtx, url.String(), headers) + if err != nil { + cancelConn() + time.Sleep(10 * time.Second) + logger.PrintError("Error starting websocket: %s %v", err, response) + return + } + server.WebSocket = conn + go func() { + for { + select { + case <-connCtx.Done(): + err := server.WebSocket.Close() + if err != nil { + logger.PrintError("Error closing websocket: %s", err) + return + } + server.WebSocket = nil + return + case snapshot := <-server.SnapshotStream: + logger.PrintError("Sending snapshot") + server.WebSocket.WriteMessage(websocket.BinaryMessage, snapshot) + } + } + }() + go func() { + for { + _, compressedData, err := conn.ReadMessage() + if err != nil { + server.WebSocket = nil + return + } + var data bytes.Buffer + r, err := zlib.NewReader(bytes.NewReader(compressedData)) + if err != nil { + logger.PrintError("Error decompressing ServerMessage: %s", err) + return + } + defer r.Close() + io.Copy(&data, r) + message := &pganalyze_collector.ServerMessage{} + err = proto.Unmarshal(data.Bytes(), message) + if err != nil { + logger.PrintError("Error parsing ServerMessage: %s", err) + } else if message.GetConfig() != nil { + server.Grant.Config = *message.GetConfig() + } else if message.GetPause() != nil { + server.Pause = *message.GetPause() + } else if message.GetExplainRun() != nil { + logger.PrintError("ExplainRun: %v", message.GetExplainRun()) // TODO + } + } + }() +} diff --git a/state/logs.go b/state/logs.go index 846d47646..bf9bb7f80 100644 --- a/state/logs.go +++ b/state/logs.go @@ -15,10 +15,10 @@ import ( type GrantLogs struct { Valid bool - Config GrantConfig `json:"config"` - Logdata GrantS3 `json:"logdata"` - Snapshot GrantS3 `json:"snapshot"` - EncryptionKey GrantLogsEncryptionKey `json:"encryption_key"` + Config pganalyze_collector.ServerMessage_Config `json:"config"` + Logdata GrantS3 `json:"logdata"` + Snapshot GrantS3 `json:"snapshot"` + EncryptionKey GrantLogsEncryptionKey `json:"encryption_key"` } type GrantLogsEncryptionKey struct { diff --git a/state/state.go b/state/state.go index 1498bb265..cfa8146b9 100644 --- a/state/state.go +++ b/state/state.go @@ -7,7 +7,9 @@ import ( "time" raven "github.com/getsentry/raven-go" + "github.com/gorilla/websocket" "github.com/pganalyze/collector/config" + "github.com/pganalyze/collector/output/pganalyze_collector" ) type SchemaStats struct { @@ -213,33 +215,12 @@ type CollectionOpts struct { OutputAsJson bool } -type GrantConfig struct { - ServerID string `json:"server_id"` - ServerURL string `json:"server_url"` - SentryDsn string `json:"sentry_dsn"` - - Features GrantFeatures `json:"features"` - - EnableActivity bool `json:"enable_activity"` - EnableLogs bool `json:"enable_logs"` - - SchemaTableLimit int `json:"schema_table_limit"` // Maximum number of tables that can be monitored per server -} - -type GrantFeatures struct { - Logs bool `json:"logs"` - - StatementResetFrequency int `json:"statement_reset_frequency"` - StatementTimeoutMs int32 `json:"statement_timeout_ms"` // Statement timeout for all SQL statements sent to the database (defaults to 30s) - StatementTimeoutMsQueryText int32 `json:"statement_timeout_ms_query_text"` // Statement timeout for pg_stat_statements query text requests (defaults to 120s) -} - type Grant struct { Valid bool - Config GrantConfig `json:"config"` - S3URL string `json:"s3_url"` - S3Fields map[string]string `json:"s3_fields"` - LocalDir string `json:"local_dir"` + Config pganalyze_collector.ServerMessage_Config `json:"config"` + S3URL string `json:"s3_url"` + S3Fields map[string]string `json:"s3_fields"` + LocalDir string `json:"local_dir"` } func (g Grant) S3() GrantS3 { @@ -278,6 +259,10 @@ type Server struct { SelfTest *SelfTestResult + WebSocket *websocket.Conn + SnapshotStream chan []byte + Pause pganalyze_collector.ServerMessage_Pause + // The time zone that logs are parsed in, synced from the setting log_timezone // The StateMutex should be held while updating this LogTimezone *time.Location @@ -303,7 +288,10 @@ func MakeServer(config config.ServerConfig, testRun bool) *Server { ActivityStateMutex: &sync.Mutex{}, CollectionStatusMutex: &sync.Mutex{}, LogTimezoneMutex: &sync.Mutex{}, + SnapshotStream: make(chan []byte), } + server.Pause = pganalyze_collector.ServerMessage_Pause{Pause: false} + server.Grant.Config.Features = &pganalyze_collector.ServerMessage_Features{} if testRun { server.SelfTest = MakeSelfTest() }