run worker process in launcher pod (#612)
* run worker in launcher pod; fix DCO issue

Signed-off-by: kuizhiqing <[email protected]>

* use ptr.Deref

Signed-off-by: kuizhiqing <[email protected]>

* update manifest

Signed-off-by: kuizhiqing <[email protected]>

* more Deref

Signed-off-by: kuizhiqing <[email protected]>

* create one service for both launcher and worker

Signed-off-by: kuizhiqing <[email protected]>

---------

Signed-off-by: kuizhiqing <[email protected]>
kuizhiqing authored Feb 26, 2024
1 parent a1ff84c commit a6c2da8
Showing 12 changed files with 207 additions and 79 deletions.
5 changes: 5 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
@@ -8190,6 +8190,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicates whether to run the worker process
in the launcher. Defaults to false.
type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
2 changes: 1 addition & 1 deletion go.mod
@@ -15,7 +15,7 @@ require (
k8s.io/code-generator v0.27.4
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
k8s.io/utils v0.0.0-20230209194617-a36077c30491
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
sigs.k8s.io/controller-runtime v0.15.1
sigs.k8s.io/scheduler-plugins v0.26.7
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
4 changes: 2 additions & 2 deletions go.sum
@@ -384,8 +384,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg=
k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=
k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 h1:trsWhjU5jZrx6UvFu4WzQDrN7Pga4a7Qg+zcfcj64PA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2/go.mod h1:+qG7ISXqCDVVcyO8hLn12AKVYYUjM7ftlqsqmrhMZE0=
sigs.k8s.io/controller-runtime v0.15.1 h1:9UvgKD4ZJGcj24vefUFgZFP3xej/3igL9BsOUTb/+4c=
5 changes: 5 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
@@ -8167,6 +8167,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
runLauncherAsWorker:
default: false
description: RunLauncherAsWorker indicates whether to run the worker process
in the launcher. Defaults to false.
type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
@@ -156,6 +156,10 @@
"$ref": "#/definitions/v2beta1.ReplicaSpec"
}
},
"runLauncherAsWorker": {
"description": "RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false.",
"type": "boolean"
},
"runPolicy": {
"description": "RunPolicy encapsulates various runtime policies of the job.",
"default": {},
6 changes: 6 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
@@ -154,6 +154,12 @@ type MPIJobSpec struct {
// +kubebuilder:default:=1
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// RunLauncherAsWorker indicates whether to run the worker process in the launcher.
// Defaults to false.
// +optional
// +kubebuilder:default:=false
RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"`

// RunPolicy encapsulates various runtime policies of the job.
RunPolicy RunPolicy `json:"runPolicy,omitempty"`

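For illustration, a minimal sketch of how a client might set the new field from Go, assuming the usual v2beta1 import path; the job name and namespace are hypothetical and MPIReplicaSpecs (the launcher/worker pod templates) are omitted for brevity. In a manifest, the same setting is simply runLauncherAsWorker: true under spec.

package main

import (
	"fmt"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

func main() {
	job := &kubeflow.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: "default"}, // hypothetical name/namespace
		Spec: kubeflow.MPIJobSpec{
			// MPIReplicaSpecs omitted for brevity; a real job also needs launcher/worker templates.
			SlotsPerWorker:      ptr.To(int32(1)),
			RunLauncherAsWorker: ptr.To(true), // the launcher pod will also run MPI worker processes
		},
	}
	fmt.Println(ptr.Deref(job.Spec.RunLauncherAsWorker, false)) // true
}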
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go

(Generated file; diff not rendered.)

68 changes: 42 additions & 26 deletions pkg/controller/mpi_job_controller.go
@@ -54,6 +54,7 @@ import (
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"

Check failure on line 56 in pkg/controller/mpi_job_controller.go (GitHub Actions / lint): SA1019: "k8s.io/utils/pointer" is deprecated: Use functions in k8s.io/utils/ptr instead: ptr.To to obtain a pointer, ptr.Deref to dereference a pointer, ptr.Equal to compare dereferenced pointers. (staticcheck)
"k8s.io/utils/ptr"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

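For context on the "use ptr.Deref" commits: k8s.io/utils/ptr supplies generic helpers that replace the deprecated k8s.io/utils/pointer constructors flagged by the lint annotation above. A minimal sketch of the two calls used in this change:

package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	enabled := ptr.To(true)                // *bool, replaces the pointer.Bool constructor
	fmt.Println(ptr.Deref(enabled, false)) // true: dereference with a default

	var unset *bool
	fmt.Println(ptr.Deref(unset, false)) // false: a nil pointer falls back to the default
}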
@@ -630,7 +631,7 @@ func (c *MPIJobController) syncHandler(key string) error {
// We're done if the launcher either succeeded or failed.
done := launcher != nil && isJobFinished(launcher)
if !done {
_, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob))
_, err := c.getOrCreateService(mpiJob, newJobService(mpiJob))
if err != nil {
return fmt.Errorf("getting or creating Service to front workers: %w", err)
}
@@ -656,16 +657,6 @@ func (c *MPIJobController) syncHandler(key string) error {
return err
}
}
if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH {
// The Intel and MPICH implementations require workers to communicate with the
// launcher through its hostname. For that, we create a Service which
// has the same name as the launcher's hostname.
_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
if err != nil {
return fmt.Errorf("getting or creating Service to front launcher: %w", err)
}
}
if launcher == nil {
if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) {
launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{})
@@ -1279,17 +1270,30 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
var buffer bytes.Buffer
workersService := mpiJob.Name + workerSuffix
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
// note that pod.spec.dnsConfig also affects the svc resolution
// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
// the launcher can be reached by hostname or service name
if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
name := mpiJob.Name + launcherSuffix
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
}
}

for i := 0; i < int(workerReplicas); i++ {
name := workerName(mpiJob, i)
switch mpiJob.Spec.MPIImplementation {
case kubeflow.MPIImplementationOpenMPI:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
}
}

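To make the hostfile change concrete, here is a small sketch of the entries newConfigMap now produces for a hypothetical OpenMPI job named "pi" in namespace "default" with two workers, one slot each, and runLauncherAsWorker enabled; the -launcher/-worker name suffixes are assumed from the surrounding code. Worker hosts are now addressed as <pod>.<job-name>.<namespace>.svc because the single Service is named after the job.

package main

import "fmt"

func main() {
	job, ns, slots := "pi", "default", 1 // hypothetical MPIJob name, namespace, slotsPerWorker
	// With runLauncherAsWorker enabled, the launcher entry is written first.
	fmt.Printf("%s-launcher.%s.%s.svc slots=%d\n", job, job, ns, slots)
	for i := 0; i < 2; i++ { // two worker replicas
		fmt.Printf("%s-worker-%d.%s.%s.svc slots=%d\n", job, i, job, ns, slots)
	}
	// Output:
	// pi-launcher.pi.default.svc slots=1
	// pi-worker-0.pi.default.svc slots=1
	// pi-worker-1.pi.default.svc slots=1
}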
@@ -1319,22 +1323,27 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo

var buffer bytes.Buffer
buffer.WriteString("#!/bin/sh\n")
workersService := mpiJob.Name + workerSuffix

// We don't check whether the launcher is running here; the launcher should always exist, otherwise the job has failed.
if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
name := mpiJob.Name + launcherSuffix
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
}

for _, p := range runningPods {
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace))
}

configMap.Data[discoverHostsScriptName] = buffer.String()
}

// newWorkersService creates a new workers' Service for an MPIJob resource.
func newWorkersService(job *kubeflow.MPIJob) *corev1.Service {
return newService(job, job.Name+workerSuffix, defaultLabels(job.Name, worker))
}

// newLauncherService creates a new launcher's Service for an MPIJob resource.
func newLauncherService(job *kubeflow.MPIJob) *corev1.Service {
return newService(job, job.Name+launcherSuffix, defaultLabels(job.Name, launcher))
// newJobService creates a Service with the same name as the Job, shared by the launcher and worker pods.
func newJobService(job *kubeflow.MPIJob) *corev1.Service {
labels := map[string]string{
kubeflow.OperatorNameLabel: kubeflow.OperatorName,
kubeflow.JobNameLabel: job.Name,
}
return newService(job, job.Name, labels)
}

func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service {
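newService itself (whose signature appears above) is unchanged; what changes is that the controller now creates a single Service named after the job, selecting launcher and worker pods alike, instead of one Service per role plus a launcher Service for Intel/MPICH. A rough sketch of the resulting object for the hypothetical job "pi" follows; the headless form (clusterIP: None) is an assumption inferred from pods being addressed as <hostname>.<job-name>.<namespace>.svc, since newService's body is not shown in this diff.

package main

import (
	"fmt"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Approximation of the per-job Service for a hypothetical MPIJob "pi" in "default".
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: "default"}, // same name as the MPIJob
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone, // assumed headless; not confirmed by this diff
			Selector: map[string]string{
				kubeflow.OperatorNameLabel: kubeflow.OperatorName, // matches launcher and worker pods alike
				kubeflow.JobNameLabel:      "pi",
			},
		},
	}
	fmt.Println(svc.Name, svc.Spec.Selector)
}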
@@ -1416,12 +1425,19 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
}
podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index)
podTemplate.Spec.Hostname = name
podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
podTemplate.Spec.Subdomain = mpiJob.Name // Matches the Job's Service name.
if podTemplate.Spec.HostNetwork {
// Allows resolution of worker hostnames without needing to include the
// namespace or cluster domain.
podTemplate.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
}
// The Intel and MPICH implementations require workers to communicate with the launcher through its hostname.
search := fmt.Sprintf("%s.%s.svc.cluster.local", mpiJob.Name, mpiJob.Namespace)
if podTemplate.Spec.DNSConfig == nil {
podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{Searches: []string{search}}
} else {
podTemplate.Spec.DNSConfig.Searches = append(podTemplate.Spec.DNSConfig.Searches, search)
}
setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker])

container := &podTemplate.Spec.Containers[0]
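The added dnsConfig search path lets pods reach the launcher (and each other) by bare hostname, which the Intel and MPICH launchers expect, without spelling out the full <pod>.<job>.<namespace>.svc form; the cluster.local cluster domain is taken from the code above. A minimal sketch with hypothetical names:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	job, ns := "pi", "default" // hypothetical MPIJob name and namespace
	search := fmt.Sprintf("%s.%s.svc.cluster.local", job, ns)

	pod := corev1.PodSpec{Hostname: "pi-worker-0", Subdomain: job}
	if pod.DNSConfig == nil {
		pod.DNSConfig = &corev1.PodDNSConfig{Searches: []string{search}}
	} else {
		pod.DNSConfig.Searches = append(pod.DNSConfig.Searches, search)
	}
	// With "pi.default.svc.cluster.local" in the search list, the short name
	// "pi-launcher" resolves to pi-launcher.pi.default.svc.cluster.local inside the cluster.
	fmt.Println(pod.DNSConfig.Searches)
}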
@@ -1494,7 +1510,7 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev
c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
}
podTemplate.Spec.Hostname = launcherName
podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
podTemplate.Spec.Subdomain = mpiJob.Name // Matches the Job's Service name.
if podTemplate.Spec.HostNetwork {
// Allows resolution of worker hostnames without needing to include the
// namespace or cluster domain.
