Skip to content

Commit

Permalink
[cmds][scd/store] Add 'db-evictor' command enabling cleanup of expire…
Browse files Browse the repository at this point in the history
…d op intents and subscriptions
  • Loading branch information
mickmis committed Sep 12, 2024
1 parent b519763 commit 1918497
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 2 deletions.
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This Dockerfile builds the InterUSS `dss` image which contains the binary
# executables for the core-service and the db-manager. It also
# executables for the core-service, the db-manager and the db-evictor. It also
# contains a light weight tool that provides debugging capability. To run a
# container for this image, the desired binary must be specified (either
# /usr/bin/core-service or /usr/bin/db-manager).
# /usr/bin/core-service, /usr/bin/db-manager, or /usr/bin/db-evictor).

FROM golang:1.22-alpine AS build
RUN apk add build-base
Expand All @@ -27,6 +27,7 @@ FROM alpine:latest
RUN apk update && apk add ca-certificates
COPY --from=build /go/bin/core-service /usr/bin
COPY --from=build /go/bin/db-manager /usr/bin
COPY --from=build /go/bin/db-evictor /usr/bin
COPY --from=build /go/bin/dlv /usr/bin
COPY build/jwt-public-certs /jwt-public-certs
COPY build/test-certs /test-certs
Expand Down
106 changes: 106 additions & 0 deletions cmds/db-evictor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"github.com/interuss/dss/pkg/cockroach"
"github.com/interuss/dss/pkg/cockroach/flags"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/dss/pkg/scd/repos"
scdc "github.com/interuss/dss/pkg/scd/store/cockroach"
)

var (
listOpIntents = flag.Bool("op_intents", true, "set this flag to true to list expired operational intents")
listScdSubs = flag.Bool("scd_subs", true, "set this flag to true to list expired SCD subscriptions")
ttl = flag.Duration("ttl", time.Hour*24*112, "time-to-live duration used for determining expiration")
deleteExpired = flag.Bool("delete", false, "set this flag to true to delete the expired entities")
)

// TODO: add documentation where relevant
// TODO: add relevant wrappers for k8s usage

func main() {
flag.Parse()

var (
ctx = context.Background()
threshold = time.Now().Add(-*ttl)
)

connectParameters := flags.ConnectParameters()
connectParameters.ApplicationName = "db-evictor"
connectParameters.DBName = scdc.DatabaseName
scdCrdb, err := cockroach.Dial(ctx, connectParameters)
if err != nil {
log.Panicf("Failed to connect to database with %+v: %v", connectParameters, err)
}

scdStore, err := scdc.NewStore(ctx, scdCrdb)
if err != nil {
log.Panicf("Failed to create strategic conflict detection store with %+v: %v", connectParameters, err)
}

var (
expiredOpIntents []*scdmodels.OperationalIntent
expiredSubs []*scdmodels.Subscription
)
action := func(ctx context.Context, r repos.Repository) (err error) {
if *listOpIntents {
expiredOpIntents, err = r.ListExpiredOperationalIntents(ctx, threshold)
if err != nil {
return fmt.Errorf("listing expired operational intents: %w", err)
}
if *deleteExpired {
for _, opIntent := range expiredOpIntents {
err = r.DeleteOperationalIntent(ctx, opIntent.ID)
}
}
}

if *listScdSubs {
expiredSubs, err = r.ListExpiredSubscriptions(ctx, threshold)
if err != nil {
return fmt.Errorf("listing expired operational intents: %w", err)
}
if *deleteExpired {
for _, sub := range expiredSubs {
err = r.DeleteSubscription(ctx, sub.ID)
}
}
}

return nil
}
if err = scdStore.Transact(ctx, action); err != nil {
log.Panicf("Failed to execute CRDB transaction: %v", err)
}

for _, opIntent := range expiredOpIntents {
logExpiredEntity("operational intent", opIntent.ID, threshold, *deleteExpired, opIntent.EndTime != nil)
}
for _, sub := range expiredSubs {
logExpiredEntity("subscription", sub.ID, threshold, *deleteExpired, sub.EndTime != nil)
}
if len(expiredOpIntents) == 0 && len(expiredSubs) == 0 {
log.Printf("no entity older than %s found", threshold.String())
}
}

func logExpiredEntity(entity string, entityID dssmodels.ID, threshold time.Time, deleted, hasEndTime bool) {
logMsg := "found"
if deleted {
logMsg = "deleted"
}

expMsg := "last update before %s (missing end time)"
if hasEndTime {
expMsg = "end time before %s"
}
log.Printf("%s %s %s; expired due to %s", logMsg, entity, entityID.String(), fmt.Sprintf(expMsg, threshold.String()))
}
10 changes: 10 additions & 0 deletions pkg/scd/repos/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package repos

import (
"context"
"time"

"github.com/golang/geo/s2"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
Expand All @@ -27,6 +29,10 @@ type OperationalIntent interface {
// GetDependentOperationalIntents returns IDs of all operations dependent on
// subscription identified by "subscriptionID".
GetDependentOperationalIntents(ctx context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error)

// ListExpiredOperationalIntents lists all operational intents older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error)
}

// Subscription abstracts subscription-specific interactions with the backing repository.
Expand Down Expand Up @@ -54,6 +60,10 @@ type Subscription interface {

// LockSubscriptionsOnCells locks the subscriptions of interest on specific cells.
LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error

// ListExpiredSubscriptions lists all subscriptions older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error)
}

type UssAvailability interface {
Expand Down
26 changes: 26 additions & 0 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,29 @@ func (s *repo) GetDependentOperationalIntents(ctx context.Context, subscriptionI

return dependentOps, nil
}

// ListExpiredOperationalIntents lists all operational intents older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) {
expiredOpIntentsQuery := fmt.Sprintf(`
SELECT
%s
FROM
scd_operations
WHERE
scd_operations.ends_at IS NOT NULL AND scd_operations.ends_at <= $1
OR
scd_operations.ends_at IS NULL AND scd_operations.updated_at <= $1 -- use last update time as reference if there is no end time
LIMIT $2`, operationFieldsWithPrefix)

result, err := s.fetchOperationalIntents(
ctx, s.q, expiredOpIntentsQuery,
threshold,
dssmodels.MaxResultLimit,
)
if err != nil {
return nil, stacktrace.Propagate(err, "Error fetching Operations")
}

return result, nil
}
27 changes: 27 additions & 0 deletions pkg/scd/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,30 @@ func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion)

return nil
}

// ListExpiredSubscriptions lists all subscriptions older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) {
expiredSubsQuery := fmt.Sprintf(`
SELECT
%s
FROM
scd_subscriptions
WHERE
scd_subscriptions.ends_at IS NOT NULL AND scd_subscriptions.ends_at <= $1
OR
scd_subscriptions.ends_at IS NULL AND scd_subscriptions.updated_at <= $1 -- use last update time as reference if there is no end time
LIMIT $2`, subscriptionFieldsWithPrefix)

subscriptions, err := c.fetchSubscriptions(
ctx, c.q, expiredSubsQuery,
threshold,
dssmodels.MaxResultLimit,
)
if err != nil {
return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions")
}

return subscriptions, nil

}

0 comments on commit 1918497

Please sign in to comment.