From d33865f71c5b102475aaedc77754dd3069ececb8 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 7 Jan 2025 13:06:52 +0800 Subject: [PATCH 1/4] Fix: TAS assignment error Signed-off-by: kerthcet --- pkg/cache/tas_flavor_snapshot.go | 13 +++++++++---- pkg/resources/requests.go | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/cache/tas_flavor_snapshot.go b/pkg/cache/tas_flavor_snapshot.go index 7c0c05de7a..291df316d0 100644 --- a/pkg/cache/tas_flavor_snapshot.go +++ b/pkg/cache/tas_flavor_snapshot.go @@ -256,7 +256,7 @@ func (s *TASFlavorSnapshot) FindTopologyAssignment( sortedLowerDomains := s.sortedDomains(lowerFitDomains) currFitDomain = s.updateCountsToMinimum(sortedLowerDomains, count) } - return s.buildAssignment(currFitDomain), "" + return s.buildAssignment(currFitDomain, requests), "" } func (s *TASFlavorSnapshot) HasLevel(r *kueue.PodSetTopologyRequest) bool { @@ -334,7 +334,7 @@ func (s *TASFlavorSnapshot) updateCountsToMinimum(domains []*domain, count int32 } // buildTopologyAssignmentForLevels build TopologyAssignment for levels starting from levelIdx -func (s *TASFlavorSnapshot) buildTopologyAssignmentForLevels(domains []*domain, levelIdx int) *kueue.TopologyAssignment { +func (s *TASFlavorSnapshot) buildTopologyAssignmentForLevels(domains []*domain, levelIdx int, singlePodRequest resources.Requests) *kueue.TopologyAssignment { assignment := &kueue.TopologyAssignment{ Domains: make([]kueue.TopologyDomainAssignment, len(domains)), } @@ -344,11 +344,16 @@ func (s *TASFlavorSnapshot) buildTopologyAssignmentForLevels(domains []*domain, Values: domain.levelValues[levelIdx:], Count: domain.state, } + var usage resources.Requests + for k, v := range singlePodRequest { + usage[k] = v * int64(domain.state) + } + s.addUsage(domain.id, usage) } return assignment } -func (s *TASFlavorSnapshot) buildAssignment(domains []*domain) *kueue.TopologyAssignment { +func (s *TASFlavorSnapshot) buildAssignment(domains []*domain, singlePodRequest resources.Requests) *kueue.TopologyAssignment { // lex sort domains by their levelValues instead of IDs, as leaves' IDs can only contain the hostname slices.SortFunc(domains, func(a, b *domain) int { return utilslices.OrderStringSlices(a.levelValues, b.levelValues) @@ -358,7 +363,7 @@ func (s *TASFlavorSnapshot) buildAssignment(domains []*domain) *kueue.TopologyAs if s.isLowestLevelNode() { levelIdx = len(s.levelKeys) - 1 } - return s.buildTopologyAssignmentForLevels(domains, levelIdx) + return s.buildTopologyAssignmentForLevels(domains, levelIdx, singlePodRequest) } func (s *TASFlavorSnapshot) lowerLevelDomains(domains []*domain) []*domain { diff --git a/pkg/resources/requests.go b/pkg/resources/requests.go index 2bc8566dd5..2e5b7b42b7 100644 --- a/pkg/resources/requests.go +++ b/pkg/resources/requests.go @@ -104,6 +104,7 @@ func (req Requests) CountIn(capacity Requests) int32 { if !found { return 0 } + // find the minimum count matching all the resource quota. count := int32(capacity / rValue) if result == nil || count < *result { result = ptr.To(count) From 4f611064e7c0bb3da0782605bb691a05964a20db Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 7 Jan 2025 13:11:22 +0800 Subject: [PATCH 2/4] fix nil map Signed-off-by: kerthcet --- pkg/cache/tas_flavor_snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/tas_flavor_snapshot.go b/pkg/cache/tas_flavor_snapshot.go index 291df316d0..fc8d79de98 100644 --- a/pkg/cache/tas_flavor_snapshot.go +++ b/pkg/cache/tas_flavor_snapshot.go @@ -344,7 +344,7 @@ func (s *TASFlavorSnapshot) buildTopologyAssignmentForLevels(domains []*domain, Values: domain.levelValues[levelIdx:], Count: domain.state, } - var usage resources.Requests + usage := make(resources.Requests) for k, v := range singlePodRequest { usage[k] = v * int64(domain.state) } From 54731b28572b7de4e19bac9020c437ea69ba073d Mon Sep 17 00:00:00 2001 From: kerthcet Date: Mon, 13 Jan 2025 16:12:30 +0800 Subject: [PATCH 3/4] Add test Signed-off-by: kerthcet --- pkg/scheduler/scheduler_test.go | 94 +++++++++++++++++++++++++++++++++ pkg/util/testing/wrappers.go | 23 ++++++-- 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 1afaed1ae5..2ec2284747 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -4486,6 +4486,100 @@ func TestScheduleForTAS(t *testing.T) { }, }, }, + "scheduling workload with multiple PodSets requesting TAS flavor and will succeed": { + nodes: []corev1.Node{ + *testingnode.MakeNode("x1"). + Label("tas-node", "true"). + Label(tasRackLabel, "r1"). + Label(corev1.LabelHostname, "x1"). + StatusAllocatable(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + }). + Ready(). + Obj(), + *testingnode.MakeNode("y1"). + Label("tas-node", "true"). + Label(tasRackLabel, "r1"). + Label(corev1.LabelHostname, "y1"). + StatusAllocatable(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + }). + Ready(). + Obj(), + }, + topologies: []kueuealpha.Topology{defaultTwoLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASTwoLevelFlavor}, + clusterQueues: []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("tas-main"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("tas-default"). + Resource(corev1.ResourceCPU, "16").Obj()). + Obj(), + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets( + *utiltesting.MakePodSet("launcher", 1). + RequiredTopologyRequest(corev1.LabelHostname). + Request(corev1.ResourceCPU, "1"). + Obj(), + *utiltesting.MakePodSet("worker", 15). + PreferredTopologyRequest(corev1.LabelHostname). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + }, + wantNewAssignments: map[string]kueue.Admission{ + "default/foo": *utiltesting.MakeAdmission("tas-main", "launcher", "worker"). + AssignmentWithIndex(0, corev1.ResourceCPU, "tas-default", "1000m"). + AssignmentPodCountWithIndex(0, 1). + TopologyAssignmentWithIndex(0, &kueue.TopologyAssignment{ + Levels: []string{"kubernetes.io/hostname"}, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "x1", + }, + }, + }, + }). + AssignmentWithIndex(1, corev1.ResourceCPU, "tas-default", "15000m"). + AssignmentPodCountWithIndex(1, 15). + TopologyAssignmentWithIndex(1, &kueue.TopologyAssignment{ + Levels: []string{"kubernetes.io/hostname"}, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 7, + Values: []string{ + "x1", + }, + }, + { + Count: 8, + Values: []string{ + "y1", + }, + }, + }, + }). + Obj(), + }, + eventCmpOpts: []cmp.Option{eventIgnoreMessage}, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "QuotaReserved", + EventType: corev1.EventTypeNormal, + }, + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "Admitted", + EventType: corev1.EventTypeNormal, + }, + }, + }, "scheduling workload when the node for another admitted workload is deleted": { // Here we have the "bar-admitted" workload, which is admitted and // is using the "x1" node, which is deleted. Still, we have the y1 diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 0cf7ef1e13..189b6bfcee 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -548,18 +548,33 @@ func (w *AdmissionWrapper) Obj() *kueue.Admission { } func (w *AdmissionWrapper) Assignment(r corev1.ResourceName, f kueue.ResourceFlavorReference, value string) *AdmissionWrapper { - w.PodSetAssignments[0].Flavors[r] = f - w.PodSetAssignments[0].ResourceUsage[r] = resource.MustParse(value) + w.AssignmentWithIndex(0, r, f, value) return w } func (w *AdmissionWrapper) AssignmentPodCount(value int32) *AdmissionWrapper { - w.PodSetAssignments[0].Count = ptr.To(value) + w.AssignmentPodCountWithIndex(0, value) return w } func (w *AdmissionWrapper) TopologyAssignment(ts *kueue.TopologyAssignment) *AdmissionWrapper { - w.PodSetAssignments[0].TopologyAssignment = ts + w.TopologyAssignmentWithIndex(0, ts) + return w +} + +func (w *AdmissionWrapper) AssignmentWithIndex(index int32, r corev1.ResourceName, f kueue.ResourceFlavorReference, value string) *AdmissionWrapper { + w.PodSetAssignments[index].Flavors[r] = f + w.PodSetAssignments[index].ResourceUsage[r] = resource.MustParse(value) + return w +} + +func (w *AdmissionWrapper) AssignmentPodCountWithIndex(index, value int32) *AdmissionWrapper { + w.PodSetAssignments[index].Count = ptr.To(value) + return w +} + +func (w *AdmissionWrapper) TopologyAssignmentWithIndex(index int32, ts *kueue.TopologyAssignment) *AdmissionWrapper { + w.PodSetAssignments[index].TopologyAssignment = ts return w } From 3b650c2d9f871d8aa7710c0b38da68d3cf89a96c Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 14 Jan 2025 14:22:23 +0800 Subject: [PATCH 4/4] Add another test Signed-off-by: kerthcet --- pkg/cache/tas_flavor_snapshot.go | 4 +- pkg/scheduler/scheduler_test.go | 94 ++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/pkg/cache/tas_flavor_snapshot.go b/pkg/cache/tas_flavor_snapshot.go index fc8d79de98..4edb8d4e5e 100644 --- a/pkg/cache/tas_flavor_snapshot.go +++ b/pkg/cache/tas_flavor_snapshot.go @@ -345,8 +345,8 @@ func (s *TASFlavorSnapshot) buildTopologyAssignmentForLevels(domains []*domain, Count: domain.state, } usage := make(resources.Requests) - for k, v := range singlePodRequest { - usage[k] = v * int64(domain.state) + for resName, resValue := range singlePodRequest { + usage[resName] = resValue * int64(domain.state) } s.addUsage(domain.id, usage) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 2ec2284747..92ee9aee78 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -4530,6 +4530,100 @@ func TestScheduleForTAS(t *testing.T) { Obj()). Obj(), }, + wantNewAssignments: map[string]kueue.Admission{ + "default/foo": *utiltesting.MakeAdmission("tas-main", "launcher", "worker"). + AssignmentWithIndex(0, corev1.ResourceCPU, "tas-default", "1000m"). + AssignmentPodCountWithIndex(0, 1). + TopologyAssignmentWithIndex(0, &kueue.TopologyAssignment{ + Levels: []string{corev1.LabelHostname}, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "x1", + }, + }, + }, + }). + AssignmentWithIndex(1, corev1.ResourceCPU, "tas-default", "15000m"). + AssignmentPodCountWithIndex(1, 15). + TopologyAssignmentWithIndex(1, &kueue.TopologyAssignment{ + Levels: []string{corev1.LabelHostname}, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 7, + Values: []string{ + "x1", + }, + }, + { + Count: 8, + Values: []string{ + "y1", + }, + }, + }, + }). + Obj(), + }, + eventCmpOpts: []cmp.Option{eventIgnoreMessage}, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "QuotaReserved", + EventType: corev1.EventTypeNormal, + }, + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "Admitted", + EventType: corev1.EventTypeNormal, + }, + }, + }, + "scheduling workload with multiple PodSets requesting higher level topology": { + nodes: []corev1.Node{ + *testingnode.MakeNode("x1"). + Label("tas-node", "true"). + Label(tasRackLabel, "r1"). + Label(corev1.LabelHostname, "x1"). + StatusAllocatable(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + }). + Ready(). + Obj(), + *testingnode.MakeNode("y1"). + Label("tas-node", "true"). + Label(tasRackLabel, "r1"). + Label(corev1.LabelHostname, "y1"). + StatusAllocatable(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + }). + Ready(). + Obj(), + }, + topologies: []kueuealpha.Topology{defaultTwoLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASTwoLevelFlavor}, + clusterQueues: []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("tas-main"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("tas-default"). + Resource(corev1.ResourceCPU, "16").Obj()). + Obj(), + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets( + *utiltesting.MakePodSet("launcher", 1). + RequiredTopologyRequest(tasRackLabel). + Request(corev1.ResourceCPU, "1"). + Obj(), + *utiltesting.MakePodSet("worker", 15). + RequiredTopologyRequest(tasRackLabel). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + }, wantNewAssignments: map[string]kueue.Admission{ "default/foo": *utiltesting.MakeAdmission("tas-main", "launcher", "worker"). AssignmentWithIndex(0, corev1.ResourceCPU, "tas-default", "1000m").