Skip to content

Commit

Permalink
fix: support datetime sql format for maxcompute (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Dec 17, 2024
1 parent 435dfe3 commit 50f5edf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
5 changes: 3 additions & 2 deletions core/scheduler/service/executor_input_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
SecretsStringToMatch = ".secret."

TimeISOFormat = time.RFC3339
TimeSQLFormat = time.DateTime

// Configuration for system defined variables
configDstart = "DSTART"
Expand Down Expand Up @@ -221,14 +222,14 @@ func getSystemDefinedConfigs(job *scheduler.Job, interval interval.Interval, exe
vars := map[string]string{
configDstart: interval.Start().Format(TimeISOFormat),
configDend: interval.End().Format(TimeISOFormat),
configExecutionTime: executedAt.Format(TimeISOFormat),
configExecutionTime: executedAt.Format(TimeSQLFormat), // TODO: remove this once ali support RFC3339 format
configDestination: job.Destination.String(),
}
// TODO: remove this condition after v1/v2 removal, add to map directly
if job.WindowConfig.GetVersion() == window.NewWindowVersion {
vars[configStartDate] = interval.Start().Format(time.DateOnly)
vars[configEndDate] = interval.End().Format(time.DateOnly)
vars[configExecutionTime] = scheduledAt.Format(TimeISOFormat)
vars[configExecutionTime] = scheduledAt.Format(TimeSQLFormat) // TODO: remove this once ali support RFC3339 format
vars[configScheduleTime] = scheduledAt.Format(TimeISOFormat)
vars[configScheduleDate] = scheduledAt.Format(time.DateOnly)
}
Expand Down
16 changes: 8 additions & 8 deletions core/scheduler/service/executor_input_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestExecutorCompiler(t *testing.T) {
systemDefinedVars := map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
}
taskContext := mock.Anything
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestExecutorCompiler(t *testing.T) {
systemDefinedVars := map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
}
taskContext := mock.Anything
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestExecutorCompiler(t *testing.T) {
Configs: map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
"some.config.compiled": "val.compiled",
},
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestExecutorCompiler(t *testing.T) {
Configs: map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
"some.config.compiled": "val.compiled",
},
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestExecutorCompiler(t *testing.T) {
systemDefinedVars := map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
}
taskContext := mock.Anything
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestExecutorCompiler(t *testing.T) {
Configs: map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
"hook.compiled": "hook.val.compiled",
"JOB_LABELS": "job_id=00000000-0000-0000-0000-000000000000,job_name=job1,namespace=ns1,project=proj1",
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestExecutorCompiler(t *testing.T) {
systemDefinedVars := map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
}
taskContext := mock.Anything
Expand Down Expand Up @@ -570,7 +570,7 @@ func TestExecutorCompiler(t *testing.T) {
systemDefinedVars := map[string]string{
"DSTART": interval.Start().Format(time.RFC3339),
"DEND": interval.End().Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.RFC3339),
"EXECUTION_TIME": executedAt.Format(time.DateTime),
"JOB_DESTINATION": job.Destination.String(),
}
taskContext := mock.Anything
Expand Down

0 comments on commit 50f5edf

Please sign in to comment.