From a6c2da887d459ad55e990fa7d09588f432fdfd4b Mon Sep 17 00:00:00 2001 From: Chitsing KUI Date: Mon, 26 Feb 2024 23:17:58 +0800 Subject: [PATCH] run worker process in launcher pod (#612) * run worker in launcher pod; fix DCO issue Signed-off-by: kuizhiqing * use ptr.Deref Signed-off-by: kuizhiqing * update manifest Signed-off-by: kuizhiqing * more Deref Signed-off-by: kuizhiqing * create one service for both launcher and worker Signed-off-by: kuizhiqing --------- Signed-off-by: kuizhiqing --- deploy/v2beta1/mpi-operator.yaml | 5 + go.mod | 2 +- go.sum | 4 +- manifests/base/kubeflow.org_mpijobs.yaml | 5 + pkg/apis/kubeflow/v2beta1/swagger.json | 4 + pkg/apis/kubeflow/v2beta1/types.go | 6 + .../kubeflow/v2beta1/zz_generated.deepcopy.go | 5 + .../kubeflow/v2beta1/mpijobspec.go | 9 ++ pkg/controller/mpi_job_controller.go | 68 ++++---- pkg/controller/mpi_job_controller_test.go | 147 ++++++++++++------ sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md | 1 + .../mpijob/models/v2beta1_mpi_job_spec.py | 30 +++- 12 files changed, 207 insertions(+), 79 deletions(-) diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index 1a6cbf8b5..3b45024ff 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -8190,6 +8190,11 @@ spec: description: MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. type: object + runLauncherAsWorker: + default: false + description: RunLauncherAsWorker indicates whether to run worker process + in launcher Defaults to false. + type: boolean runPolicy: description: RunPolicy encapsulates various runtime policies of the job. diff --git a/go.mod b/go.mod index 4fb934f2f..55361ccd4 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( k8s.io/code-generator v0.27.4 k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f - k8s.io/utils v0.0.0-20230209194617-a36077c30491 + k8s.io/utils v0.0.0-20240102154912-e7106e64919e sigs.k8s.io/controller-runtime v0.15.1 sigs.k8s.io/scheduler-plugins v0.26.7 sigs.k8s.io/structured-merge-diff/v4 v4.2.3 diff --git a/go.sum b/go.sum index d0bb6f575..01dede9ec 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= -k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY= -k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 h1:trsWhjU5jZrx6UvFu4WzQDrN7Pga4a7Qg+zcfcj64PA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2/go.mod h1:+qG7ISXqCDVVcyO8hLn12AKVYYUjM7ftlqsqmrhMZE0= sigs.k8s.io/controller-runtime v0.15.1 h1:9UvgKD4ZJGcj24vefUFgZFP3xej/3igL9BsOUTb/+4c= diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index e5257f5e5..0864dd8df 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -8167,6 +8167,11 @@ spec: 
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. type: object + runLauncherAsWorker: + default: false + description: RunLauncherAsWorker indicates whether to run worker process + in launcher Defaults to false. + type: boolean runPolicy: description: RunPolicy encapsulates various runtime policies of the job. diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 283f2125e..8b56d3640 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -156,6 +156,10 @@ "$ref": "#/definitions/v2beta1.ReplicaSpec" } }, + "runLauncherAsWorker": { + "description": "RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false.", + "type": "boolean" + }, "runPolicy": { "description": "RunPolicy encapsulates various runtime policies of the job.", "default": {}, diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 7525a053e..5480d8425 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -154,6 +154,12 @@ type MPIJobSpec struct { // +kubebuilder:default:=1 SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` + // RunLauncherAsWorker indicates whether to run worker process in launcher + // Defaults to false. + // +optional + // +kubebuilder:default:=false + RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"` + // RunPolicy encapsulates various runtime policies of the job. RunPolicy RunPolicy `json:"runPolicy,omitempty"` diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 1036b571b..75ef51aa3 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -163,6 +163,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { *out = new(int32) **out = **in } + if in.RunLauncherAsWorker != nil { + in, out := &in.RunLauncherAsWorker, &out.RunLauncherAsWorker + *out = new(bool) + **out = **in + } in.RunPolicy.DeepCopyInto(&out.RunPolicy) if in.MPIReplicaSpecs != nil { in, out := &in.MPIReplicaSpecs, &out.MPIReplicaSpecs diff --git a/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go b/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go index 372c332c2..c569d5330 100644 --- a/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go +++ b/pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go @@ -24,6 +24,7 @@ import ( // with apply. type MPIJobSpecApplyConfiguration struct { SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` + RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"` RunPolicy *RunPolicyApplyConfiguration `json:"runPolicy,omitempty"` MPIReplicaSpecs map[kubeflowv2beta1.MPIReplicaType]*kubeflowv2beta1.ReplicaSpec `json:"mpiReplicaSpecs,omitempty"` SSHAuthMountPath *string `json:"sshAuthMountPath,omitempty"` @@ -45,6 +46,14 @@ func (b *MPIJobSpecApplyConfiguration) WithSlotsPerWorker(value int32) *MPIJobSp return b } +// WithRunLauncherAsWorker sets the RunLauncherAsWorker field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the RunLauncherAsWorker field is set to the value of the last call. 
+func (b *MPIJobSpecApplyConfiguration) WithRunLauncherAsWorker(value bool) *MPIJobSpecApplyConfiguration { + b.RunLauncherAsWorker = &value + return b +} + // WithRunPolicy sets the RunPolicy field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the RunPolicy field is set to the value of the last call. diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index d6b2ffc16..8f19a89ae 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -54,6 +54,7 @@ import ( "k8s.io/klog" "k8s.io/utils/clock" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" @@ -630,7 +631,7 @@ func (c *MPIJobController) syncHandler(key string) error { // We're done if the launcher either succeeded or failed. done := launcher != nil && isJobFinished(launcher) if !done { - _, err := c.getOrCreateService(mpiJob, newWorkersService(mpiJob)) + _, err := c.getOrCreateService(mpiJob, newJobService(mpiJob)) if err != nil { return fmt.Errorf("getting or creating Service to front workers: %w", err) } @@ -656,16 +657,6 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } - if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || - mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH { - // The Intel and MPICH implementations require workers to communicate with the - // launcher through its hostname. For that, we create a Service which - // has the same name as the launcher's hostname. - _, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob)) - if err != nil { - return fmt.Errorf("getting or creating Service to front launcher: %w", err) - } - } if launcher == nil { if mpiJob.Spec.LauncherCreationPolicy == kubeflow.LauncherCreationPolicyAtStartup || c.countReadyWorkerPods(worker) == len(worker) { launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob), metav1.CreateOptions{}) @@ -1279,17 +1270,30 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error { // handleObject can discover the MPIJob resource that 'owns' it. 
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap { var buffer bytes.Buffer - workersService := mpiJob.Name + workerSuffix slots := 1 if mpiJob.Spec.SlotsPerWorker != nil { slots = int(*mpiJob.Spec.SlotsPerWorker) } + // note that pod.spec.dnsConfig also affect the svc resolution + // ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/ + // launcher can be reach with hostname or service name + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { + name := mpiJob.Name + launcherSuffix + switch mpiJob.Spec.MPIImplementation { + case kubeflow.MPIImplementationOpenMPI: + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) + case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH: + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) + } + } + for i := 0; i < int(workerReplicas); i++ { + name := workerName(mpiJob, i) switch mpiJob.Spec.MPIImplementation { case kubeflow.MPIImplementationOpenMPI: - buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH: - buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots)) + buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots)) } } @@ -1319,22 +1323,27 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo var buffer bytes.Buffer buffer.WriteString("#!/bin/sh\n") - workersService := mpiJob.Name + workerSuffix + + // We don't check if launcher is running here, launcher should always be there or the job failed + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { + name := mpiJob.Name + launcherSuffix + buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace)) + } + for _, p := range runningPods { - buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace)) + buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace)) } configMap.Data[discoverHostsScriptName] = buffer.String() } -// newWorkersService creates a new workers' Service for an MPIJob resource. -func newWorkersService(job *kubeflow.MPIJob) *corev1.Service { - return newService(job, job.Name+workerSuffix, defaultLabels(job.Name, worker)) -} - -// newLauncherService creates a new launcher's Service for an MPIJob resource. 
-func newLauncherService(job *kubeflow.MPIJob) *corev1.Service { - return newService(job, job.Name+launcherSuffix, defaultLabels(job.Name, launcher)) +// newJobService creates a Service with the same name of Job for both launcher and worker pods +func newJobService(job *kubeflow.MPIJob) *corev1.Service { + labels := map[string]string{ + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: job.Name, + } + return newService(job, job.Name, labels) } func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service { @@ -1416,12 +1425,19 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1 } podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index) podTemplate.Spec.Hostname = name - podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. + podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name. if podTemplate.Spec.HostNetwork { // Allows resolution of worker hostnames without needing to include the // namespace or cluster domain. podTemplate.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet } + // The Intel and MPICH implementations require workers to communicate with the launcher through its hostname. + searche := fmt.Sprintf("%s.%s.svc.cluster.local", mpiJob.Name, mpiJob.Namespace) + if podTemplate.Spec.DNSConfig == nil { + podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{Searches: []string{searche}} + } else { + podTemplate.Spec.DNSConfig.Searches = append(podTemplate.Spec.DNSConfig.Searches, searche) + } setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]) container := &podTemplate.Spec.Containers[0] @@ -1494,7 +1510,7 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name) } podTemplate.Spec.Hostname = launcherName - podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. + podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name. if podTemplate.Spec.HostNetwork { // Allows resolution of worker hostnames without needing to include the // namespace or cluster domain. 
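
With runLauncherAsWorker enabled, newConfigMap emits a hostfile entry for the launcher ahead of the worker entries, and all of those hostnames resolve through the single per-MPIJob Service created by newJobService (named after the MPIJob and selecting both launcher and worker pods via the kubeflow job-name label). For example, the TestNewConfigMap case added below expects the job "openmpi-without-slots" in namespace "tenant-a", with two workers and the default of one slot per worker, to produce:

    openmpi-without-slots-launcher.openmpi-without-slots.tenant-a.svc slots=1
    openmpi-without-slots-worker-0.openmpi-without-slots.tenant-a.svc slots=1
    openmpi-without-slots-worker-1.openmpi-without-slots.tenant-a.svc slots=1

Because newWorker also appends a DNS search path of the form <job>.<namespace>.svc.cluster.local to each worker pod, the launcher's bare hostname stays resolvable from the workers, which the Intel and MPICH implementations rely on.
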
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 51c6a8e74..dbc164d3c 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -110,18 +111,6 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef CleanPodPolicy: &cleanPodPolicyAll, }, MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ - kubeflow.MPIReplicaTypeWorker: { - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "foo", - Image: "bar", - }, - }, - }, - }, - }, kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -151,7 +140,22 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { mpiJob := newMPIJobCommon(name, startTime, completionTime) - mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = replicas + if ptr.Deref(replicas, 0) > 0 { + mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] = + &kubeflow.ReplicaSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + Replicas: replicas, + } + } return mpiJob } @@ -514,7 +518,7 @@ func TestAllResourcesCreated(t *testing.T) { fmjc := f.newFakeMPIJobController() mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.expectCreateServiceAction(newWorkersService(mpiJobCopy)) + f.expectCreateServiceAction(newJobService(mpiJobCopy)) cfgMap := newConfigMap(mpiJobCopy, 5) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) f.expectCreateConfigMapAction(cfgMap) @@ -526,10 +530,6 @@ func TestAllResourcesCreated(t *testing.T) { for i := 0; i < 5; i++ { f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i)) } - if implementation == kubeflow.MPIImplementationIntel || - implementation == kubeflow.MPIImplementationMPICH { - f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) - } f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy)) mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, "MPIJob default/foo is created.")} @@ -664,7 +664,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) { var replicas int32 = 64 mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime) f.setUpMPIJob(mpiJob) - f.setUpService(newWorkersService(mpiJob)) + f.setUpService(newJobService(mpiJob)) configMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJob, nil) @@ -685,7 +685,7 @@ func TestWorkerServiceNotControlledByUs(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - service := newWorkersService(mpiJobCopy) + service := newJobService(mpiJobCopy) service.OwnerReferences = nil f.setUpService(service) @@ -704,7 +704,8 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - service := newWorkersService(mpiJobCopy) + service := 
newJobService(mpiJobCopy) + service.OwnerReferences = nil f.setUpService(service) configMap := newConfigMap(mpiJobCopy, replicas) secret, err := newSSHAuthSecret(mpiJobCopy) @@ -720,10 +721,6 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) { f.setUpPod(worker) } - service = newLauncherService(mpiJobCopy) - service.OwnerReferences = nil - f.setUpService(service) - f.runExpectError(getKey(mpiJob, t)) } @@ -741,7 +738,7 @@ func TestSecretNotControlledByUs(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { @@ -813,7 +810,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) { // expect creation of objects scheme.Scheme.Default(mpiJob) - f.expectCreateServiceAction(newWorkersService(mpiJob)) + f.expectCreateServiceAction(newJobService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) f.expectCreateConfigMapAction(cfgMap) @@ -822,10 +819,6 @@ func TestCreateSuspendedMPIJob(t *testing.T) { t.Fatalf("Failed creating secret") } f.expectCreateSecretAction(secret) - if implementation == kubeflow.MPIImplementationIntel || - implementation == kubeflow.MPIImplementationMPICH { - f.expectCreateServiceAction(newLauncherService(mpiJob)) - } // expect creating of the launcher fmjc := f.newFakeMPIJobController() @@ -887,7 +880,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) { // setup objects scheme.Scheme.Default(mpiJob) - f.setUpService(newWorkersService(mpiJob)) + f.setUpService(newJobService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList) @@ -960,7 +953,7 @@ func TestResumeMPIJob(t *testing.T) { // expect creation of objects scheme.Scheme.Default(mpiJob) - f.expectCreateServiceAction(newWorkersService(mpiJob)) + f.expectCreateServiceAction(newJobService(mpiJob)) cfgMap := newConfigMap(mpiJob, replicas) updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) f.setUpConfigMap(cfgMap) @@ -1016,7 +1009,7 @@ func TestWorkerNotControlledByUs(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1047,7 +1040,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) { configMap := newConfigMap(mpiJobCopy, replicas) updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil) f.setUpConfigMap(configMap) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1097,7 +1090,7 @@ func TestLauncherActiveWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1156,7 +1149,7 @@ func TestWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() scheme.Scheme.Default(mpiJobCopy) - f.setUpService(newWorkersService(mpiJobCopy)) + f.setUpService(newJobService(mpiJobCopy)) secret, err := 
newSSHAuthSecret(mpiJobCopy) if err != nil { t.Fatalf("Creating SSH auth secret: %v", err) @@ -1251,7 +1244,7 @@ func TestNewLauncherAndWorker(t *testing.T) { }, Spec: corev1.PodSpec{ Hostname: "foo-launcher", - Subdomain: "foo-worker", + Subdomain: "foo", RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{ { @@ -1305,8 +1298,11 @@ func TestNewLauncherAndWorker(t *testing.T) { }, }, Spec: corev1.PodSpec{ - Hostname: "foo-worker-0", - Subdomain: "foo-worker", + Hostname: "foo-worker-0", + Subdomain: "foo", + DNSConfig: &corev1.PodDNSConfig{ + Searches: []string{"foo.bar.svc.cluster.local"}, + }, RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { @@ -1420,7 +1416,7 @@ func TestNewLauncherAndWorker(t *testing.T) { HostNetwork: true, DNSPolicy: corev1.DNSClusterFirstWithHostNet, Hostname: "bar-launcher", - Subdomain: "bar-worker", + Subdomain: "bar", RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{ { @@ -1480,10 +1476,13 @@ func TestNewLauncherAndWorker(t *testing.T) { }, }, Spec: corev1.PodSpec{ - HostNetwork: true, - DNSPolicy: corev1.DNSClusterFirstWithHostNet, - Hostname: "bar-worker-12", - Subdomain: "bar-worker", + HostNetwork: true, + DNSPolicy: corev1.DNSClusterFirstWithHostNet, + Hostname: "bar-worker-12", + Subdomain: "bar", + DNSConfig: &corev1.PodDNSConfig{ + Searches: []string{"bar.foo.svc.cluster.local"}, + }, RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { @@ -1538,7 +1537,57 @@ func TestNewConfigMap(t *testing.T) { workerReplicas int32 wantCM *corev1.ConfigMap }{ - "OpenMPI without slots": { + "OpenMPI without slots, enable launcher as worker": { + mpiJob: &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots", + Namespace: "tenant-a", + }, + Spec: kubeflow.MPIJobSpec{ + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + RunLauncherAsWorker: pointer.Bool(true), + }, + }, + workerReplicas: 2, + wantCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots-config", + Namespace: "tenant-a", + Labels: map[string]string{ + "app": "openmpi-without-slots", + }, + }, + Data: map[string]string{ + "hostfile": "openmpi-without-slots-launcher.openmpi-without-slots.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots.tenant-a.svc slots=1\n", + }, + }, + }, + "OpenMPI without slots, zero explicit workers": { + mpiJob: &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots", + Namespace: "tenant-a", + }, + Spec: kubeflow.MPIJobSpec{ + MPIImplementation: kubeflow.MPIImplementationOpenMPI, + RunLauncherAsWorker: pointer.Bool(true), + }, + }, + workerReplicas: 0, + wantCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openmpi-without-slots-config", + Namespace: "tenant-a", + Labels: map[string]string{ + "app": "openmpi-without-slots", + }, + }, + Data: map[string]string{ + "hostfile": "openmpi-without-slots-launcher.openmpi-without-slots.tenant-a.svc slots=1\n", + }, + }, + }, + "OpenMPI without slots, disable launcher as worker": { mpiJob: &kubeflow.MPIJob{ ObjectMeta: metav1.ObjectMeta{ Name: "openmpi-without-slots", @@ -1558,7 +1607,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "openmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n", + 
"hostfile": "openmpi-without-slots-worker-0.openmpi-without-slots.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots.tenant-a.svc slots=1\n", }, }, }, @@ -1583,7 +1632,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "intelmpi-with-slots-worker-0.intelmpi-with-slots-worker.project-x.svc:10\n", + "hostfile": "intelmpi-with-slots-worker-0.intelmpi-with-slots.project-x.svc:10\n", }, }, }, @@ -1608,7 +1657,7 @@ func TestNewConfigMap(t *testing.T) { }, }, Data: map[string]string{ - "hostfile": "mpich-with-slots-worker-0.mpich-with-slots-worker.project-x.svc:10\n", + "hostfile": "mpich-with-slots-worker-0.mpich-with-slots.project-x.svc:10\n", }, }, }, diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index 44d2ef04e..0691e4a4b 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **launcher_creation_policy** | **str** | launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup. | [optional] **mpi_implementation** | **str** | MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\". | [optional] **mpi_replica_specs** | [**dict(str, V2beta1ReplicaSpec)**](V2beta1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | +**run_launcher_as_worker** | **bool** | RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. | [optional] **run_policy** | [**V2beta1RunPolicy**](V2beta1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". 
| [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index 68656d763..2d79da369 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -36,6 +36,7 @@ class V2beta1MPIJobSpec(object): 'launcher_creation_policy': 'str', 'mpi_implementation': 'str', 'mpi_replica_specs': 'dict(str, V2beta1ReplicaSpec)', + 'run_launcher_as_worker': 'bool', 'run_policy': 'V2beta1RunPolicy', 'slots_per_worker': 'int', 'ssh_auth_mount_path': 'str' @@ -45,12 +46,13 @@ class V2beta1MPIJobSpec(object): 'launcher_creation_policy': 'launcherCreationPolicy', 'mpi_implementation': 'mpiImplementation', 'mpi_replica_specs': 'mpiReplicaSpecs', + 'run_launcher_as_worker': 'runLauncherAsWorker', 'run_policy': 'runPolicy', 'slots_per_worker': 'slotsPerWorker', 'ssh_auth_mount_path': 'sshAuthMountPath' } - def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_replica_specs=None, run_policy=None, slots_per_worker=None, ssh_auth_mount_path=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_replica_specs=None, run_launcher_as_worker=None, run_policy=None, slots_per_worker=None, ssh_auth_mount_path=None, local_vars_configuration=None): # noqa: E501 """V2beta1MPIJobSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -59,6 +61,7 @@ def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_r self._launcher_creation_policy = None self._mpi_implementation = None self._mpi_replica_specs = None + self._run_launcher_as_worker = None self._run_policy = None self._slots_per_worker = None self._ssh_auth_mount_path = None @@ -69,6 +72,8 @@ def __init__(self, launcher_creation_policy=None, mpi_implementation=None, mpi_r if mpi_implementation is not None: self.mpi_implementation = mpi_implementation self.mpi_replica_specs = mpi_replica_specs + if run_launcher_as_worker is not None: + self.run_launcher_as_worker = run_launcher_as_worker if run_policy is not None: self.run_policy = run_policy if slots_per_worker is not None: @@ -147,6 +152,29 @@ def mpi_replica_specs(self, mpi_replica_specs): self._mpi_replica_specs = mpi_replica_specs + @property + def run_launcher_as_worker(self): + """Gets the run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 + + RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. # noqa: E501 + + :return: The run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 + :rtype: bool + """ + return self._run_launcher_as_worker + + @run_launcher_as_worker.setter + def run_launcher_as_worker(self, run_launcher_as_worker): + """Sets the run_launcher_as_worker of this V2beta1MPIJobSpec. + + RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false. # noqa: E501 + + :param run_launcher_as_worker: The run_launcher_as_worker of this V2beta1MPIJobSpec. # noqa: E501 + :type run_launcher_as_worker: bool + """ + + self._run_launcher_as_worker = run_launcher_as_worker + @property def run_policy(self): """Gets the run_policy of this V2beta1MPIJobSpec. # noqa: E501
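
The new field is also surfaced in the generated Python SDK model above. A minimal sketch of setting it from Python (hypothetical values; it assumes the generated package is importable as mpijob, matching the sdk/python/v2beta1/mpijob layout):

    from mpijob.models import V2beta1MPIJobSpec

    # Hypothetical example: request that the launcher pod also run an MPI worker process.
    spec = V2beta1MPIJobSpec(
        mpi_replica_specs={},          # fill in Launcher (and optional Worker) V2beta1ReplicaSpec entries
        run_launcher_as_worker=True,   # serialized as runLauncherAsWorker: true
        slots_per_worker=1,
    )
    assert spec.run_launcher_as_worker is True
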