Skip to content

Commit

Permalink
Add snapshot WebSocket API
Browse files Browse the repository at this point in the history
  • Loading branch information
seanlinsley committed Aug 15, 2024
1 parent 5e8eab6 commit 6619535
Show file tree
Hide file tree
Showing 15 changed files with 811 additions and 39 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ vendor:
modvendor -copy="**/*.c **/*.h **/*.proto" -v

test: build
go test -coverprofile=coverage.out ./...
# go tool cover -html=coverage.out
go test -race -coverprofile=coverage.out ./...

coverage: test
go tool cover -html=coverage.out

run: build
go run -race .

integration_test:
make -C integration_test
Expand Down
2 changes: 1 addition & 1 deletion input/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
}

ps.StatementResetCounter = server.PrevState.StatementResetCounter + 1
if server.Grant.Config.Features.StatementResetFrequency != 0 && ps.StatementResetCounter >= server.Grant.Config.Features.StatementResetFrequency {
if server.Grant.Config.Features.StatementResetFrequency != 0 && ps.StatementResetCounter >= int(server.Grant.Config.Features.StatementResetFrequency) {
ps.StatementResetCounter = 0
err = postgres.ResetStatements(ctx, logger, connection, systemType)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion input/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func CollectAllSchemas(ctx context.Context, server *state.Server, collectionOpts
ts.DatabaseOidsWithLocalCatalog = append(ts.DatabaseOidsWithLocalCatalog, databaseOid)
server.SelfTest.MarkDbCollectionAspectOk(dbName, state.CollectionAspectSchema)
}
schemaTableLimit := server.Grant.Config.SchemaTableLimit
schemaTableLimit := int(server.Grant.Config.SchemaTableLimit)
if schemaTableLimit == 0 {
schemaTableLimit = defaultSchemaTableLimit
}
Expand Down
2 changes: 1 addition & 1 deletion logs/querysample/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg

return fmt.Sprintf(
"%s/databases/%s/queries/%s/samples/%d?role=%s",
grant.Config.ServerURL,
grant.Config.ServerUrl,
sample.Database,
hex.EncodeToString(fpBin),
sample.OccurredAt.Unix(),
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col
wg.Done()
}, logger, "high frequency query statistics of all servers", schedulerGroups["stats"])

runner.SetupWebsocketForAllServers(ctx, servers, globalCollectionOpts, logger)

keepRunning = true
return
}
Expand Down
5 changes: 5 additions & 0 deletions output/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func uploadAndSubmitCompactSnapshot(ctx context.Context, s pganalyze_collector.C
return nil
}

if server.WebSocket != nil {
server.SnapshotStream <- compressedData.Bytes()
return nil
}

s3Location, err := uploadSnapshot(ctx, server.Config.HTTPClientWithRetry, grant, logger, compressedData, snapshotUUID.String())
if err != nil {
logger.PrintError("Error uploading snapshot: %s", err)
Expand Down
5 changes: 5 additions & 0 deletions output/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func submitFull(ctx context.Context, s snapshot.FullSnapshot, server *state.Serv
return nil
}

if server.WebSocket != nil {
server.SnapshotStream <- compressedData.Bytes()
return nil
}

s3Location, err := uploadSnapshot(ctx, server.Config.HTTPClientWithRetry, server.Grant, logger, compressedData, snapshotUUID.String())
if err != nil {
logger.PrintError("Error uploading snapshot: %s", err)
Expand Down
Loading

0 comments on commit 6619535

Please sign in to comment.