From aed7eedaf3846c8fcc30918736622cc5fe21668e Mon Sep 17 00:00:00 2001 From: Irfan Ur Rehman Date: Mon, 26 Dec 2016 14:31:36 +0530 Subject: [PATCH 1/3] [Federation] Wait for control plane pods in kubefed init --- federation/pkg/kubefed/init/init.go | 32 +++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/federation/pkg/kubefed/init/init.go b/federation/pkg/kubefed/init/init.go index 197328a6026bd..423f7115027ec 100644 --- a/federation/pkg/kubefed/init/init.go +++ b/federation/pkg/kubefed/init/init.go @@ -230,6 +230,23 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman } if !dryRun { + fmt.Fprintf(cmdOut, "Waiting for control plane to come up") + for !podRunning(hostClientset, serverName, initFlags.FederationSystemNamespace) { + _, err := fmt.Fprintf(cmdOut, ".") + if err != nil { + return err + } + //wait indefinite if the pod doesn't show up with correct status + time.Sleep(2 * time.Second) + } + for !podRunning(hostClientset, cmName, initFlags.FederationSystemNamespace) { + _, err := fmt.Fprintf(cmdOut, ".") + if err != nil { + return err + } + //wait indefinite if the pod doesn't show up with correct status + time.Sleep(2 * time.Second) + } return printSuccess(cmdOut, ips, hostnames) } _, err = fmt.Fprintf(cmdOut, "Federation control plane runs (dry run)\n") @@ -576,6 +593,21 @@ func createControllerManager(clientset *client.Clientset, namespace, name, svcNa return clientset.Extensions().Deployments(namespace).Create(dep) } +func podRunning(clientset *client.Clientset, name, nameSpace string) bool { + podList, err := clientset.Core().Pods(nameSpace).List(api.ListOptions{}) + if err != nil { + //Problem in getting pods at this time + return false + } + + for _, pod := range podList.Items { + if strings.Contains(pod.Name, name) && pod.Status.Phase == "Running" { + return true + } + } + return false +} + func printSuccess(cmdOut io.Writer, ips, hostnames []string) error { svcEndpoints := append(ips, hostnames...) _, err := fmt.Fprintf(cmdOut, "Federation API server is running at: %s\n", strings.Join(svcEndpoints, ", ")) From 27f7fca8e06d4be69270c837749f3d3d4199d06c Mon Sep 17 00:00:00 2001 From: Irfan Ur Rehman Date: Mon, 26 Dec 2016 15:59:42 +0530 Subject: [PATCH 2/3] [Federation] Unit tests updated for wait for control plane pods in kubefed init --- federation/pkg/kubefed/init/init_test.go | 38 +++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/federation/pkg/kubefed/init/init_test.go b/federation/pkg/kubefed/init/init_test.go index 4a8b218c88397..dd8747e99ed9e 100644 --- a/federation/pkg/kubefed/init/init_test.go +++ b/federation/pkg/kubefed/init/init_test.go @@ -168,7 +168,7 @@ func TestInitFederation(t *testing.T) { want = fmt.Sprintf("Federation control plane runs (dry run)\n") } - if got := buf.String(); got != want { + if got := buf.String(); !strings.Contains(got, want) { t.Errorf("[%d] unexpected output: got: %s, want: %s", i, got, want) if cmdErrMsg != "" { t.Errorf("[%d] unexpected error message: %s", i, cmdErrMsg) @@ -693,6 +693,38 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image, }, } + podList := v1.PodList{} + apiServerPod := v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: testapi.Extensions.GroupVersion().String(), + }, + ObjectMeta: v1.ObjectMeta{ + Name: svcName, + Namespace: namespaceName, + }, + Status: v1.PodStatus{ + Phase: "Running", + }, + } + + ctrlMgrPod := v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: testapi.Extensions.GroupVersion().String(), + }, + ObjectMeta: v1.ObjectMeta{ + Name: cmName, + Namespace: namespaceName, + }, + Status: v1.PodStatus{ + Phase: "Running", + }, + } + + podList.Items = append(podList.Items, apiServerPod) + podList.Items = append(podList.Items, ctrlMgrPod) + f, tf, codec, _ := cmdtesting.NewAPIFactory() extCodec := testapi.Extensions.Codec() ns := dynamic.ContentConfig().NegotiatedSerializer @@ -794,7 +826,11 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image, return nil, fmt.Errorf("Unexpected deployment object\n\tDiff: %s", diff.ObjectGoPrintDiff(got, want)) } return &http.Response{StatusCode: http.StatusCreated, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(extCodec, &want)}, nil + case p == "/api/v1/namespaces/federation-system/pods" && m == http.MethodGet: + return &http.Response{StatusCode: http.StatusOK, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(codec, &podList)}, nil + default: + fmt.Println("Unknon api called %v\n", p) return nil, fmt.Errorf("unexpected request: %#v\n%#v", req.URL, req) } }), From d6cfd826a3a86431408bf1cfb40d6a477c24b872 Mon Sep 17 00:00:00 2001 From: Irfan Ur Rehman Date: Tue, 27 Dec 2016 22:23:46 +0530 Subject: [PATCH 3/3] [Federation] Review comment fixes for wait for control plane pods in kubefed init --- federation/pkg/kubefed/init/init.go | 70 +++++++++++++++-------- federation/pkg/kubefed/init/init_test.go | 10 ++-- federation/pkg/kubefed/testing/BUILD | 1 + federation/pkg/kubefed/testing/testing.go | 13 +++++ federation/pkg/kubefed/util/BUILD | 1 + federation/pkg/kubefed/util/util.go | 31 +++++++--- 6 files changed, 89 insertions(+), 37 deletions(-) diff --git a/federation/pkg/kubefed/init/init.go b/federation/pkg/kubefed/init/init.go index 423f7115027ec..a9dc591cfbc92 100644 --- a/federation/pkg/kubefed/init/init.go +++ b/federation/pkg/kubefed/init/init.go @@ -63,6 +63,7 @@ const ( HostClusterLocalDNSZoneName = "cluster.local." lbAddrRetryInterval = 5 * time.Second + podWaitInterval = 2 * time.Second ) var ( @@ -230,22 +231,14 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman } if !dryRun { - fmt.Fprintf(cmdOut, "Waiting for control plane to come up") - for !podRunning(hostClientset, serverName, initFlags.FederationSystemNamespace) { - _, err := fmt.Fprintf(cmdOut, ".") - if err != nil { - return err - } - //wait indefinite if the pod doesn't show up with correct status - time.Sleep(2 * time.Second) + fedPods := []string{serverName, cmName} + err = waitForPods(hostClientset, fedPods, initFlags.FederationSystemNamespace) + if err != nil { + return err } - for !podRunning(hostClientset, cmName, initFlags.FederationSystemNamespace) { - _, err := fmt.Fprintf(cmdOut, ".") - if err != nil { - return err - } - //wait indefinite if the pod doesn't show up with correct status - time.Sleep(2 * time.Second) + err = waitSrvHealthy(config, initFlags.Name, initFlags.Kubeconfig) + if err != nil { + return err } return printSuccess(cmdOut, ips, hostnames) } @@ -593,19 +586,46 @@ func createControllerManager(clientset *client.Clientset, namespace, name, svcNa return clientset.Extensions().Deployments(namespace).Create(dep) } -func podRunning(clientset *client.Clientset, name, nameSpace string) bool { - podList, err := clientset.Core().Pods(nameSpace).List(api.ListOptions{}) +func waitForPods(clientset *client.Clientset, fedPods []string, namespace string) error { + err := wait.PollInfinite(podWaitInterval, func() (bool, error) { + podCheck := len(fedPods) + podList, err := clientset.Core().Pods(namespace).List(api.ListOptions{}) + if err != nil { + return false, nil + } + for _, pod := range podList.Items { + for _, fedPod := range fedPods { + if strings.HasPrefix(pod.Name, fedPod) && pod.Status.Phase == "Running" { + podCheck -= 1 + } + } + //ensure that all pods are in running state or keep waiting + if podCheck == 0 { + return true, nil + } + } + return false, nil + }) + return err +} + +func waitSrvHealthy(config util.AdminConfig, context, kubeconfig string) error { + fedClientSet, err := config.FederationClientset(context, kubeconfig) if err != nil { - //Problem in getting pods at this time - return false + return err } - - for _, pod := range podList.Items { - if strings.Contains(pod.Name, name) && pod.Status.Phase == "Running" { - return true + fedDiscoveryClient := fedClientSet.Discovery() + err = wait.PollInfinite(podWaitInterval, func() (bool, error) { + body, err := fedDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw() + if err != nil { + return false, nil } - } - return false + if strings.EqualFold(string(body), "ok") { + return true, nil + } + return false, nil + }) + return err } func printSuccess(cmdOut io.Writer, ips, hostnames []string) error { diff --git a/federation/pkg/kubefed/init/init_test.go b/federation/pkg/kubefed/init/init_test.go index dd8747e99ed9e..8aadf0d9e09f7 100644 --- a/federation/pkg/kubefed/init/init_test.go +++ b/federation/pkg/kubefed/init/init_test.go @@ -168,7 +168,7 @@ func TestInitFederation(t *testing.T) { want = fmt.Sprintf("Federation control plane runs (dry run)\n") } - if got := buf.String(); !strings.Contains(got, want) { + if got := buf.String(); got != want { t.Errorf("[%d] unexpected output: got: %s, want: %s", i, got, want) if cmdErrMsg != "" { t.Errorf("[%d] unexpected error message: %s", i, cmdErrMsg) @@ -708,7 +708,7 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image, }, } - ctrlMgrPod := v1.Pod{ + cmPod := v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: testapi.Extensions.GroupVersion().String(), @@ -723,7 +723,7 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image, } podList.Items = append(podList.Items, apiServerPod) - podList.Items = append(podList.Items, ctrlMgrPod) + podList.Items = append(podList.Items, cmPod) f, tf, codec, _ := cmdtesting.NewAPIFactory() extCodec := testapi.Extensions.Codec() @@ -733,6 +733,8 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image, NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m := req.URL.Path, req.Method; { + case p == "/healthz": + return &http.Response{StatusCode: http.StatusOK, Header: kubefedtesting.DefaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte("ok")))}, nil case p == "/api/v1/namespaces" && m == http.MethodPost: body, err := ioutil.ReadAll(req.Body) if err != nil { @@ -828,9 +830,7 @@ func fakeInitHostFactory(federationName, namespaceName, ip, dnsZoneName, image, return &http.Response{StatusCode: http.StatusCreated, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(extCodec, &want)}, nil case p == "/api/v1/namespaces/federation-system/pods" && m == http.MethodGet: return &http.Response{StatusCode: http.StatusOK, Header: kubefedtesting.DefaultHeader(), Body: kubefedtesting.ObjBody(codec, &podList)}, nil - default: - fmt.Println("Unknon api called %v\n", p) return nil, fmt.Errorf("unexpected request: %#v\n%#v", req.URL, req) } }), diff --git a/federation/pkg/kubefed/testing/BUILD b/federation/pkg/kubefed/testing/BUILD index f57ae5d942585..4b6a3281a39bc 100644 --- a/federation/pkg/kubefed/testing/BUILD +++ b/federation/pkg/kubefed/testing/BUILD @@ -12,6 +12,7 @@ go_library( srcs = ["testing.go"], tags = ["automanaged"], deps = [ + "//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/pkg/kubefed/util:go_default_library", "//pkg/api:go_default_library", "//pkg/apimachinery/registered:go_default_library", diff --git a/federation/pkg/kubefed/testing/testing.go b/federation/pkg/kubefed/testing/testing.go index befcdebfbddf7..a763f47bab2d3 100644 --- a/federation/pkg/kubefed/testing/testing.go +++ b/federation/pkg/kubefed/testing/testing.go @@ -23,6 +23,7 @@ import ( "net/http" "os" + fedclient "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/kubefed/util" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apimachinery/registered" @@ -53,6 +54,18 @@ func (f *fakeAdminConfig) PathOptions() *clientcmd.PathOptions { return f.pathOptions } +func (f *fakeAdminConfig) FederationClientset(context, kubeconfigPath string) (*fedclient.Clientset, error) { + fakeRestClient, err := f.hostFactory.RESTClient() + if err != nil { + return nil, err + } + + // we ignore the function params and use the client from + // the same fakefactory to create a federation clientset + // our fake factory exposes only the healthz api for this client + return fedclient.New(fakeRestClient), nil +} + func (f *fakeAdminConfig) HostFactory(host, kubeconfigPath string) cmdutil.Factory { return f.hostFactory } diff --git a/federation/pkg/kubefed/util/BUILD b/federation/pkg/kubefed/util/BUILD index c8b78570e9cfc..648da264ac7cb 100644 --- a/federation/pkg/kubefed/util/BUILD +++ b/federation/pkg/kubefed/util/BUILD @@ -12,6 +12,7 @@ go_library( srcs = ["util.go"], tags = ["automanaged"], deps = [ + "//federation/client/clientset_generated/federation_clientset:go_default_library", "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library", diff --git a/federation/pkg/kubefed/util/util.go b/federation/pkg/kubefed/util/util.go index c6dd285de49e5..358223df6fdcf 100644 --- a/federation/pkg/kubefed/util/util.go +++ b/federation/pkg/kubefed/util/util.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + fedclient "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" @@ -39,13 +40,16 @@ const ( // AdminConfig provides a filesystem based kubeconfig (via // `PathOptions()`) and a mechanism to talk to the federation -// host cluster. +// host cluster and the federation control plane api server. type AdminConfig interface { // PathOptions provides filesystem based kubeconfig access. PathOptions() *clientcmd.PathOptions + // FedClientSet provides a federation API compliant clientset + // to communicate with the federation control plane api server + FederationClientset(context, kubeconfigPath string) (*fedclient.Clientset, error) // HostFactory provides a mechanism to communicate with the // cluster where federation control plane is hosted. - HostFactory(host, kubeconfigPath string) cmdutil.Factory + HostFactory(hostcontext, kubeconfigPath string) cmdutil.Factory } // adminConfig implements the AdminConfig interface. @@ -64,17 +68,30 @@ func (a *adminConfig) PathOptions() *clientcmd.PathOptions { return a.pathOptions } -func (a *adminConfig) HostFactory(host, kubeconfigPath string) cmdutil.Factory { +func (a *adminConfig) FederationClientset(context, kubeconfigPath string) (*fedclient.Clientset, error) { + fedConfig := a.getClientConfig(context, kubeconfigPath) + fedClientConfig, err := fedConfig.ClientConfig() + if err != nil { + return nil, err + } + + return fedclient.NewForConfigOrDie(fedClientConfig), nil +} + +func (a *adminConfig) HostFactory(hostcontext, kubeconfigPath string) cmdutil.Factory { + hostClientConfig := a.getClientConfig(hostcontext, kubeconfigPath) + return cmdutil.NewFactory(hostClientConfig) +} + +func (a *adminConfig) getClientConfig(context, kubeconfigPath string) clientcmd.ClientConfig { loadingRules := *a.pathOptions.LoadingRules loadingRules.Precedence = a.pathOptions.GetLoadingPrecedence() loadingRules.ExplicitPath = kubeconfigPath overrides := &clientcmd.ConfigOverrides{ - CurrentContext: host, + CurrentContext: context, } - hostClientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&loadingRules, overrides) - - return cmdutil.NewFactory(hostClientConfig) + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&loadingRules, overrides) } // SubcommandFlags holds the flags required by the subcommands of