-
Notifications
You must be signed in to change notification settings - Fork 683
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support overriding task pod_template via with_overrides #6118
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
#take |
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #9af1d5Actionable Suggestions - 7
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
t.Run("Test_enable-distributed-error-aggregation", func(t *testing.T) { | ||
|
||
t.Run("Override", func(t *testing.T) { | ||
testValue := "1" | ||
|
||
cmdFlags.Set("enable-distributed-error-aggregation", testValue) | ||
if vBool, err := cmdFlags.GetBool("enable-distributed-error-aggregation"); err == nil { | ||
testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vBool), &actual.EnableDistributedErrorAggregation) | ||
|
||
} else { | ||
assert.FailNow(t, err.Error()) | ||
} | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding test cases for both true
and false
values for the enable-distributed-error-aggregation
flag to ensure proper boolean parsing behavior.
Code suggestion
Check the AI-generated fix before applying
@@ -382,14 +382,40 @@ func TestK8sPluginConfig_SetFlags(t *testing.T) {
t.Run("Test_enable-distributed-error-aggregation", func(t *testing.T) {
t.Run("Override with 1", func(t *testing.T) {
testValue := "1"
cmdFlags.Set("enable-distributed-error-aggregation", testValue)
if vBool, err := cmdFlags.GetBool("enable-distributed-error-aggregation"); err == nil {
testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vBool), &actual.EnableDistributedErrorAggregation)
} else {
assert.FailNow(t, err.Error())
}
})
t.Run("Override with true", func(t *testing.T) {
testValue := "true"
cmdFlags.Set("enable-distributed-error-aggregation", testValue)
if vBool, err := cmdFlags.GetBool("enable-distributed-error-aggregation"); err == nil {
testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vBool), &actual.EnableDistributedErrorAggregation)
} else {
assert.FailNow(t, err.Error())
}
})
t.Run("Override with false", func(t *testing.T) {
testValue := "false"
cmdFlags.Set("enable-distributed-error-aggregation", testValue)
if vBool, err := cmdFlags.GetBool("enable-distributed-error-aggregation"); err == nil {
testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vBool), &actual.EnableDistributedErrorAggregation)
} else {
assert.FailNow(t, err.Error())
}
})
})
Code Review Run #9af1d5
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flyteidl/gen/pb-js/flyteidl.d.ts
Outdated
@@ -6953,6 +6959,9 @@ export namespace flyteidl { | |||
|
|||
/** K8sPod dataConfig */ | |||
dataConfig?: (flyteidl.core.IDataLoadingConfig|null); | |||
|
|||
/** K8sPod primaryContainerName */ | |||
primaryContainerName?: (string|null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider making primaryContainerName
required instead of optional since it appears to be a critical field for K8sPod configuration. The interface shows it as optional with ?
but the class implementation has it as required. A similar issue was also found in flyteidl/gen/pb-js/flyteidl.d.ts (line 6985-6986).
Code suggestion
Check the AI-generated fix before applying
primaryContainerName?: (string|null); | |
primaryContainerName: string; |
Code Review Run #9af1d5
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
if overrides.GetPodTemplate() != nil { | ||
podtemplate = overrides.GetPodTemplate() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding validation for podtemplate
before assignment. The current code directly assigns the pod template without any validation which could lead to issues if an invalid template is provided.
Code suggestion
Check the AI-generated fix before applying
if overrides.GetPodTemplate() != nil { | |
podtemplate = overrides.GetPodTemplate() | |
} | |
if template := overrides.GetPodTemplate(); template != nil { | |
if err := validatePodTemplate(template); err != nil { | |
return nil, err | |
} | |
podtemplate = template | |
} |
Code Review Run #9af1d5
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #81015aActionable Suggestions - 0Additional Suggestions - 10
Review Details
|
// Override for the pod template used by task pods | ||
K8sPod pod_template = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Override for the pod template used by task pods | |
K8sPod pod_template = 4; | |
// Override for the pod template used by task pods. | |
//+optional | |
K8sPod pod_template = 4; |
@@ -286,6 +286,13 @@ func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v | |||
*c, | |||
}, | |||
} | |||
if tCtx.TaskExecutionMetadata().GetOverrides().GetPodTemplate() != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we create a variable for podTemplate here? like
podTemplate := tCtx.TaskExecutionMetadata().GetOverrides().GetPodTemplate
if podTemplate != nil {
...
}
if podtemplate.Metadata.Labels != nil { | ||
mergeMapInto(podtemplate.Metadata.Labels, objectMeta.Labels) | ||
} | ||
var podspec_override *v1.PodSpec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var podspec_override *v1.PodSpec | |
var podSpecOverride *v1.PodSpec |
@@ -446,6 +453,25 @@ func ApplyContainerImageOverride(podSpec *v1.PodSpec, containerImage string, pri | |||
} | |||
} | |||
|
|||
func ApplyPodTemplateOverride(podSpec *v1.PodSpec, objectMeta metav1.ObjectMeta, podtemplate *core.K8SPod) (*v1.PodSpec, metav1.ObjectMeta, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func ApplyPodTemplateOverride(podSpec *v1.PodSpec, objectMeta metav1.ObjectMeta, podtemplate *core.K8SPod) (*v1.PodSpec, metav1.ObjectMeta, error) { | |
func ApplyPodTemplateOverride(podSpec *v1.PodSpec, objectMeta metav1.ObjectMeta, podTemplate *core.K8SPod) (*v1.PodSpec, metav1.ObjectMeta, error) { |
Signed-off-by: Nelson Chen <[email protected]>
@pingsutw Hi Kevin, I have added a unit test in pod_helper_test.go and fixed some typos that you mentioned. |
Code Review Agent Run #7e1782Actionable Suggestions - 1
Review Details
|
t.Run("Override pod template", func(t *testing.T) { | ||
taskContext := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{ | ||
Requests: v1.ResourceList{ | ||
v1.ResourceCPU: resource.MustParse("1024m"), | ||
}}, nil, "", &core.K8SPod{ | ||
PrimaryContainerName: "foo", | ||
PodSpec: podSpecStruct, | ||
Metadata: metadata, | ||
}) | ||
p, m, _, err := ToK8sPodSpec(context.TODO(), taskContext) | ||
assert.NoError(t, err) | ||
assert.Equal(t, "a", m.Labels["l"]) | ||
assert.Equal(t, "b", m.Annotations["a"]) | ||
assert.Equal(t, "foo:latest", p.Containers[0].Image) | ||
assert.Equal(t, "foo", p.Containers[0].Name) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding test cases to verify error scenarios and edge cases in TestPodTemplateOverride
. For example, testing with invalid pod specs, missing container names, or empty metadata.
Code suggestion
Check the AI-generated fix before applying
t.Run("Override pod template", func(t *testing.T) { | |
taskContext := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{ | |
Requests: v1.ResourceList{ | |
v1.ResourceCPU: resource.MustParse("1024m"), | |
}}, nil, "", &core.K8SPod{ | |
PrimaryContainerName: "foo", | |
PodSpec: podSpecStruct, | |
Metadata: metadata, | |
}) | |
p, m, _, err := ToK8sPodSpec(context.TODO(), taskContext) | |
assert.NoError(t, err) | |
assert.Equal(t, "a", m.Labels["l"]) | |
assert.Equal(t, "b", m.Annotations["a"]) | |
assert.Equal(t, "foo:latest", p.Containers[0].Image) | |
assert.Equal(t, "foo", p.Containers[0].Name) | |
}) | |
t.Run("Override pod template", func(t *testing.T) { | |
taskContext := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{ | |
Requests: v1.ResourceList{ | |
v1.ResourceCPU: resource.MustParse("1024m"), | |
}}, nil, "", &core.K8SPod{ | |
PrimaryContainerName: "foo", | |
PodSpec: podSpecStruct, | |
Metadata: metadata, | |
}) | |
p, m, _, err := ToK8sPodSpec(context.TODO(), taskContext) | |
assert.NoError(t, err) | |
assert.Equal(t, "a", m.Labels["l"]) | |
assert.Equal(t, "b", m.Annotations["a"]) | |
assert.Equal(t, "foo:latest", p.Containers[0].Image) | |
assert.Equal(t, "foo", p.Containers[0].Name) | |
}) | |
t.Run("Invalid pod spec", func(t *testing.T) { | |
taskContext := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{ | |
Requests: v1.ResourceList{ | |
v1.ResourceCPU: resource.MustParse("1024m"), | |
}}, nil, "", &core.K8SPod{ | |
PrimaryContainerName: "foo", | |
PodSpec: nil, | |
Metadata: metadata, | |
}) | |
_, _, _, err := ToK8sPodSpec(context.TODO(), taskContext) | |
assert.Error(t, err) | |
}) |
Code Review Run #7e1782
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: Nelson Chen <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #6118 +/- ##
==========================================
- Coverage 37.06% 34.42% -2.65%
==========================================
Files 1318 1097 -221
Lines 132644 114708 -17936
==========================================
- Hits 49167 39485 -9682
+ Misses 79228 71855 -7373
+ Partials 4249 3368 -881
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Code Review Agent Run #1518a4Actionable Suggestions - 0Additional Suggestions - 1
Review Details
|
ow := &pluginsIOMock.OutputWriter{} | ||
ow.OnGetOutputPrefixPath().Return("") | ||
ow.OnGetRawOutputPrefix().Return("") | ||
ow.OnGetCheckpointPrefix().Return("/checkpoint") | ||
ow.OnGetPreviousCheckpointsPrefix().Return("/prev") | ||
|
||
tCtx := &pluginsCoreMock.TaskExecutionContext{} | ||
tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(r, rm, containerImage)) | ||
tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(r, rm, containerImage, podtemplate)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(r, rm, containerImage, podtemplate)) | |
tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(r, rm, containerImage, podTemplate)) |
@@ -82,15 +84,15 @@ func dummyInputReader() io.InputReader { | |||
return inputReader | |||
} | |||
|
|||
func dummyExecContext(taskTemplate *core.TaskTemplate, r *v1.ResourceRequirements, rm *core.ExtendedResources, containerImage string) pluginsCore.TaskExecutionContext { | |||
func dummyExecContext(taskTemplate *core.TaskTemplate, r *v1.ResourceRequirements, rm *core.ExtendedResources, containerImage string, podtemplate *core.K8SPod) pluginsCore.TaskExecutionContext { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func dummyExecContext(taskTemplate *core.TaskTemplate, r *v1.ResourceRequirements, rm *core.ExtendedResources, containerImage string, podtemplate *core.K8SPod) pluginsCore.TaskExecutionContext { | |
func dummyExecContext(taskTemplate *core.TaskTemplate, r *v1.ResourceRequirements, rm *core.ExtendedResources, containerImage string, podTemplate *core.K8SPod) pluginsCore.TaskExecutionContext { |
) | ||
|
||
func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, extendedResources *core.ExtendedResources, containerImage string) pluginsCore.TaskExecutionMetadata { | ||
func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, extendedResources *core.ExtendedResources, containerImage string, podtemplate *core.K8SPod) pluginsCore.TaskExecutionMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, extendedResources *core.ExtendedResources, containerImage string, podtemplate *core.K8SPod) pluginsCore.TaskExecutionMetadata { | |
func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, extendedResources *core.ExtendedResources, containerImage string, podTemplate *core.K8SPod) pluginsCore.TaskExecutionMetadata { |
return nil, objectMeta, err | ||
} | ||
|
||
return podspec_override, objectMeta, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return podspec_override, objectMeta, nil | |
return podSpecOverride, objectMeta, nil |
mergeMapInto(podTemplate.Metadata.Labels, objectMeta.Labels) | ||
} | ||
var podspec_override *v1.PodSpec | ||
err := utils.UnmarshalStructToObj(podTemplate.PodSpec, &podspec_override) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err := utils.UnmarshalStructToObj(podTemplate.PodSpec, &podspec_override) | |
err := utils.UnmarshalStructToObj(podTemplate.PodSpec, &podSpecOverride) |
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run #945160Actionable Suggestions - 4
Review Details
|
ownerRef metav1.OwnerReference, | ||
annotationsOverride map[string]string, | ||
labelsOverride map[string]string) pluginsCore.TaskExecutionMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider updating the function signature to use a struct parameter instead of multiple parameters. The current signature with 8 parameters could be simplified using a configuration struct.
Code suggestion
Check the AI-generated fix before applying
-func getMockTaskExecutionMetadataCustom(
- tid string,
- ns string,
- annotations map[string]string,
- labels map[string]string,
- ownerRef metav1.OwnerReference,
- annotationsOverride map[string]string,
- labelsOverride map[string]string) pluginsCore.TaskExecutionMetadata {
+func getMockTaskExecutionMetadataCustom(config TaskMetadataConfig) pluginsCore.TaskExecutionMetadata {
Code Review Run #945160
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()), taskCtx.GetOverrides().GetPodTemplate().GetMetadata().GetAnnotations())) | ||
o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()), taskCtx.GetOverrides().GetPodTemplate().GetMetadata().GetLabels())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling potential nil pointer dereference when accessing pod template metadata. The code assumes taskCtx.GetOverrides().GetPodTemplate()
returns non-nil values.
Code suggestion
Check the AI-generated fix before applying
o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()), taskCtx.GetOverrides().GetPodTemplate().GetMetadata().GetAnnotations())) | |
o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()), taskCtx.GetOverrides().GetPodTemplate().GetMetadata().GetLabels())) | |
podTemplateAnnotations := make(map[string]string) | |
podTemplateLabels := make(map[string]string) | |
if overrides := taskCtx.GetOverrides(); overrides != nil { | |
if podTemplate := overrides.GetPodTemplate(); podTemplate != nil { | |
if metadata := podTemplate.GetMetadata(); metadata != nil { | |
podTemplateAnnotations = metadata.GetAnnotations() | |
podTemplateLabels = metadata.GetLabels() | |
} | |
} | |
} | |
o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()), podTemplateAnnotations)) | |
o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()), podTemplateLabels)) |
Code Review Run #945160
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
func ApplyPodTemplateOverride(podTemplate *core.K8SPod) (*v1.PodSpec, error) { | ||
// metadata override will be implemented in plugin_manager.go | ||
var podSpecOverride *v1.PodSpec | ||
err := utils.UnmarshalStructToObj(podTemplate.GetPodSpec(), &podSpecOverride) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding validation for podTemplate
being nil before accessing GetPodSpec()
Code suggestion
Check the AI-generated fix before applying
func ApplyPodTemplateOverride(podTemplate *core.K8SPod) (*v1.PodSpec, error) { | |
// metadata override will be implemented in plugin_manager.go | |
var podSpecOverride *v1.PodSpec | |
err := utils.UnmarshalStructToObj(podTemplate.GetPodSpec(), &podSpecOverride) | |
func ApplyPodTemplateOverride(podTemplate *core.K8SPod) (*v1.PodSpec, error) { | |
if podTemplate == nil { | |
return nil, fmt.Errorf("nil pod template provided") | |
} | |
// metadata override will be implemented in plugin_manager.go | |
var podSpecOverride *v1.PodSpec | |
err := utils.UnmarshalStructToObj(podTemplate.GetPodSpec(), &podSpecOverride) |
Code Review Run #945160
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Nelson Chen <[email protected]>
Code Review Agent Run Status
|
Tracking issue
Related to #5683
Why are the changes needed?
If we can support pod_template in with_overrides, this would reduce a lot of toil since we can supply pod templates in a central location and override downstream tasks, similar to how we can do so for resources.
What changes were proposed in this pull request?
We can use with_override() to override podtemplate, just like resources.
How was this patch tested?
Excute a workflow and using with_override(pod_template=PodTemplate(xxx)) to override the default podtemplate
Setup process
I ran flyte on my local machine and tested my code with this workflow and task:
Screenshots
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This PR implements pod template override functionality with with_overrides() method across multiple languages, enabling centralized pod template configuration. The implementation includes protobuf definitions, security enhancements through configurable security contexts, and pod template metadata handling. The changes improve email notification system with retry mechanisms, add container port naming support, and enhance array node event handling in Flytepropeller. The refactoring allows users to manage pod templates from a central location, similar to resource overrides.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5