Skip to content

Commit

Permalink
Add affinity assignment for TAS
Browse files Browse the repository at this point in the history
Signed-off-by: kerthcet <[email protected]>
  • Loading branch information
kerthcet committed Jan 8, 2025
1 parent 4f61106 commit 38a3700
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
4 changes: 2 additions & 2 deletions config/components/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: us-central1-docker.pkg.dev/k8s-staging-images/kueue/kueue
newTag: main
newName: registry.cn-shanghai.aliyuncs.com/kerthcet-public/kueue
newTag: "010801"
69 changes: 55 additions & 14 deletions pkg/cache/tas_flavor_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type TASFlavorSnapshot struct {
// domainsPerLevel stores the static tree information
domainsPerLevel []domainByID

// affinityDomains stores the former assigned domains of the workload.
affinityDomains []*domain

// tolerations represents the list of tolerations defined for the resource flavor
tolerations []corev1.Toleration
}
Expand All @@ -123,6 +126,7 @@ func newTASFlavorSnapshot(log logr.Logger, topologyName kueue.TopologyReference,
domains: make(domainByID),
roots: make(domainByID),
domainsPerLevel: domainsPerLevel,
affinityDomains: make([]*domain, 0),
}
return snapshot
}
Expand Down Expand Up @@ -220,10 +224,11 @@ func (s *TASFlavorSnapshot) addUsage(domainID utiltas.TopologyDomainID, usage re
//
// Phase 2:
//
// a) select the domain at requested level with count >= requestedCount
// b) traverse the structure down level-by-level optimizing the number of used
// domains at each level
// c) build the assignment for the lowest level in the hierarchy
// a) pre-assign the affinityDomains to colocate the podsets
// b) select the domain at requested level with count >= requestedCount
// c) traverse the structure down level-by-level optimizing the number of used
// domains at each level
// d) build the assignment for the lowest level in the hierarchy
func (s *TASFlavorSnapshot) FindTopologyAssignment(
topologyRequest *kueue.PodSetTopologyRequest,
requests resources.Requests,
Expand All @@ -241,21 +246,55 @@ func (s *TASFlavorSnapshot) FindTopologyAssignment(
// phase 1 - determine the number of pods which can fit in each topology domain
s.fillInCounts(requests, append(podSetTolerations, s.tolerations...))

// phase 2a: determine the level at which the assignment is done along with
// phase 2a - pre-assign the affinityDomains to make sure podSets are colocated as much as possible.
var prefilledDomain *domain
var prefilledState int32
if len(s.affinityDomains) > 0 {
newDomains := s.sortedDomains(s.affinityDomains, -1)
// the only affinity domain may have quota left.
prefilledDomain = newDomains[0]
// once the prefilled domain has enough quota, no need to go through the rest of the algorithm.
if prefilledDomain.state >= count {
prefilledDomain.state = count
return s.buildAssignment(newDomains[0:1], requests), ""
}

prefilledState = prefilledDomain.state
// We only need to assign count-prefilledState pods then.
count -= prefilledState

// reset the states of domains contain the count of prefilledDomain.
loopDomain := prefilledDomain
for {
loopDomain.state -= prefilledState
if loopDomain.parent == nil {
break
}
loopDomain = loopDomain.parent
}
}

// phase 2b: determine the level at which the assignment is done along with
// the domains which can accommodate all pods
fitLevelIdx, currFitDomain, reason := s.findLevelWithFitDomains(levelIdx, required, count)
if len(reason) > 0 {
return nil, reason
}

// phase 2b: traverse the tree down level-by-level optimizing the number of
// phase 2c: traverse the tree down level-by-level optimizing the number of
// topology domains at each level
currFitDomain = s.updateCountsToMinimum(currFitDomain, count)
currFitDomain = s.updateDomainCountsToMinimum(currFitDomain, count)
for levelIdx := fitLevelIdx; levelIdx+1 < len(s.domainsPerLevel); levelIdx++ {
lowerFitDomains := s.lowerLevelDomains(currFitDomain)
sortedLowerDomains := s.sortedDomains(lowerFitDomains)
currFitDomain = s.updateCountsToMinimum(sortedLowerDomains, count)
sortedLowerDomains := s.sortedDomains(lowerFitDomains, -1)
currFitDomain = s.updateDomainCountsToMinimum(sortedLowerDomains, count)
}
if prefilledDomain != nil {
prefilledDomain.state = prefilledState
currFitDomain = append([]*domain{prefilledDomain}, currFitDomain...)
}

// phase 2d: build the assignment
return s.buildAssignment(currFitDomain, requests), ""
}

Expand Down Expand Up @@ -291,7 +330,7 @@ func (s *TASFlavorSnapshot) findLevelWithFitDomains(levelIdx int, required bool,
return 0, nil, fmt.Sprintf("no topology domains at level: %s", s.levelKeys[levelIdx])
}
levelDomains := utilmaps.Values(domains)
sortedDomain := s.sortedDomains(levelDomains)
sortedDomain := s.sortedDomains(levelDomains, -1)
topDomain := sortedDomain[0]
if topDomain.state < count {
if required {
Expand All @@ -314,7 +353,7 @@ func (s *TASFlavorSnapshot) findLevelWithFitDomains(levelIdx int, required bool,
return levelIdx, []*domain{topDomain}, ""
}

func (s *TASFlavorSnapshot) updateCountsToMinimum(domains []*domain, count int32) []*domain {
func (s *TASFlavorSnapshot) updateDomainCountsToMinimum(domains []*domain, count int32) []*domain {
result := make([]*domain, 0)
remainingCount := count
for _, domain := range domains {
Expand Down Expand Up @@ -349,6 +388,7 @@ func (s *TASFlavorSnapshot) buildTopologyAssignmentForLevels(domains []*domain,
usage[k] = v * int64(domain.state)
}
s.addUsage(domain.id, usage)
s.affinityDomains = append(s.affinityDomains, domain)
}
return assignment
}
Expand All @@ -374,17 +414,18 @@ func (s *TASFlavorSnapshot) lowerLevelDomains(domains []*domain) []*domain {
return result
}

func (s *TASFlavorSnapshot) sortedDomains(domains []*domain) []*domain {
// if ascend = 1, it's ascend, if ascend = -1, it's descend.
func (s *TASFlavorSnapshot) sortedDomains(domains []*domain, op int32) []*domain {
result := make([]*domain, len(domains))
copy(result, domains)
slices.SortFunc(result, func(a, b *domain) int {
switch {
case a.state == b.state:
return utilslices.OrderStringSlices(a.levelValues, b.levelValues)
case a.state > b.state:
return -1
return 1 * int(op)
default:
return 1
return -1 * int(op)
}
})
return result
Expand Down

0 comments on commit 38a3700

Please sign in to comment.