Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yugabyte] SQL Migrations #1138

Merged
merged 10 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,31 @@ jobs:
run: make qualify-locally
- name: Bring down local DSS instance
run: make down-locally

dss-tests-with-yugabyte:
name: DSS tests with Yugabyte
runs-on: ubuntu-latest
env:
COMPOSE_PROFILES: with-yugabyte
steps:
- name: Job information
run: |
echo "Job information"
echo "Trigger: ${{ github.event_name }}"
echo "Host: ${{ runner.os }}"
echo "Repository: ${{ github.repository }}"
echo "Branch: ${{ github.ref }}"
docker images
go env
- name: Checkout
uses: actions/checkout@v2
with:
submodules: true
- name: Build dss image
run: make build-dss
- name: Tear down any pre-existing local DSS instance
run: make down-locally
- name: Start local DSS instance
run: make start-locally
- name: Bring down local DSS instance
run: make down-locally
3 changes: 3 additions & 0 deletions build/db_schemas/yugabyte/rid/downfrom-v1.0.0-crdb_v4.0.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE IF EXISTS identification_service_areas;
DROP TABLE IF EXISTS subscriptions;
DROP TABLE IF EXISTS schema_versions;
47 changes: 47 additions & 0 deletions build/db_schemas/yugabyte/rid/upto-v1.0.0-crdb_v4.0.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- This migration is equivalent to rid v4.0.0 schema for CockroachDB with the notable exception
-- of the inverted index replaced by ybgin, which presents currently some limitations
-- https://docs.yugabyte.com/preview/explore/ysql-language-features/indexes-constraints/gin/#limitations.

CREATE TABLE subscriptions (
id UUID PRIMARY KEY,
owner TEXT NOT NULL,
url TEXT NOT NULL,
notification_index INT4 DEFAULT 0,
starts_at TIMESTAMPTZ,
ends_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
cells BIGINT[] NOT NULL,
writer TEXT,
CHECK (starts_at IS NULL OR ends_at IS NULL OR starts_at < ends_at),
CHECK (array_length(cells, 1) IS NOT NULL)
);
CREATE INDEX s_owner_idx ON subscriptions (owner);
CREATE INDEX s_starts_at_idx ON subscriptions (starts_at);
CREATE INDEX s_ends_at_idx ON subscriptions (ends_at);
CREATE INDEX s_cell_idx ON subscriptions USING ybgin (cells);
CREATE INDEX subs_by_time_with_owner ON subscriptions (ends_at) INCLUDE (owner);

CREATE TABLE identification_service_areas (
id UUID PRIMARY KEY,
owner TEXT NOT NULL,
url TEXT NOT NULL,
starts_at TIMESTAMPTZ,
ends_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
cells BIGINT[] NOT NULL,
writer TEXT,
CHECK (starts_at IS NULL OR ends_at IS NULL OR starts_at < ends_at),
CHECK (array_length(cells, 1) IS NOT NULL)
);
CREATE INDEX isa_owner_idx ON identification_service_areas (owner);
CREATE INDEX isa_starts_at_idx ON identification_service_areas (starts_at);
CREATE INDEX isa_ends_at_idx ON identification_service_areas (ends_at);
CREATE INDEX isa_updated_at_idx ON identification_service_areas (updated_at);
CREATE INDEX isa_cell_idx ON identification_service_areas USING ybgin (cells);

CREATE TABLE schema_versions (
onerow_enforcer bool PRIMARY KEY DEFAULT TRUE CHECK(onerow_enforcer),
schema_version TEXT NOT NULL
);

INSERT INTO schema_versions (schema_version) VALUES ('v1.0.0');
6 changes: 6 additions & 0 deletions build/db_schemas/yugabyte/scd/downfrom-v1.0.0-crdb_v3.2.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
DROP TABLE IF EXISTS schema_versions;
DROP TABLE IF EXISTS scd_uss_availability;
DROP TABLE IF EXISTS scd_constraints;
DROP TABLE IF EXISTS scd_operations;
DROP TABLE IF EXISTS scd_subscriptions;
DROP TYPE IF EXISTS operational_intent_state;
90 changes: 90 additions & 0 deletions build/db_schemas/yugabyte/scd/upto-v1.0.0-crdb_v3.2.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
-- This migration is equivalent to scd v3.2.0 schema for CockroachDB with the notable exception
-- of the inverted index replaced by ybgin, which presents currently some limitations
-- https://docs.yugabyte.com/preview/explore/ysql-language-features/indexes-constraints/gin/#limitations.

CREATE TABLE IF NOT EXISTS scd_subscriptions (
id UUID PRIMARY KEY,
owner TEXT NOT NULL,
version INT4 NOT NULL DEFAULT 0,
url TEXT NOT NULL,
notification_index INT4 DEFAULT 0,
notify_for_operations BOOL DEFAULT false,
notify_for_constraints BOOL DEFAULT false,
implicit BOOL DEFAULT false,
starts_at TIMESTAMPTZ,
ends_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
cells BIGINT[],
CHECK (starts_at IS NULL OR ends_at IS NULL OR starts_at < ends_at),
CHECK (notify_for_operations OR notify_for_constraints)
);
CREATE INDEX ss_owner_idx ON scd_subscriptions (owner);
CREATE INDEX ss_starts_at_idx ON scd_subscriptions (starts_at);
CREATE INDEX ss_ends_at_idx ON scd_subscriptions (ends_at);
CREATE INDEX ss_cells_idx ON scd_subscriptions USING ybgin (cells);
barroco marked this conversation as resolved.
Show resolved Hide resolved

CREATE TYPE operational_intent_state AS ENUM ('Unknown', 'Accepted', 'Activated', 'Nonconforming', 'Contingent');

CREATE TABLE IF NOT EXISTS scd_operations (
id UUID PRIMARY KEY,
owner TEXT NOT NULL,
version INT4 NOT NULL DEFAULT 0,
url TEXT NOT NULL,
altitude_lower REAL,
altitude_upper REAL,
starts_at TIMESTAMPTZ,
ends_at TIMESTAMPTZ,
subscription_id UUID REFERENCES scd_subscriptions(id) ON DELETE CASCADE,
updated_at TIMESTAMPTZ NOT NULL,
state operational_intent_state NOT NULL DEFAULT 'Unknown',
cells BIGINT[],
uss_requested_ovn TEXT,
past_ovns TEXT[] NOT NULL DEFAULT ARRAY []::TEXT[],
CHECK (starts_at IS NULL OR ends_at IS NULL OR starts_at < ends_at),
CHECK (uss_requested_ovn != ''),
CHECK (
array_position(past_ovns, NULL) IS NULL AND
array_position(past_ovns, '') IS NULL AND
array_position(past_ovns, uss_requested_ovn) IS NULL
)
);
CREATE INDEX so_owner_idx ON scd_operations (owner);
CREATE INDEX so_altitude_lower_idx ON scd_operations (altitude_lower);
CREATE INDEX so_altitude_upper_idx ON scd_operations (altitude_upper);
CREATE INDEX so_starts_at_idx ON scd_operations (starts_at);
CREATE INDEX so_ends_at_idx ON scd_operations (ends_at);
CREATE INDEX so_updated_at_idx ON scd_operations (updated_at);
CREATE INDEX so_subscription_id_idx ON scd_operations (subscription_id);
CREATE INDEX so_cells_idx ON scd_operations USING ybgin (cells);

CREATE TABLE IF NOT EXISTS scd_constraints (
id UUID PRIMARY KEY,
owner TEXT NOT NULL,
version INT4 NOT NULL DEFAULT 0,
url TEXT NOT NULL,
altitude_lower REAL,
altitude_upper REAL,
starts_at TIMESTAMPTZ,
ends_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
cells BIGINT[] NOT NULL CHECK (array_length(cells, 1) IS NOT NULL),
CHECK (starts_at IS NULL OR ends_at IS NULL OR starts_at < ends_at)
);
CREATE INDEX sc_owner_idx ON scd_constraints (owner);
CREATE INDEX sc_starts_at_idx ON scd_constraints (starts_at);
CREATE INDEX sc_ends_at_idx ON scd_constraints (ends_at);
CREATE INDEX sc_cells_idx ON scd_constraints USING ybgin (cells);


CREATE TABLE IF NOT EXISTS scd_uss_availability (
id TEXT PRIMARY KEY,
availability TEXT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS schema_versions (
onerow_enforcer bool PRIMARY KEY DEFAULT TRUE CHECK(onerow_enforcer),
schema_version TEXT NOT NULL
);

INSERT INTO schema_versions (schema_version) VALUES ('v1.0.0');
26 changes: 26 additions & 0 deletions build/dev/docker-compose_dss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ services:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]

local-dss-rid-bootstrapper-ybdb:
build:
context: ../..
dockerfile: Dockerfile
image: interuss-local/dss
command: /usr/bin/db-manager migrate --schemas_dir=/db-schemas/yugabyte/rid --db_version "latest" --cockroach_host local-dss-ybdb --cockroach_user yugabyte --cockroach_port 5433
depends_on:
local-dss-crdb:
condition: service_healthy
networks:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]

local-dss-scd-bootstrapper-ybdb:
build:
context: ../..
dockerfile: Dockerfile
image: interuss-local/dss
entrypoint: /usr/bin/db-manager migrate --schemas_dir=/db-schemas/yugabyte/scd --db_version "latest" --cockroach_host local-dss-ybdb --cockroach_user yugabyte --cockroach_port 5433
depends_on:
local-dss-crdb:
condition: service_healthy
networks:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]

local-dss-rid-bootstrapper:
build:
context: ../..
Expand Down
80 changes: 55 additions & 25 deletions cmds/db-manager/migration/migrate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package migration

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -73,26 +74,28 @@ func migrate(cmd *cobra.Command, _ []string) error {
}
}

// Connect to database server
connectParameters := crdbflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = "postgres" // Use an initial database that is known to always be present
ds, err := datastore.Dial(ctx, connectParameters)
sysDbName := "postgres" // Use an initial database that is known to always be present
ds, err := connectTo(ctx, sysDbName)
if err != nil {
return fmt.Errorf("failed to connect to database with %+v: %w", connectParameters, err)
return fmt.Errorf("failed to connect to database %s: %w", sysDbName, err)
}
defer func() {
ds.Pool.Close()
}()

log.Printf("CRDB server version: %s", ds.Version.SemVer.String())
log.Printf("Datastore server type and version: %s@%s", ds.Version.Type, ds.Version.SemVer.String())

var (
isCockroach = ds.Version.Type == datastore.CockroachDB
isYugabyte = ds.Version.Type == datastore.Yugabyte
mickmis marked this conversation as resolved.
Show resolved Hide resolved
)

// Make sure specified database exists
exists, err := ds.DatabaseExists(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to check whether database %s exists: %w", dbName, err)
}
if !exists && dbName == "rid" {
if isCockroach && !exists && dbName == "rid" {
// In the special case of rid, the database was previously named defaultdb
log.Printf("Database %s does not exist; checking for older \"defaultdb\" database", dbName)
dbName = "defaultdb"
Expand All @@ -103,16 +106,25 @@ func migrate(cmd *cobra.Command, _ []string) error {
}
if !exists {
log.Printf("Database %s does not exist; creating now", dbName)
createDB := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName)
createDB := fmt.Sprintf("CREATE DATABASE %s", dbName)
if _, err := ds.Pool.Exec(ctx, createDB); err != nil {
return fmt.Errorf("failed to create new database %s: %v", dbName, err)
}
} else {
log.Printf("Database %s already exists; reading current state", dbName)
}

// Reconnect to target database
barroco marked this conversation as resolved.
Show resolved Hide resolved
ds2, err := connectTo(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to reconnect to database %s: %w", dbName, err)
}
defer func() {
ds2.Pool.Close()
}()

// Read current schema version of database
currentVersion, err := ds.GetSchemaVersion(ctx, dbName)
currentVersion, err := ds2.GetSchemaVersion(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to get current database version for %s: %w", dbName, err)
}
Expand Down Expand Up @@ -155,29 +167,39 @@ func migrate(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("failed to load SQL content from %s: %e", fullFilePath, err)
}

// Ensure SQL session has implicit transactions disabled for CRDB versions 22.2+
sessionConfigurationSQL := ""
if ds.Version.SemVer.Compare(*semver.New("22.2.0")) >= 0 {
sessionConfigurationSQL = "SET enable_implicit_transaction_for_batch_statements = false;\n"
}
migrationSQL := ""
if isCockroach {
// Ensure SQL session has implicit transactions disabled for CRDB versions 22.2+
sessionConfigurationSQL := ""
if ds2.Version.SemVer.Compare(*semver.New("22.2.0")) >= 0 {
sessionConfigurationSQL = "SET enable_implicit_transaction_for_batch_statements = false;\n"
}

migrationSQL := sessionConfigurationSQL + fmt.Sprintf("USE %s;\n", dbName) + string(rawMigrationSQL)
migrationSQL = sessionConfigurationSQL + fmt.Sprintf("USE %s;\n", dbName) + string(rawMigrationSQL)
}
if isYugabyte {
// Migrations do not require database switch in opposite to CRDB.
migrationSQL = string(rawMigrationSQL)
}

// Execute migration step
if _, err := ds.Pool.Exec(ctx, migrationSQL); err != nil {
if _, err := ds2.Pool.Exec(ctx, migrationSQL); err != nil {
return fmt.Errorf("failed to execute %s migration step %s: %w", dbName, fullFilePath, err)
}

// Update current state
if dbName == "defaultdb" && newVersion.String() == "4.0.0" && newCurrentStepIndex > currentStepIndex {
// RID database changes from `defaultdb` to `rid` when moving up to 4.0.0
dbName = "rid"
}
if dbName == "rid" && currentVersion.String() == "4.0.0" && newCurrentStepIndex < currentStepIndex {
// RID database changes from `rid` to `defaultdb` when moving down from 4.0.0
dbName = "defaultdb"
if isCockroach {
// Update current state for CRDB
if dbName == "defaultdb" && newVersion.String() == "4.0.0" && newCurrentStepIndex > currentStepIndex {
// RID database changes from `defaultdb` to `rid` when moving up to 4.0.0
dbName = "rid"
}
if dbName == "rid" && currentVersion.String() == "4.0.0" && newCurrentStepIndex < currentStepIndex {
// RID database changes from `rid` to `defaultdb` when moving down from 4.0.0
dbName = "defaultdb"
}
}
actualVersion, err := ds.GetSchemaVersion(ctx, dbName)
actualVersion, err := ds2.GetSchemaVersion(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to get current database version for %s: %w", dbName, err)
}
Expand All @@ -192,6 +214,14 @@ func migrate(cmd *cobra.Command, _ []string) error {
return nil
}

func connectTo(ctx context.Context, dbName string) (*datastore.Datastore, error) {
// Connect to database server
connectParameters := crdbflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = dbName
return datastore.Dial(ctx, connectParameters)
}

func enumerateMigrationSteps(path *string) ([]MigrationStep, error) {
steps := make(map[semver.Version]MigrationStep)

Expand Down
6 changes: 6 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func initDatastore(ctx context.Context, pool *pgxpool.Pool) (*Datastore, error)
if version.Type == CockroachDB {
return &Datastore{Version: version, Pool: pool}, nil
}
if version.Type == Yugabyte {
return &Datastore{Version: version, Pool: pool}, nil
}
return nil, stacktrace.NewError("%s is not implemented yet", version.Type)
}

Expand Down Expand Up @@ -99,6 +102,9 @@ func (ds *Datastore) GetSchemaVersion(ctx context.Context, dbName string) (*semv
if dbName == "" {
return nil, stacktrace.NewError("GetSchemaVersion was provided with an empty database name")
}
if ds.Version.Type == Yugabyte && dbName != ds.Pool.Config().ConnConfig.Database {
return nil, stacktrace.NewError("Yugabyte do not support switching databases with the same connection. Unable to retrieve schema version for database %s while connected to %s.", dbName, ds.Pool.Config().ConnConfig.Database)
}

var (
checkTableQuery = fmt.Sprintf(`
Expand Down
Loading