Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: compile multiple query for replace method mc2mc plugin #313

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions core/scheduler/service/job_run_asset_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -34,12 +35,23 @@ func NewJobAssetsCompiler(engine FilesCompiler, logger log.Logger) *JobRunAssets
}

func (c *JobRunAssetsCompiler) CompileJobRunAssets(_ context.Context, job *scheduler.Job, systemEnvVars map[string]string, interval interval.Interval, contextForTask map[string]interface{}) (map[string]string, error) {
const (
bq2bq = "bq2bq"
mc2mc = "mc2mc"
)
inputFiles := job.Assets
method, ok1 := job.Task.Config["LOAD_METHOD"]
query, ok2 := job.Assets["query.sql"]
// compile assets exclusive only for bq2bq plugin with replace load method and contains query.sql in asset
const bq2bq = "bq2bq"
if ok1 && ok2 && method == "REPLACE" && job.Task.Name == bq2bq {
if job.Task.Name == mc2mc { // mc2mc plugin uses query file path to locate the query file
pathOnAsset, found := strings.CutPrefix(job.Task.Config["QUERY_FILE_PATH"], "/data/in/")
if !found {
c.logger.Error(fmt.Sprintf("error compiling assets: query file path is not valid, %s, expect to have prefix %s, fallback to \"query.sql\"", job.Task.Config["QUERY_FILE_PATH"], "/data/in/"))
} else {
query, ok2 = job.Assets[pathOnAsset]
}
}
// compile assets exclusive only for bq2bq and mc2mc plugin with replace load method and contains query.sql in asset
if ok1 && ok2 && method == "REPLACE" && (job.Task.Name == bq2bq || job.Task.Name == mc2mc) {
// check if task needs to override the compilation behaviour
compiledQuery, err := c.CompileQuery(interval.Start(), interval.End(), query, systemEnvVars)
if err != nil {
Expand Down
Loading