Skip to content

Commit

Permalink
fix(mc2mc): get table info for non default schema (#58)
Browse files Browse the repository at this point in the history
fix: get table info for non default schema
  • Loading branch information
deryrahman authored Jan 6, 2025
1 parent 77540fb commit 54e753e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
2 changes: 1 addition & 1 deletion mc2mc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/goto/transformers/mc2mc
go 1.22.3

require (
github.com/aliyun/aliyun-odps-go-sdk v0.4.0
github.com/aliyun/aliyun-odps-go-sdk v0.4.1
github.com/caarlos0/env/v11 v11.3.1
github.com/pkg/errors v0.9.1
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions mc2mc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/aliyun/aliyun-odps-go-sdk v0.3.15 h1:HkWki3g7G0xEAyxSAChqSDxLw8NCl7PF
github.com/aliyun/aliyun-odps-go-sdk v0.3.15/go.mod h1:t/tgF/iN5aAs/gLL7sEI8/qdax4NuFCKEjO3OJbHZqI=
github.com/aliyun/aliyun-odps-go-sdk v0.4.0 h1:QzEjRsMnB+FjTbO33PypiCbFJ4Wba55h3swSoZZVqgM=
github.com/aliyun/aliyun-odps-go-sdk v0.4.0/go.mod h1:h3n3Jy9qCcq9GhKakuF7Y47W1EP71hfTDx8MCEeQYbA=
github.com/aliyun/aliyun-odps-go-sdk v0.4.1 h1:vOzO7tOc2CO5IW4a192m3+fwd65rnaLib0a5AN7IZfY=
github.com/aliyun/aliyun-odps-go-sdk v0.4.1/go.mod h1:h3n3Jy9qCcq9GhKakuF7Y47W1EP71hfTDx8MCEeQYbA=
github.com/aliyun/credentials-go v1.3.10 h1:45Xxrae/evfzQL9V10zL3xX31eqgLWEaIdCoPipOEQA=
github.com/aliyun/credentials-go v1.3.10/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down
47 changes: 33 additions & 14 deletions mc2mc/internal/client/odps.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,8 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
// GetPartitionNames returns the partition names of the given table
// by querying the table schema.
func (c *odpsClient) GetPartitionNames(_ context.Context, tableID string) ([]string, error) {
splittedTableID := strings.Split(tableID, ".")
if len(splittedTableID) != 3 {
return nil, errors.Errorf("invalid tableID (tableID should be in format project.schema.table): %s", tableID)
}
project, schema, name := splittedTableID[0], splittedTableID[1], splittedTableID[2]
table := odps.NewTable(c.client, project, schema, name)
if err := table.Load(); err != nil {
table, err := getTable(c.client, tableID)
if err != nil {
return nil, errors.WithStack(err)
}
var partitionNames []string
Expand All @@ -77,13 +72,8 @@ func (c *odpsClient) GetPartitionNames(_ context.Context, tableID string) ([]str
// GetOrderedColumns returns the ordered column names of the given table
// by querying the table schema.
func (c *odpsClient) GetOrderedColumns(tableID string) ([]string, error) {
splittedTableID := strings.Split(tableID, ".")
if len(splittedTableID) != 3 {
return nil, errors.Errorf("invalid tableID (tableID should be in format project.schema.table): %s", tableID)
}
project, schema, name := splittedTableID[0], splittedTableID[1], splittedTableID[2]
table := odps.NewTable(c.client, project, schema, name)
if err := table.Load(); err != nil {
table, err := getTable(c.client, tableID)
if err != nil {
return nil, errors.WithStack(err)
}
var columnNames []string
Expand Down Expand Up @@ -115,3 +105,32 @@ func addHints(query string) map[string]string {

return nil
}

// getTable returns the table with the given tableID
func getTable(client *odps.Odps, tableID string) (*odps.Table, error) {
// save current project and schema
currProject := client.DefaultProjectName()
currSchema := client.CurrentSchemaName()
defer func() {
// restore current project and schema
client.SetDefaultProjectName(currProject)
client.SetCurrentSchemaName(currSchema)
}()

splittedTableID := strings.Split(tableID, ".")
if len(splittedTableID) != 3 {
return nil, errors.Errorf("invalid tableID (tableID should be in format project.schema.table): %s", tableID)
}
project, schema, name := splittedTableID[0], splittedTableID[1], splittedTableID[2]

// set project and schema to the table
client.SetDefaultProjectName(project)
client.SetCurrentSchemaName(schema)

// get table
table := client.Tables().Get(name)
if err := table.Load(); err != nil {
return nil, errors.WithStack(err)
}
return table, nil
}

0 comments on commit 54e753e

Please sign in to comment.