Skip to content

Commit

Permalink
feat: add temporary support for adding _partitionvalue field during i…
Browse files Browse the repository at this point in the history
…nsertion
  • Loading branch information
deryrahman committed Nov 4, 2024
1 parent ed0cddf commit 67ef381
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 0 deletions.
12 changes: 12 additions & 0 deletions mc2mc/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Client struct {
appCtx context.Context
logger *slog.Logger
shutdownFns []func() error

// TODO: remove this temporary capability after 15 nov
enablePartitionValue bool
}

func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) {
Expand Down Expand Up @@ -59,6 +62,9 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
if err != nil {
return errors.WithStack(err)
}
if c.enablePartitionValue {
queryRaw = addPartitionValueColumn(queryRaw)
}

// check if table is partitioned
partitionNames, err := c.OdpsClient.GetPartitionNames(ctx, tableID)
Expand All @@ -82,3 +88,9 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
c.logger.Info("execution done")
return errors.WithStack(err)
}

// TODO: remove this temporary support after 15 nov
func addPartitionValueColumn(rawQuery []byte) []byte {
sanitizeQuery := strings.TrimSuffix(string(rawQuery), ";")
return []byte(fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", sanitizeQuery))
}
7 changes: 7 additions & 0 deletions mc2mc/internal/client/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,10 @@ func SetupLoader(loadMethod string) SetupFn {
return nil
}
}

func EnablePartitionValue(enabled bool) SetupFn {
return func(c *Client) error {
c.enablePartitionValue = enabled
return nil
}
}
4 changes: 4 additions & 0 deletions mc2mc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Config struct {
OtelCollectorGRPCEndpoint string
JobName string
ScheduledTime string
// TODO: remove this temporary support after 15 nov 2024
DevEnablePartitionValue bool
}

type maxComputeCredentials struct {
Expand All @@ -37,6 +39,8 @@ func NewConfig() (*Config, error) {
OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""),
JobName: getJobName(),
ScheduledTime: getEnv("SCHEDULED_TIME", ""),
// TODO: delete this after 15 nov
DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true",
}
// ali-odps-go-sdk related config
scvAcc := getEnv("SERVICE_ACCOUNT", "")
Expand Down
1 change: 1 addition & 0 deletions mc2mc/mc2mc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func mc2mc() error {
client.SetupOTelSDK(cfg.OtelCollectorGRPCEndpoint, cfg.JobName, cfg.ScheduledTime),
client.SetupODPSClient(cfg.GenOdps()),
client.SetupLoader(cfg.LoadMethod),
client.EnablePartitionValue(cfg.DevEnablePartitionValue),
)
if err != nil {
return errors.WithStack(err)
Expand Down

0 comments on commit 67ef381

Please sign in to comment.