Skip to content

Commit

Permalink
Update handling of unlogged tables as per 2024.2 (#2074)
Browse files Browse the repository at this point in the history
* Specify MinVersionFixedIn = 2024.2.0.0
* If issue is detected, but filtered out because of target-db-version, add a note in assessment report.
* Propagate NOTICE client messages to stdout in import-schema (with the exception of the ALTER TABLE "table rewrite will cause inconsistencies" notice)
* Moved some functions from importData.go to importSchemaYugabyteDB.go
* Upgrade pgx from v4 to v5 in importSchemaYUgabyteDB.go
  • Loading branch information
makalaaneesh authored Dec 16, 2024
1 parent 13a678b commit dd61428
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 194 deletions.
11 changes: 0 additions & 11 deletions migtests/tests/analyze-schema/expected_issues.json
Original file line number Diff line number Diff line change
Expand Up @@ -534,17 +534,6 @@
"GH": "https://github.com/yugabyte/yugabyte-db/issues/10652",
"MinimumVersionsFixedIn": null
},
{
"IssueType": "unsupported_features",
"ObjectType": "TABLE",
"ObjectName": "tbl_unlogged",
"Reason": "UNLOGGED tables are not supported yet.",
"SqlStatement": "CREATE UNLOGGED TABLE tbl_unlogged (id int, val text);",
"Suggestion": "Remove UNLOGGED keyword to make it work",
"GH": "https://github.com/yugabyte/yugabyte-db/issues/1129/",
"DocsLink":"https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#unlogged-table-is-not-supported",
"MinimumVersionsFixedIn": null
},
{
"IssueType": "unsupported_features",
"ObjectType": "VIEW",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,17 +539,7 @@
},
{
"FeatureName": "Unlogged tables",
"Objects": [
{
"ObjectName": "public.tbl_unlogged",
"SqlStatement": "CREATE UNLOGGED TABLE public.tbl_unlogged (\n id integer,\n val text\n);"
},
{
"ObjectName": "schema2.tbl_unlogged",
"SqlStatement": "CREATE UNLOGGED TABLE schema2.tbl_unlogged (\n id integer,\n val text\n);"
}
],
"DocsLink": "https://docs.yugabyte.com/preview/yugabyte-voyager/known-issues/postgresql/#unlogged-table-is-not-supported",
"Objects": [],
"MinimumVersionsFixedIn": null
},
{
Expand Down Expand Up @@ -1989,7 +1979,9 @@
"SizeInBytes": 8192
}
],
"Notes": null,
"Notes": [
"There are some Unlogged tables in the schema. They will be created as regular LOGGED tables in YugabyteDB as unlogged tables are not supported."
],
"MigrationCaveats": [
{
"FeatureName": "Alter partitioned tables to add Primary Key",
Expand Down
8 changes: 7 additions & 1 deletion yb-voyager/cmd/assessMigrationCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,8 @@ To manually modify the schema, please refer: <a class="highlight-link" href="htt

ORACLE_UNSUPPPORTED_PARTITIONING = `Reference and System Partitioned tables are created as normal tables, but are not considered for target cluster sizing recommendations.`

GIN_INDEXES = `There are some BITMAP indexes present in the schema that will get converted to GIN indexes, but GIN indexes are partially supported in YugabyteDB as mentioned in <a class="highlight-link" href="https://github.com/yugabyte/yugabyte-db/issues/7850">https://github.com/yugabyte/yugabyte-db/issues/7850</a> so take a look and modify them if not supported.`
GIN_INDEXES = `There are some BITMAP indexes present in the schema that will get converted to GIN indexes, but GIN indexes are partially supported in YugabyteDB as mentioned in <a class="highlight-link" href="https://github.com/yugabyte/yugabyte-db/issues/7850">https://github.com/yugabyte/yugabyte-db/issues/7850</a> so take a look and modify them if not supported.`
UNLOGGED_TABLE_NOTE = `There are some Unlogged tables in the schema. They will be created as regular LOGGED tables in YugabyteDB as unlogged tables are not supported.`
)

const FOREIGN_TABLE_NOTE = `There are some Foreign tables in the schema, but during the export schema phase, exported schema does not include the SERVER and USER MAPPING objects. Therefore, you must manually create these objects before import schema. For more information on each of them, run analyze-schema. `
Expand All @@ -1363,7 +1364,12 @@ func addNotesToAssessmentReport() {
}
}
}
case POSTGRESQL:
if parserIssueDetector.IsUnloggedTablesIssueFiltered {
assessmentReport.Notes = append(assessmentReport.Notes, UNLOGGED_TABLE_NOTE)
}
}

}

func addMigrationCaveatsToAssessmentReport(unsupportedDataTypesForLiveMigration []utils.TableColumnsDataTypes, unsupportedDataTypesForLiveMigrationWithFForFB []utils.TableColumnsDataTypes) {
Expand Down
118 changes: 0 additions & 118 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License.
package cmd

import (
"context"
"fmt"
"io"
"os"
Expand All @@ -29,7 +28,6 @@ import (

"github.com/davecgh/go-spew/spew"
"github.com/fatih/color"
"github.com/jackc/pgx/v4"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"github.com/sourcegraph/conc/pool"
Expand Down Expand Up @@ -1216,65 +1214,6 @@ func importBatch(batch *Batch, importBatchArgsProto *tgtdb.ImportBatchArgs) {
}
}

func newTargetConn() *pgx.Conn {
conn, err := pgx.Connect(context.Background(), tconf.GetConnectionUri())
if err != nil {
utils.WaitChannel <- 1
<-utils.WaitChannel
utils.ErrExit("connect to target db: %s", err)
}

setTargetSchema(conn)

if sourceDBType == ORACLE && enableOrafce {
setOrafceSearchPath(conn)
}

return conn
}

// TODO: Eventually get rid of this function in favour of TargetYugabyteDB.setTargetSchema().
func setTargetSchema(conn *pgx.Conn) {
if sourceDBType == POSTGRESQL || tconf.Schema == YUGABYTEDB_DEFAULT_SCHEMA {
// For PG, schema name is already included in the object name.
// No need to set schema if importing in the default schema.
return
}
checkSchemaExistsQuery := fmt.Sprintf("SELECT count(schema_name) FROM information_schema.schemata WHERE schema_name = '%s'", tconf.Schema)
var cntSchemaName int

if err := conn.QueryRow(context.Background(), checkSchemaExistsQuery).Scan(&cntSchemaName); err != nil {
utils.ErrExit("run query %q on target %q to check schema exists: %s", checkSchemaExistsQuery, tconf.Host, err)
} else if cntSchemaName == 0 {
utils.ErrExit("schema '%s' does not exist in target", tconf.Schema)
}

setSchemaQuery := fmt.Sprintf("SET SCHEMA '%s'", tconf.Schema)
_, err := conn.Exec(context.Background(), setSchemaQuery)
if err != nil {
utils.ErrExit("run query %q on target %q: %s", setSchemaQuery, tconf.Host, err)
}
}

func dropIdx(conn *pgx.Conn, idxName string) error {
dropIdxQuery := fmt.Sprintf("DROP INDEX IF EXISTS %s", idxName)
log.Infof("Dropping index: %q", dropIdxQuery)
_, err := conn.Exec(context.Background(), dropIdxQuery)
if err != nil {
return fmt.Errorf("failed to drop index %q: %w", idxName, err)
}
return nil
}

func setOrafceSearchPath(conn *pgx.Conn) {
// append oracle schema in the search_path for orafce
updateSearchPath := `SELECT set_config('search_path', current_setting('search_path') || ', oracle', false)`
_, err := conn.Exec(context.Background(), updateSearchPath)
if err != nil {
utils.ErrExit("unable to update search_path for orafce extension: %v", err)
}
}

func getIndexName(sqlQuery string, indexName string) (string, error) {
// Return the index name itself if it is aleady qualified with schema name
if len(strings.Split(indexName, ".")) == 2 {
Expand All @@ -1292,63 +1231,6 @@ func getIndexName(sqlQuery string, indexName string) (string, error) {
return "", fmt.Errorf("could not find `ON` keyword in the CREATE INDEX statement")
}

// TODO: need automation tests for this, covering cases like schema(public vs non-public) or case sensitive names
func beforeIndexCreation(sqlInfo sqlInfo, conn **pgx.Conn, objType string) error {
if !strings.Contains(strings.ToUpper(sqlInfo.stmt), "CREATE INDEX") {
return nil
}

fullyQualifiedObjName, err := getIndexName(sqlInfo.stmt, sqlInfo.objName)
if err != nil {
return fmt.Errorf("extract qualified index name from DDL [%v]: %w", sqlInfo.stmt, err)
}
if invalidTargetIndexesCache == nil {
invalidTargetIndexesCache, err = getInvalidIndexes(conn)
if err != nil {
return fmt.Errorf("failed to fetch invalid indexes: %w", err)
}
}

// check index valid or not
if invalidTargetIndexesCache[fullyQualifiedObjName] {
log.Infof("index %q already exists but in invalid state, dropping it", fullyQualifiedObjName)
err = dropIdx(*conn, fullyQualifiedObjName)
if err != nil {
return fmt.Errorf("drop invalid index %q: %w", fullyQualifiedObjName, err)
}
}

// print the index name as index creation takes time and user can see the progress
color.Yellow("creating index %s ...", fullyQualifiedObjName)
return nil
}

func getInvalidIndexes(conn **pgx.Conn) (map[string]bool, error) {
var result = make(map[string]bool)
// NOTE: this shouldn't fetch any predefined indexes of pg_catalog schema (assuming they can't be invalid) or indexes of other successful migrations
query := "SELECT indexrelid::regclass FROM pg_index WHERE indisvalid = false"

rows, err := (*conn).Query(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("querying invalid indexes: %w", err)
}
defer rows.Close()

for rows.Next() {
var fullyQualifiedIndexName string
err := rows.Scan(&fullyQualifiedIndexName)
if err != nil {
return nil, fmt.Errorf("scanning row for invalid index name: %w", err)
}
// if schema is not provided by catalog table, then it is public schema
if !strings.Contains(fullyQualifiedIndexName, ".") {
fullyQualifiedIndexName = fmt.Sprintf("public.%s", fullyQualifiedIndexName)
}
result[fullyQualifiedIndexName] = true
}
return result, nil
}

// TODO: This function is a duplicate of the one in tgtdb/yb.go. Consolidate the two.
func getTargetSchemaName(tableName string) string {
parts := strings.Split(tableName, ".")
Expand Down
Loading

0 comments on commit dd61428

Please sign in to comment.