improve the processing logic for ds upgrade
jacksontong committed Apr 5, 2024
1 parent 0960705 commit 87a4e03
Showing 2 changed files with 213 additions and 126 deletions.
196 changes: 196 additions & 0 deletions pkg/application/controller/app/action/daemonset.go
@@ -0,0 +1,196 @@
/*
* Tencent is pleased to support the open source community by making TKEStack
* available.
*
* Copyright (C) 2012-2019 Tencent. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://opensource.org/licenses/Apache-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package action

import (
"context"
"fmt"
"strconv"
"strings"

"helm.sh/helm/v3/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

applicationv1 "tkestack.io/tke/api/application/v1"
applicationversionedclient "tkestack.io/tke/api/client/clientset/versioned/typed/application/v1"
platformversionedclient "tkestack.io/tke/api/client/clientset/versioned/typed/platform/v1"
appconfig "tkestack.io/tke/pkg/application/config"
applicationprovider "tkestack.io/tke/pkg/application/provider/application"
helmchart "tkestack.io/tke/pkg/helm"
"tkestack.io/tke/pkg/util/log"
)

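// UpgradeDaemonset drives the canary upgrade of the app's OnDelete DaemonSets.
// For every such DaemonSet it ensures an UpgradeJob exists (creating one from the app's
// UpgradePolicyTemplate if necessary) and, once every job has completed all of its batches,
// moves the app back to AppPhaseUpgrading. If the chart renders no OnDelete DaemonSet,
// the phase is switched immediately with the "ds skip" message.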
func UpgradeDaemonset(ctx context.Context,
applicationClient applicationversionedclient.ApplicationV1Interface,
platformClient platformversionedclient.PlatformV1Interface,
app *applicationv1.App,
repo appconfig.RepoConfiguration,
updateStatusFunc applicationprovider.UpdateStatusFunc) (*applicationv1.App, error) {

daemonsets, err := getOndeleteDaemonsets(ctx, platformClient, app, repo)
if err != nil {
log.Errorf("call needCreateUpgradeJob for app %s/%s failed: %v", app.Namespace, app.Name, err)
return nil, err
}

// No DaemonSet canary upgrade is needed, so enter AppPhaseUpgrading again
if len(daemonsets) == 0 {
newStatus := app.Status.DeepCopy()
newStatus.Phase = applicationv1.AppPhaseUpgrading
newStatus.Message = "ds skip"
newStatus.Reason = ""
newStatus.LastTransitionTime = metav1.Now()
return updateStatusFunc(ctx, app, &app.Status, newStatus)
}

allDone := true
for _, ds := range daemonsets {
jobName := getUpgradeJobNameFromApp(ds, app)
uj, err := applicationClient.UpgradeJobs(app.Spec.TargetCluster).Get(context.TODO(), jobName, metav1.GetOptions{})
if err != nil {
// Keep the current status so the operation can be retried later
if !apierrors.IsNotFound(err) {
log.Errorf("get UpgradeJobs %s for app %s/%s failed: %v", jobName, app.Namespace, app.Name, err)
return nil, err
}

// create job
upt, err := applicationClient.UpgradePolicyTemplates().Get(context.TODO(), app.Spec.UpgradePolicy, metav1.GetOptions{})
if err != nil {
log.Errorf("get UpgradePolicyTemplates %s for app %s/%s failed: %v", app.Spec.UpgradePolicy, app.Namespace, app.Name, err)
return nil, err
}
up := buildUpgradeJobFromTemplate(jobName, app, upt)
uj, err = applicationClient.UpgradeJobs(app.Spec.TargetCluster).Create(context.TODO(), up, metav1.CreateOptions{})
if err != nil {
log.Errorf("create UpgradeJobs %s for app %s/%s failed: %v", jobName, app.Namespace, app.Name, err)
return nil, err
}
}

if uj.Status.BatchCompleteNum <= uj.Status.BatchOrder {
allDone = false
}
}

if allDone {
// Switch to the upgrading phase and continue with the subsequent post steps
if updateStatusFunc != nil {
newStatus := app.Status.DeepCopy()
newStatus.Phase = applicationv1.AppPhaseUpgrading
newStatus.Message = "ds ok"
newStatus.Reason = ""
newStatus.LastTransitionTime = metav1.Now()
return updateStatusFunc(ctx, app, &app.Status, newStatus)
}
}

return app, nil
}

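// getOndeleteDaemonsets returns the namespace/name of every DaemonSet in the app's rendered
// manifest whose update strategy is OnDelete. The manifest currently deployed in the cluster
// must match the freshly rendered one before the cluster resources are inspected.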
func getOndeleteDaemonsets(ctx context.Context,
platformClient platformversionedclient.PlatformV1Interface,
app *applicationv1.App,
repo appconfig.RepoConfiguration) ([]string, error) {

helper, err := helmchart.NewHelper(ctx, platformClient, app, repo)
if err != nil {
log.Errorf(fmt.Sprintf("new cluster %s app %s helper failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return nil, err
}
clusterManifest, err := helper.GetClusterManifest()
if err != nil {
log.Errorf(fmt.Sprintf("get cluster %s app %s cluster manifest failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return nil, err
}
manifest, err := helper.GetManifest()
if err != nil {
log.Errorf(fmt.Sprintf("get cluster %s app %s manifest failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return nil, err
}
if clusterManifest != manifest {
// The deployed manifest and the freshly rendered manifest differ, trigger the check again
log.Errorf(fmt.Sprintf("cluster %s app %s, cluster manifest is not equal to the rendered manifest", app.Spec.TargetCluster, app.Name))
return nil, fmt.Errorf("cluster %s app %s: cluster manifest is not equal to the rendered manifest", app.Spec.TargetCluster, app.Name)
}
clusterResource, err := helper.GetClusterResource()
if err != nil {
log.Errorf(fmt.Sprintf("get cluster %s app %s resource failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return nil, err
}

var ds []string
for _, v := range clusterResource {
switch kube.AsVersioned(v).(type) {
case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
daemonset := v.Object.(*appsv1.DaemonSet)
if daemonset.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
ds = append(ds, daemonset.Namespace+"/"+daemonset.Name)
}
case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
default:
}
}
return ds, nil
}

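// buildUpgradeJobFromTemplate builds an UpgradeJob for the given DaemonSet (namespace/name)
// from the app's UpgradePolicyTemplate, copying the batch parameters and labeling the job
// with the app name and generation so it can later be listed by label selector.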
func buildUpgradeJobFromTemplate(ds string, app *applicationv1.App, tp *applicationv1.UpgradePolicyTemplate) *applicationv1.UpgradeJob {
batchNum := *tp.Spec.BatchNum
batchIntervalSeconds := *tp.Spec.BatchIntervalSeconds
maxFailed := *tp.Spec.MaxFailed
maxSurge := *tp.Spec.MaxSurge
up := &applicationv1.UpgradeJob{
TypeMeta: metav1.TypeMeta{
Kind: "UpgradeJob",
APIVersion: "application.tkestack.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: getUpgradeJobNameFromApp(ds, app),
Namespace: app.Spec.TargetCluster,
Labels: map[string]string{
"createdBy": app.Name,
"appGeneration": strconv.Itoa(int(app.Generation)),
},
},
Spec: applicationv1.UpgradeJobSpec{
TenantID: app.Spec.TenantID,
Target: ds,
AppRefer: app.Name,

BatchNum: &batchNum,
BatchIntervalSeconds: &batchIntervalSeconds,
MaxFailed: &maxFailed,
MaxSurge: &maxSurge,
},
}

return up
}

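// getUpgradeJobNameFromApp derives the UpgradeJob name from the app name, the DaemonSet name
// and the app generation, e.g. "myapp-kube-proxy-3" for ds "kube-system/kube-proxy".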
func getUpgradeJobNameFromApp(ds string, app *applicationv1.App) string {
// format of ds is like kube-system/kube-proxy, use ds's name
s := strings.Split(ds, "/")
return fmt.Sprintf("%s-%s-%d", app.Name, s[1], app.Generation)
}
143 changes: 17 additions & 126 deletions pkg/application/controller/app/action/upgrade.go
@@ -22,13 +22,12 @@ import (
"context"
"errors"
"fmt"
"helm.sh/helm/v3/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"strconv"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

applicationv1 "tkestack.io/tke/api/application/v1"
applicationversionedclient "tkestack.io/tke/api/client/clientset/versioned/typed/application/v1"
platformversionedclient "tkestack.io/tke/api/client/clientset/versioned/typed/platform/v1"
@@ -38,7 +37,6 @@ import (
applicationprovider "tkestack.io/tke/pkg/application/provider/application"
"tkestack.io/tke/pkg/application/util"
chartpath "tkestack.io/tke/pkg/application/util/chartpath/v1"
helmchart "tkestack.io/tke/pkg/helm"
"tkestack.io/tke/pkg/util/log"
"tkestack.io/tke/pkg/util/metrics"
)
@@ -166,8 +164,13 @@ func Upgrade(ctx context.Context,
if app.Spec.UpgradePolicy != "" && (app.Status.Message != "ds skip" && app.Status.Message != "ds ok") {
// Check whether the corresponding upgrade jobs already exist
// cls-id + app.Generation
jobName := fmt.Sprintf("%s-%d", app.Name, app.Generation)
_, err := applicationClient.UpgradeJobs(app.Spec.TargetCluster).Get(context.TODO(), jobName, metav1.GetOptions{})
// TODO: handle the case of multiple DaemonSets (selected by label)
ujs, err := applicationClient.UpgradeJobs(app.Spec.TargetCluster).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
"createdBy": app.Name,
"appGeneration": strconv.Itoa(int(app.Generation)),
}).String(),
})
if err != nil {
if !apierrors.IsNotFound(err) {
return nil, err
Expand All @@ -183,8 +186,12 @@ func Upgrade(ctx context.Context,
return updateStatusFunc(ctx, app, &app.Status, newStatus)
}
} else {
// TODO: we only get here after the DaemonSet upgrade, so the jobs must already be complete. Check here and log an error if not.

for _, uj := range ujs.Items {
if uj.Status.BatchCompleteNum <= uj.Status.BatchOrder {
// TODO: we only get here after the DaemonSet upgrade, so the jobs must already be complete. Check here and log an error if not.
log.Errorf("AppPhaseUpgradingDaemonset to AppPhaseUpgrading but job not OK: %s/%s", app.Namespace, app.Name)
}
}
}
}

@@ -235,119 +242,3 @@ func Upgrade(ctx context.Context,
}
return app, nil
}

func UpgradeDaemonset(ctx context.Context,
applicationClient applicationversionedclient.ApplicationV1Interface,
platformClient platformversionedclient.PlatformV1Interface,
app *applicationv1.App,
repo appconfig.RepoConfiguration,
updateStatusFunc applicationprovider.UpdateStatusFunc) (*applicationv1.App, error) {

if need, err := needCreateUpgradeJob(ctx, applicationClient, platformClient, app, repo, updateStatusFunc); err != nil {
log.Errorf("call needCreateUpgradeJob for app %s/%s failed: %v", app.Namespace, app.Name, err)
return nil, err
} else {
// No DaemonSet canary upgrade is needed, so enter AppPhaseUpgrading again
if !need {
if updateStatusFunc != nil {
newStatus := app.Status.DeepCopy()
newStatus.Phase = applicationv1.AppPhaseUpgrading
newStatus.Message = "ds skip"
newStatus.Reason = ""
newStatus.LastTransitionTime = metav1.Now()
return updateStatusFunc(ctx, app, &app.Status, newStatus)
}
}
}

jobName := fmt.Sprintf("%s-%d", app.Name, app.Generation)
uj, err := applicationClient.UpgradeJobs(app.Spec.TargetCluster).Get(context.TODO(), jobName, metav1.GetOptions{})
if err != nil {
// Keep the current status so the operation can be retried later
if !apierrors.IsNotFound(err) {
return nil, err
}
// create job
//upt, err := applicationClient.UpgradePolicies(app.Spec.TargetCluster).Get(context.TODO(), jobName, metav1.GetOptions{})
}

if uj.Status.BatchCompleteNum <= uj.Status.BatchOrder {
// Upgrade not finished yet, keep waiting. How long should we wait here — the app's sync period? Should the controller update the app object to trigger the upgrade?
return app, nil
} else {
// Switch to the upgrading phase and continue with the subsequent post steps
if updateStatusFunc != nil {
newStatus := app.Status.DeepCopy()
newStatus.Phase = applicationv1.AppPhaseUpgrading
newStatus.Message = "ds ok"
newStatus.Reason = ""
newStatus.LastTransitionTime = metav1.Now()
return updateStatusFunc(ctx, app, &app.Status, newStatus)
}
}

return app, nil
}

func createUpgradeJob() {}

func needCreateUpgradeJob(ctx context.Context,
applicationClient applicationversionedclient.ApplicationV1Interface,
platformClient platformversionedclient.PlatformV1Interface,
app *applicationv1.App,
repo appconfig.RepoConfiguration,
updateStatusFunc applicationprovider.UpdateStatusFunc) (bool, error) {

helper, err := helmchart.NewHelper(ctx, platformClient, app, repo)
if err != nil {
log.Errorf(fmt.Sprintf("new cluster %s app %s helper failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return false, err
}
clusterManifest, err := helper.GetClusterManifest()
if err != nil {
log.Errorf(fmt.Sprintf("get cluster %s app %s cluster manifest failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return false, err
}
manifest, err := helper.GetManifest()
if err != nil {
log.Errorf(fmt.Sprintf("get cluster %s app %s manifest failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return false, err
}
if clusterManifest != manifest {
// The deployed manifest and the freshly rendered manifest differ, trigger the check again
log.Errorf(fmt.Sprintf("cluster %s app %s, cluster manifest is not equal manifest", app.Spec.TargetCluster, app.Name))
return false, err
}
clusterResource, err := helper.GetClusterResource()
if err != nil {
log.Errorf(fmt.Sprintf("get cluster %s app %s resource failed, err:%s", app.Spec.TargetCluster, app.Name, err.Error()))
return false, err
}
/*
cluster, err := platformClient.Clusters().Get(ctx, app.Spec.TargetCluster, metav1.GetOptions{})
if err != nil {
log.Errorf("platformClient get client failed:%s", err.Error())
return nil, err
}
k8sClient, err := addon.BuildExternalClientSet(ctx, cluster, platformClient)
if err != nil {
log.Errorf("addCrLabelAnnotation cos build cluster client failed: %s", err.Error())
return nil, err
}
*/
for _, v := range clusterResource {
switch kube.AsVersioned(v).(type) {
case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
ds := v.Object.(*appsv1.DaemonSet)
if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
// create upgradejob
return true, nil
}

case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
default:
}
}
return false, nil
}
