Skip to content

Commit

Permalink
feat: multi query generation for replace load method maxcompute (#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman authored Dec 30, 2024
1 parent 9569cd9 commit bcbff22
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions core/scheduler/service/job_run_asset_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -34,12 +35,23 @@ func NewJobAssetsCompiler(engine FilesCompiler, logger log.Logger) *JobRunAssets
}

func (c *JobRunAssetsCompiler) CompileJobRunAssets(_ context.Context, job *scheduler.Job, systemEnvVars map[string]string, interval interval.Interval, contextForTask map[string]interface{}) (map[string]string, error) {
const (
bq2bq = "bq2bq"
mc2mc = "mc2mc"
)
inputFiles := job.Assets
method, ok1 := job.Task.Config["LOAD_METHOD"]
query, ok2 := job.Assets["query.sql"]
// compile assets exclusive only for bq2bq plugin with replace load method and contains query.sql in asset
const bq2bq = "bq2bq"
if ok1 && ok2 && method == "REPLACE" && job.Task.Name == bq2bq {
if job.Task.Name == mc2mc { // mc2mc plugin uses query file path to locate the query file
pathOnAsset, found := strings.CutPrefix(job.Task.Config["QUERY_FILE_PATH"], "/data/in/")
if !found {
c.logger.Error(fmt.Sprintf("error compiling assets: query file path is not valid, %s, expect to have prefix %s, fallback to \"query.sql\"", job.Task.Config["QUERY_FILE_PATH"], "/data/in/"))
} else {
query, ok2 = job.Assets[pathOnAsset]
}
}
// compile assets exclusive only for bq2bq and mc2mc plugin with replace load method and contains query.sql in asset
if ok1 && ok2 && method == "REPLACE" && (job.Task.Name == bq2bq || job.Task.Name == mc2mc) {
// check if task needs to override the compilation behaviour
compiledQuery, err := c.CompileQuery(interval.Start(), interval.End(), query, systemEnvVars)
if err != nil {
Expand Down

0 comments on commit bcbff22

Please sign in to comment.