Skip to content

Commit

Permalink
[cmds][scd/store] Add 'db-evictor' command enabling cleanup of expire…
Browse files Browse the repository at this point in the history
…d op intents and subscriptions
  • Loading branch information
mickmis committed Sep 13, 2024
1 parent b519763 commit 6390f18
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 2 deletions.
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This Dockerfile builds the InterUSS `dss` image which contains the binary
# executables for the core-service and the db-manager. It also
# executables for the core-service, the db-manager and the db-evictor. It also
# contains a light weight tool that provides debugging capability. To run a
# container for this image, the desired binary must be specified (either
# /usr/bin/core-service or /usr/bin/db-manager).
# /usr/bin/core-service, /usr/bin/db-manager, or /usr/bin/db-evictor).

FROM golang:1.22-alpine AS build
RUN apk add build-base
Expand All @@ -27,6 +27,7 @@ FROM alpine:latest
RUN apk update && apk add ca-certificates
COPY --from=build /go/bin/core-service /usr/bin
COPY --from=build /go/bin/db-manager /usr/bin
COPY --from=build /go/bin/db-evictor /usr/bin
COPY --from=build /go/bin/dlv /usr/bin
COPY build/jwt-public-certs /jwt-public-certs
COPY build/test-certs /test-certs
Expand Down
66 changes: 66 additions & 0 deletions cmds/db-evictor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# db-evictor

CLI tool that lists and deletes expired entities in the DSS store.
At the time of writing this README, the entities supported by this tool are:
- SCD operational intents;
- SCD subscriptions.

## Usage
Extract from running `db-evictor --help`:
```
Usage of db-evictor:
-cockroach_application_name string
application name for tagging the connection to cockroach (default "dss")
-cockroach_db_name string
application name for tagging the connection to cockroach (default "dss")
-cockroach_host string
cockroach host to connect to
-cockroach_max_retries int
maximum number of attempts to retry a query in case of contention, default is 100 (default 100)
-cockroach_port int
cockroach port to connect to (default 26257)
-cockroach_ssl_dir string
directory to ssl certificates. Must contain files: ca.crt, client.<user>.crt, client.<user>.key
-cockroach_ssl_mode string
cockroach sslmode (default "disable")
-cockroach_user string
cockroach user to authenticate as (default "root")
-delete
set this flag to true to delete the expired entities
-max_conn_idle_secs int
maximum amount of time in seconds a connection may be idle, default is 30 seconds (default 30)
-max_open_conns int
maximum number of open connections to the database, default is 4 (default 4)
-op_intents
set this flag to true to list expired operational intents (default true)
-scd_subs
set this flag to true to list expired SCD subscriptions (default true)
-ttl duration
time-to-live duration used for determining expiration (default 2688h0m0s)
```

Do note:
- by default, expired entities are only listed, not deleted; the `-delete` flag is required to actually delete them;
- the flag `-ttl` accepts durations formatted as [Go `time.Duration` strings](https://pkg.go.dev/time#ParseDuration), e.g. `24h`;
- the CockroachDB cluster connection flags are the same as those of [the `core-service` command](../core-service/README.md).

## Examples
The following examples assume a running DSS deployed locally through [the `run_locally.sh` script](../../build/dev/standalone_instance.md).

### List all entities older than 1 week
```shell
docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-evictor \
-cockroach_host=local-dss-crdb -ttl=168h
```

### List operational intents older than 1 week
```shell
docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-evictor \
-cockroach_host=local-dss-crdb -ttl=168h -op_intents=true -scd_subs=false
```

### Delete all entities older than 30 days
```shell
docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-evictor \
-cockroach_host=local-dss-crdb -ttl=720h -delete
```
107 changes: 107 additions & 0 deletions cmds/db-evictor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"github.com/interuss/dss/pkg/cockroach"
"github.com/interuss/dss/pkg/cockroach/flags"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/dss/pkg/scd/repos"
scdc "github.com/interuss/dss/pkg/scd/store/cockroach"
)

// defaultTTL is the default value of the -ttl flag: 112 days (2688 hours).
const defaultTTL = 112 * 24 * time.Hour

// Command-line flags specific to db-evictor. They select which entity types
// are inspected, how old an entity must be to count as expired, and whether
// expired entities are deleted or merely listed.
var (
	// listOpIntents enables the processing of expired operational intents.
	listOpIntents = flag.Bool("op_intents", true, "set this flag to true to list expired operational intents")

	// listScdSubs enables the processing of expired SCD subscriptions.
	listScdSubs = flag.Bool("scd_subs", true, "set this flag to true to list expired SCD subscriptions")

	// ttl is subtracted from the current time to obtain the expiration threshold.
	ttl = flag.Duration("ttl", defaultTTL, "time-to-live duration used for determining expiration")

	// deleteExpired switches the tool from list-only mode to deletion mode.
	deleteExpired = flag.Bool("delete", false, "set this flag to true to delete the expired entities")
)

// main connects to the SCD database, lists the operational intents and/or
// subscriptions that the store reports as expired relative to "now - ttl",
// optionally deletes them (when -delete is set), and logs one line per entity.
// Listing and deletion are performed inside a single call to the store's
// Transact method. Any connection or transaction failure panics via log.Panicf.
func main() {
	flag.Parse()

	ctx := context.Background()
	// Cut-off instant handed to the store's ListExpired* queries.
	threshold := time.Now().Add(-*ttl)

	// Flag values do not change after flag.Parse, so snapshot them once.
	withOpIntents, withSubs, purge := *listOpIntents, *listScdSubs, *deleteExpired

	params := flags.ConnectParameters()
	params.ApplicationName = "db-evictor"
	params.DBName = scdc.DatabaseName

	db, err := cockroach.Dial(ctx, params)
	if err != nil {
		log.Panicf("Failed to connect to database with %+v: %v", params, err)
	}

	store, err := scdc.NewStore(ctx, db)
	if err != nil {
		log.Panicf("Failed to create strategic conflict detection store with %+v: %v", params, err)
	}

	// Results are captured outside the closure so they can be reported once
	// the transaction has completed.
	var (
		staleOpIntents []*scdmodels.OperationalIntent
		staleSubs      []*scdmodels.Subscription
	)

	evict := func(ctx context.Context, r repos.Repository) error {
		var err error

		if withOpIntents {
			if staleOpIntents, err = r.ListExpiredOperationalIntents(ctx, threshold); err != nil {
				return fmt.Errorf("listing expired operational intents: %w", err)
			}
			if purge {
				for _, oi := range staleOpIntents {
					if err = r.DeleteOperationalIntent(ctx, oi.ID); err != nil {
						return fmt.Errorf("deleting expired operational intents: %w", err)
					}
				}
			}
		}

		if withSubs {
			if staleSubs, err = r.ListExpiredSubscriptions(ctx, threshold); err != nil {
				return fmt.Errorf("listing expired subscriptions: %w", err)
			}
			if purge {
				for _, s := range staleSubs {
					if err = r.DeleteSubscription(ctx, s.ID); err != nil {
						return fmt.Errorf("deleting expired subscriptions: %w", err)
					}
				}
			}
		}

		return nil
	}

	if err := store.Transact(ctx, evict); err != nil {
		log.Panicf("Failed to execute CRDB transaction: %v", err)
	}

	if len(staleOpIntents) == 0 && len(staleSubs) == 0 {
		log.Printf("no entity older than %s found", threshold.String())
		return
	}

	for _, oi := range staleOpIntents {
		logExpiredEntity("operational intent", oi.ID, threshold, purge, oi.EndTime != nil)
	}
	for _, s := range staleSubs {
		logExpiredEntity("subscription", s.ID, threshold, purge, s.EndTime != nil)
	}
}

// logExpiredEntity writes a single log line describing one expired entity.
//
// entity is a human-readable type name (e.g. "subscription") and entityID its
// identifier. deleted selects the verb ("deleted" vs "found"). hasEndTime
// selects the stated expiration reason: the entity's end time when it has one,
// otherwise its last update time. threshold is printed as the cut-off instant.
func logExpiredEntity(entity string, entityID dssmodels.ID, threshold time.Time, deleted, hasEndTime bool) {
	verb := "found"
	if deleted {
		verb = "deleted"
	}

	var reason string
	if hasEndTime {
		reason = fmt.Sprintf("end time before %s", threshold.String())
	} else {
		reason = fmt.Sprintf("last update before %s (missing end time)", threshold.String())
	}

	log.Printf("%s %s %s; expired due to %s", verb, entity, entityID.String(), reason)
}
10 changes: 10 additions & 0 deletions pkg/scd/repos/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package repos

import (
"context"
"time"

"github.com/golang/geo/s2"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
Expand All @@ -27,6 +29,10 @@ type OperationalIntent interface {
// GetDependentOperationalIntents returns IDs of all operations dependent on
// subscription identified by "subscriptionID".
GetDependentOperationalIntents(ctx context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error)

// ListExpiredOperationalIntents lists all operational intents older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error)
}

// Subscription abstracts subscription-specific interactions with the backing repository.
Expand Down Expand Up @@ -54,6 +60,10 @@ type Subscription interface {

// LockSubscriptionsOnCells locks the subscriptions of interest on specific cells.
LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error

// ListExpiredSubscriptions lists all subscriptions older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error)
}

type UssAvailability interface {
Expand Down
26 changes: 26 additions & 0 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,29 @@ func (s *repo) GetDependentOperationalIntents(ctx context.Context, subscriptionI

return dependentOps, nil
}

// ListExpiredOperationalIntents returns the operational intents considered
// expired with respect to threshold: those whose end time is at or before
// threshold, or, when they have no end time, whose last update time is at or
// before threshold. The number of rows returned is capped by
// dssmodels.MaxResultLimit.
func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) {
	// $1 is the expiration threshold, $2 the maximum number of rows.
	const queryTemplate = `
SELECT
%s
FROM
scd_operations
WHERE
scd_operations.ends_at IS NOT NULL AND scd_operations.ends_at <= $1
OR
scd_operations.ends_at IS NULL AND scd_operations.updated_at <= $1 -- use last update time as reference if there is no end time
LIMIT $2`

	query := fmt.Sprintf(queryTemplate, operationFieldsWithPrefix)

	expired, err := s.fetchOperationalIntents(ctx, s.q, query, threshold, dssmodels.MaxResultLimit)
	if err != nil {
		return nil, stacktrace.Propagate(err, "Error fetching Operations")
	}
	return expired, nil
}
27 changes: 27 additions & 0 deletions pkg/scd/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,30 @@ func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion)

return nil
}

// ListExpiredSubscriptions returns the subscriptions considered expired with
// respect to threshold: those whose end time is at or before threshold, or,
// when they have no end time, whose last update time is at or before
// threshold. The number of rows returned is capped by
// dssmodels.MaxResultLimit.
func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) {
	// $1 is the expiration threshold, $2 the maximum number of rows.
	const queryTemplate = `
SELECT
%s
FROM
scd_subscriptions
WHERE
scd_subscriptions.ends_at IS NOT NULL AND scd_subscriptions.ends_at <= $1
OR
scd_subscriptions.ends_at IS NULL AND scd_subscriptions.updated_at <= $1 -- use last update time as reference if there is no end time
LIMIT $2`

	query := fmt.Sprintf(queryTemplate, subscriptionFieldsWithPrefix)

	expired, err := c.fetchSubscriptions(ctx, c.q, query, threshold, dssmodels.MaxResultLimit)
	if err != nil {
		return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions")
	}
	return expired, nil
}

0 comments on commit 6390f18

Please sign in to comment.