From 50f5edf356cfd6564daf40a20f14053fa161982d Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Tue, 17 Dec 2024 17:51:27 +0700 Subject: [PATCH] fix: support datetime sql format for maxcompute (#311) --- .../scheduler/service/executor_input_compiler.go | 5 +++-- .../service/executor_input_compiler_test.go | 16 ++++++++-------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/core/scheduler/service/executor_input_compiler.go b/core/scheduler/service/executor_input_compiler.go index 3d048aebbe..e8f8e7e3b9 100644 --- a/core/scheduler/service/executor_input_compiler.go +++ b/core/scheduler/service/executor_input_compiler.go @@ -34,6 +34,7 @@ const ( SecretsStringToMatch = ".secret." TimeISOFormat = time.RFC3339 + TimeSQLFormat = time.DateTime // Configuration for system defined variables configDstart = "DSTART" @@ -221,14 +222,14 @@ func getSystemDefinedConfigs(job *scheduler.Job, interval interval.Interval, exe vars := map[string]string{ configDstart: interval.Start().Format(TimeISOFormat), configDend: interval.End().Format(TimeISOFormat), - configExecutionTime: executedAt.Format(TimeISOFormat), + configExecutionTime: executedAt.Format(TimeSQLFormat), // TODO: remove this once ali support RFC3339 format configDestination: job.Destination.String(), } // TODO: remove this condition after v1/v2 removal, add to map directly if job.WindowConfig.GetVersion() == window.NewWindowVersion { vars[configStartDate] = interval.Start().Format(time.DateOnly) vars[configEndDate] = interval.End().Format(time.DateOnly) - vars[configExecutionTime] = scheduledAt.Format(TimeISOFormat) + vars[configExecutionTime] = scheduledAt.Format(TimeSQLFormat) // TODO: remove this once ali support RFC3339 format vars[configScheduleTime] = scheduledAt.Format(TimeISOFormat) vars[configScheduleDate] = scheduledAt.Format(time.DateOnly) } diff --git a/core/scheduler/service/executor_input_compiler_test.go b/core/scheduler/service/executor_input_compiler_test.go index 2fa4cbdbc7..747e4117f0 100644 --- a/core/scheduler/service/executor_input_compiler_test.go +++ b/core/scheduler/service/executor_input_compiler_test.go @@ -147,7 +147,7 @@ func TestExecutorCompiler(t *testing.T) { systemDefinedVars := map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), } taskContext := mock.Anything @@ -216,7 +216,7 @@ func TestExecutorCompiler(t *testing.T) { systemDefinedVars := map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), } taskContext := mock.Anything @@ -273,7 +273,7 @@ func TestExecutorCompiler(t *testing.T) { Configs: map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), "some.config.compiled": "val.compiled", }, @@ -325,7 +325,7 @@ func TestExecutorCompiler(t *testing.T) { Configs: map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), "some.config.compiled": "val.compiled", }, @@ -399,7 +399,7 @@ func TestExecutorCompiler(t *testing.T) { systemDefinedVars := map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), } taskContext := mock.Anything @@ -430,7 +430,7 @@ func TestExecutorCompiler(t *testing.T) { Configs: map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), "hook.compiled": "hook.val.compiled", "JOB_LABELS": "job_id=00000000-0000-0000-0000-000000000000,job_name=job1,namespace=ns1,project=proj1", @@ -498,7 +498,7 @@ func TestExecutorCompiler(t *testing.T) { systemDefinedVars := map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), } taskContext := mock.Anything @@ -570,7 +570,7 @@ func TestExecutorCompiler(t *testing.T) { systemDefinedVars := map[string]string{ "DSTART": interval.Start().Format(time.RFC3339), "DEND": interval.End().Format(time.RFC3339), - "EXECUTION_TIME": executedAt.Format(time.RFC3339), + "EXECUTION_TIME": executedAt.Format(time.DateTime), "JOB_DESTINATION": job.Destination.String(), } taskContext := mock.Anything