From 2545e22006239b5cc1343d9b5b629be9c54340c9 Mon Sep 17 00:00:00 2001 From: jacksontong Date: Fri, 5 Apr 2024 14:55:50 +0800 Subject: [PATCH] add upgradejob controller --- cmd/tke-application-controller/app/context.go | 8 + .../app/controller.go | 1 + .../app/upgradejob.go | 46 ++ go.mod | 25 +- go.sum | 39 +- .../controller/upgradejob/healthcheckpro.go | 91 +++ .../upgradejob/upgrade_job_controller.go | 604 ++++++++++++++++++ 7 files changed, 786 insertions(+), 28 deletions(-) create mode 100644 cmd/tke-application-controller/app/upgradejob.go create mode 100644 pkg/application/controller/upgradejob/healthcheckpro.go create mode 100644 pkg/application/controller/upgradejob/upgrade_job_controller.go diff --git a/cmd/tke-application-controller/app/context.go b/cmd/tke-application-controller/app/context.go index 5f7a8c2f3..5c5b8ce5d 100644 --- a/cmd/tke-application-controller/app/context.go +++ b/cmd/tke-application-controller/app/context.go @@ -21,6 +21,7 @@ package app import ( "fmt" "net/http" + "os" "time" "k8s.io/apimachinery/pkg/runtime/schema" @@ -74,6 +75,7 @@ type ControllerContext struct { ResyncPeriod func() time.Duration ControllerStartInterval time.Duration + Region string Repo appconfig.RepoConfiguration PlatformClient platformv1.PlatformV1Interface } @@ -114,6 +116,11 @@ func CreateControllerContext(cfg *config.Config, rootClientBuilder controller.Cl return ControllerContext{}, err } + region := os.Getenv("REGION") + if region == "" { + return ControllerContext{}, fmt.Errorf("Please set env REGION") + } + ctx := ControllerContext{ ClientBuilder: rootClientBuilder, InformerFactory: sharedInformers, @@ -125,6 +132,7 @@ func CreateControllerContext(cfg *config.Config, rootClientBuilder controller.Cl ResyncPeriod: controller.ResyncPeriod(&cfg.Component), ControllerStartInterval: cfg.Component.ControllerStartInterval, + Region: region, Repo: cfg.RepoConfiguration, PlatformClient: platformClient.PlatformV1(), } diff --git 
a/cmd/tke-application-controller/app/controller.go b/cmd/tke-application-controller/app/controller.go index 3ae618790..adfcb2158 100644 --- a/cmd/tke-application-controller/app/controller.go +++ b/cmd/tke-application-controller/app/controller.go @@ -49,6 +49,7 @@ func NewControllerInitializers() map[string]InitFunc { controllers := map[string]InitFunc{} controllers["app"] = startAppController + controllers["upgradejob"] = startUpgradeJobController return controllers } diff --git a/cmd/tke-application-controller/app/upgradejob.go b/cmd/tke-application-controller/app/upgradejob.go new file mode 100644 index 000000000..ad6c671e2 --- /dev/null +++ b/cmd/tke-application-controller/app/upgradejob.go @@ -0,0 +1,46 @@ +/* + * Tencent is pleased to support the open source community by making TKEStack + * available. + * + * Copyright (C) 2012-2019 Tencent. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://opensource.org/licenses/Apache-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package app + +import ( + "net/http" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + + applicationv1 "tkestack.io/tke/api/application/v1" + "tkestack.io/tke/pkg/application/controller/upgradejob" +) + +func startUpgradeJobController(ctx ControllerContext) (http.Handler, bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: applicationv1.GroupName, Version: "v1", Resource: "apps"}] { + return nil, false, nil + } + + ctrl := upgradejob.NewController( + ctx.ClientBuilder.ClientOrDie("app-upgradejob-controller"), + ctx.PlatformClient, + ctx.InformerFactory.Application().V1().UpgradeJobs(), + time.Second*10, //ctx.Config.AppControllerConfiguration.SyncPeriod, + ctx.Region, + ) + + go ctrl.Run(ctx.Config.AppControllerConfiguration.ConcurrentSyncs, ctx.Stop) + return nil, true, nil +} diff --git a/go.mod b/go.mod index 7d496a6b2..1993df044 100644 --- a/go.mod +++ b/go.mod @@ -90,11 +90,11 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.1 go.etcd.io/etcd/client/v3 v3.5.1 go.uber.org/zap v1.19.0 - golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd - golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd + golang.org/x/crypto v0.19.0 + golang.org/x/net v0.21.0 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 - google.golang.org/grpc v1.43.0 + google.golang.org/grpc v1.62.1 gopkg.in/go-playground/validator.v9 v9.29.1 gopkg.in/ldap.v2 v2.5.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 @@ -124,7 +124,10 @@ require ( yunion.io/x/pkg v0.0.0-20200603123312-ad58e621aec0 ) -require golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 +require ( + git.woa.com/kmetis/healthcheckpro v0.0.0-20240325053646-df0498237ffd + golang.org/x/mod v0.8.0 +) require ( cloud.google.com/go v0.99.0 // indirect @@ -189,7 +192,7 @@ require ( github.com/go-kit/kit v0.10.0 // indirect github.com/go-ldap/ldap/v3 v3.3.0 // indirect github.com/go-logfmt/logfmt v0.5.0 // indirect - github.com/go-logr/logr 
v1.2.2 // indirect + github.com/go-logr/logr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect @@ -201,7 +204,7 @@ require ( github.com/gofrs/uuid v4.0.0+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.0.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.6 // indirect @@ -292,14 +295,14 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect - golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/sys v0.17.0 // indirect + golang.org/x/term v0.17.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/api v0.61.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368 // indirect - google.golang.org/protobuf v1.27.1 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect @@ -308,7 +311,7 @@ require ( gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect istio.io/gogo-genproto v0.0.0-20190930162913-45029607206a // indirect k8s.io/helm v2.16.12+incompatible // indirect - k8s.io/klog/v2 v2.60.1 // indirect + k8s.io/klog/v2 v2.100.1 // indirect oras.land/oras-go v1.1.0 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client 
v0.0.30 // indirect sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect diff --git a/go.sum b/go.sum index eeb906789..9ea2b0a3c 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ contrib.go.opencensus.io/exporter/ocagent v0.4.12/go.mod h1:450APlNTSR6FrvC3CTRq contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= entgo.io/ent v0.8.0/go.mod h1:KNjsukat/NJi6zJh1utwRadsbGOZsBbAZNDxkW7tMCc= +git.woa.com/kmetis/healthcheckpro v0.0.0-20240325053646-df0498237ffd h1:cp4ykGKjqfxh1WTaGXBDvpApHHyD8z7pVJFs4ylasi0= +git.woa.com/kmetis/healthcheckpro v0.0.0-20240325053646-df0498237ffd/go.mod h1:f4rnD1ONTUSqP/fhLkyx0q86ZFUQztMRDI8cLc3AUfo= github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg= github.com/AlekSi/pointer v1.1.0 h1:SSDMPcXD9jSl8FPy9cRzoRaMJtm9g9ggGTxecRUbQoI= github.com/AlekSi/pointer v1.1.0/go.mod h1:y7BvfRI3wXPWKXEBhU71nbnIEEZX0QTSB2Bj48UJIZE= @@ -137,7 +139,6 @@ github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3Q github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= -github.com/Masterminds/sprig v2.22.0+incompatible h1:z4yfnGrZ7netVz+0EDJ0Wi+5VZCSYp4Z0m2dk6cEM60= github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/Masterminds/sprig/v3 v3.2.2 h1:17jRggJu518dr3QaafizSXOjKYp94wKfABxUmyxvxX8= github.com/Masterminds/sprig/v3 v3.2.2/go.mod h1:UoaO7Yp8KlPnJIYWTFkMaqPUYKTfGFPhxNuwnnxkKlk= @@ -258,7 +259,6 @@ github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edY github.com/bketelsen/crypt 
v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= -github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= @@ -297,7 +297,6 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -462,7 +461,6 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20161114122254-48702e0da86b/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= 
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= @@ -649,8 +647,9 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7 github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v0.1.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk= github.com/go-logr/zapr v1.2.0 h1:n4JnPI1T3Qq1SFEi/F8rwLrZERp2bso19PJZDB9dayk= @@ -808,8 +807,9 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy 
v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -877,7 +877,6 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww= github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -1590,7 +1589,6 @@ github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= -github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= @@ -1782,8 +1780,9 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto 
v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1821,8 +1820,9 @@ golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hM golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180611182652-db08ff08e862/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1900,8 +1900,9 @@ golang.org/x/net 
v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220107192237-5cfca573fb4d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2058,13 +2059,15 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term 
v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2075,8 +2078,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -2310,8 +2314,9 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod 
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d h1:TxyelI5cVkbREznMhfzycHdkp5cLA7DpE+GKjSslYhM= @@ -2457,8 +2462,9 @@ k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= k8s.io/klog/v2 v2.9.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/klog/v2 v2.60.1 h1:VW25q3bZx9uE3vvdL6M8ezOX79vA2Aq1nEWLqNQclHc= k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= +k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-aggregator v0.22.3 h1:a+QucblaDv4zLgDE0ttLEUSk/SO/6xi3XZESmFri/D0= k8s.io/kube-aggregator v0.22.3/go.mod h1:TIpLq1HvR/S4y75i3y+4q9ik3ZvgyaDz72CBfDS0A6E= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= @@ -2518,7 +2524,6 @@ sigs.k8s.io/kustomize/kyaml v0.13.6 h1:eF+wsn4J7GOAXlvajv6OknSunxpcOBQQqsnPxObtk sigs.k8s.io/kustomize/kyaml v0.13.6/go.mod h1:yHP031rn1QX1lr/Xd934Ri/xdVNG8BE2ECa78Ht/kEg= 
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca/go.mod h1:IIgPezJWb76P0hotTxzDbWsMYB8APh18qZnxkomBpxA= -sigs.k8s.io/structured-merge-diff v1.0.1-0.20191108220359-b1b620dd3f06 h1:zD2IemQ4LmOcAumeiyDWXKUI2SO0NYDe3H6QGvPOVgU= sigs.k8s.io/structured-merge-diff v1.0.1-0.20191108220359-b1b620dd3f06/go.mod h1:/ULNhyfzRopfcjskuui0cTITekDduZ7ycKN3oUT9R18= sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw= sigs.k8s.io/structured-merge-diff/v3 v3.0.0/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw= diff --git a/pkg/application/controller/upgradejob/healthcheckpro.go b/pkg/application/controller/upgradejob/healthcheckpro.go new file mode 100644 index 000000000..f1aae483b --- /dev/null +++ b/pkg/application/controller/upgradejob/healthcheckpro.go @@ -0,0 +1,91 @@ +package upgradejob + +import ( + "context" + "fmt" + "os" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "tkestack.io/tke/pkg/util/log" + + "git.woa.com/kmetis/healthcheckpro/pb" +) + +var components = []string{"apiserver", "etcd"} + +func initHealthCheckerClient() pb.HealthCheckerClient { + hcAddr := os.Getenv("HEALTH_CHECKER_ADDR") + if hcAddr != "" { + conn, err := grpc.Dial(hcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + return pb.NewHealthCheckerClient(conn) + } + return nil +} + +func checkHealth(client pb.HealthCheckerClient, region, cls, app string, nodes []string) error { + if client == nil { + // skip check + return nil + } + + if err := checkClusterHealth(client, region, cls); err != nil { + return err + } + + if err := checkClusterAppHealth(client, region, cls, app); err != nil { + return err + } + + /* // TODO: 等联调完成后再放开 + if err := checkClusterNodes(client, 
region, cls, nodes); err != nil { + return err + } + */ + return nil +} + +func checkClusterHealth(client pb.HealthCheckerClient, region, cls string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // check basic components + for _, component := range components { + req := pb.ComponentHealthyRequest{Product: "tke", Region: region, ClusterId: cls, ComponentName: component} + if rsp, err := client.IsComponentHealthy(ctx, &req); err != nil { + return err + } else { + if err := rsp.GetErrMessage(); err != "" { + return fmt.Errorf(err) + } + } + } + + return nil +} + +func checkClusterAppHealth(client pb.HealthCheckerClient, region, cls, app string) error { + return nil +} + +func checkClusterNodes(client pb.HealthCheckerClient, region, cls string, nodes []string) error { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + for _, node := range nodes { + // TODO: region一定要的吗?,node 可以用nodeName吗,它不一定是ip,可以同时查多个节点吗? + req := pb.NodeHealthyRequest{Product: "tke", Region: region, ClusterId: cls, NodeIp: node} + if rsp, err := client.IsNodeHealthy(ctx, &req); err != nil { + return err + } else { + if err := rsp.GetErrMessage(); err != "" { + return fmt.Errorf(err) + } + } + } + return nil +} diff --git a/pkg/application/controller/upgradejob/upgrade_job_controller.go b/pkg/application/controller/upgradejob/upgrade_job_controller.go new file mode 100644 index 000000000..317680cd4 --- /dev/null +++ b/pkg/application/controller/upgradejob/upgrade_job_controller.go @@ -0,0 +1,604 @@ +/* + * Tencent is pleased to support the open source community by making TKEStack + * available. + * + * Copyright (C) 2012-2019 Tencent. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * https://opensource.org/licenses/Apache-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package upgradejob + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "git.woa.com/kmetis/healthcheckpro/pb" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + applicationv1 "tkestack.io/tke/api/application/v1" + clientset "tkestack.io/tke/api/client/clientset/versioned" + platformversionedclient "tkestack.io/tke/api/client/clientset/versioned/typed/platform/v1" + applicationv1informer "tkestack.io/tke/api/client/informers/externalversions/application/v1" + applicationv1lister "tkestack.io/tke/api/client/listers/application/v1" + platformv1 "tkestack.io/tke/api/platform/v1" + controllerutil "tkestack.io/tke/pkg/controller" + "tkestack.io/tke/pkg/platform/util/addon" + "tkestack.io/tke/pkg/util/log" + "tkestack.io/tke/pkg/util/metrics" +) + +const ( + controllerName = "upgradejob-controller" +) + +type clsuterInfo struct { + cluster *platformv1.Cluster + client *kubernetes.Clientset +} + +// Controller is responsible for performing actions dependent upon an app phase. 
+type Controller struct { + client clientset.Interface + platformClient platformversionedclient.PlatformV1Interface + queue workqueue.RateLimitingInterface + lister applicationv1lister.UpgradeJobLister + listerSynced cache.InformerSynced + healthcheckClient pb.HealthCheckerClient + clusters sync.Map + region string + stopCh <-chan struct{} +} + +// NewController creates a new Controller object. +func NewController( + client clientset.Interface, + platformClient platformversionedclient.PlatformV1Interface, + upgradeJobInformer applicationv1informer.UpgradeJobInformer, + resyncPeriod time.Duration, region string, +) *Controller { + // create the controller so we can inject the enqueue function + controller := &Controller{ + client: client, + region: region, + platformClient: platformClient, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), + } + + if client != nil && + client.ApplicationV1().RESTClient() != nil && + !reflect.ValueOf(client.ApplicationV1().RESTClient()).IsNil() && + client.ApplicationV1().RESTClient().GetRateLimiter() != nil { + _ = metrics.RegisterMetricAndTrackRateLimiterUsage("tke_upgradejob_controller", client.ApplicationV1().RESTClient().GetRateLimiter()) + } + + upgradeJobInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.FilteringResourceEventHandler{ + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueue, + UpdateFunc: func(oldObj, newObj interface{}) { + old, ok1 := oldObj.(*applicationv1.UpgradeJob) + cur, ok2 := newObj.(*applicationv1.UpgradeJob) + if ok1 && ok2 && controller.needsUpdate(old, cur) { + controller.enqueue(newObj) + } + }, + //DeleteFunc: controller.enqueue, + }, + FilterFunc: func(obj interface{}) bool { + up, ok := obj.(*applicationv1.UpgradeJob) + if !ok { + return false + } + if up.Status.BatchCompleteNum > up.Status.BatchOrder { + return false + } + return true + }, + }, + resyncPeriod, + ) + controller.lister = upgradeJobInformer.Lister() 
+ controller.listerSynced = upgradeJobInformer.Informer().HasSynced + controller.healthcheckClient = initHealthCheckerClient() + + return controller +} + +// obj could be an *applicationv1.App, or a DeletionFinalStateUnknown marker item. +func (c *Controller) enqueue(obj interface{}) { + key, err := controllerutil.KeyFunc(obj) + if err != nil { + log.Error("Couldn't get key for object", log.Any("object", obj), log.Err(err)) + return + } + c.queue.Add(key) +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. +func (c *Controller) Run(workers int, stopCh <-chan struct{}) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + + // Start the informer factories to begin populating the informer caches + log.Info("starting upgrade job controller") + defer log.Info("shutting down upgrade job controller") + + if ok := cache.WaitForCacheSync(stopCh, c.listerSynced); !ok { + log.Error("failed to wait for upgrade job caches to sync") + return + } + + c.stopCh = stopCh + for i := 0; i < workers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + + go c.garbageCollection() + + <-stopCh +} + +// worker processes the queue of app objects. +// Each app can be in the queue at most once. +// The system ensures that no two workers can process +// the same app at the same time. 
+func (c *Controller) worker() {
+	// processNext handles a single queue item and reports whether the queue
+	// has been shut down.
+	processNext := func() bool {
+		key, quit := c.queue.Get()
+		if quit {
+			return true
+		}
+		defer c.queue.Done(key)
+
+		if err := c.syncItem(key.(string)); err != nil {
+			// rather than wait for a full resync, re-add the key to the queue to be processed
+			c.queue.AddRateLimited(key)
+			runtime.HandleError(err)
+			return false
+		}
+
+		// no error, forget this entry
+		c.queue.Forget(key)
+		return false
+	}
+
+	for {
+		if quit := processNext(); quit {
+			return
+		}
+	}
+}
+
+// syncItem reconciles the UpgradeJob identified by key ("namespace/name").
+// A job that is no longer in the lister is handed to processDeletion and
+// treated as done; any other lookup failure is returned so the key is retried.
+// Syncs that take longer than three seconds are logged.
+func (c *Controller) syncItem(key string) error {
+	startTime := time.Now()
+	defer func() {
+		if time.Since(startTime) > 3*time.Second {
+			log.Info("finished syncing upgradejob", log.String("upgradejob", key), log.Duration("processTime", time.Since(startTime)))
+		}
+	}()
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	up, err := c.lister.UpgradeJobs(namespace).Get(name)
+	switch {
+	case errors.IsNotFound(err):
+		log.Info("upgradejob has been deleted. Attempting to cleanup resources",
+			log.String("namespace", namespace),
+			log.String("name", name))
+		// Cleanup is best effort: a failure is reported but does not requeue
+		// the key, because the object is already gone.
+		if err := c.processDeletion(key); err != nil {
+			log.Warn("unable to cleanup resources of deleted upgradejob",
+				log.String("namespace", namespace),
+				log.String("name", name), log.Err(err))
+		}
+		return nil
+	case err != nil:
+		log.Warn("unable to retrieve upgradejob from store",
+			log.String("namespace", namespace),
+			log.String("name", name), log.Err(err))
+		return err
+	default:
+		return c.reconcileUpgradeJob(up)
+	}
+}
+
+// processDeletion is the cleanup hook for a deleted UpgradeJob. It currently
+// does nothing.
+func (c *Controller) processDeletion(key string) error {
+	return nil
+}
+
+// needsUpdate reports whether an update event should be enqueued. It currently
+// accepts every update.
+func (c *Controller) needsUpdate(old *applicationv1.UpgradeJob, new *applicationv1.UpgradeJob) bool {
+	// TODO
+	return true
+}
+
+// reconcileUpgradeJob advances the batched rollout described by up by one step.
+//
+// The target is an OnDelete DaemonSet named by up.Spec.Target
+// ("namespace/name", namespace defaulting to "default") in the cluster whose
+// name is up.Namespace. Each call does one of the following:
+//   - finishes the job (missing required spec fields, app lookup failure, app
+//     in the meta cluster, bad target, DaemonSet missing or not OnDelete, or
+//     rollout complete);
+//   - opens the next batch by recording its order, start time and nodes;
+//   - closes the current batch once every recorded node runs the target
+//     revision;
+//   - deletes outdated pods on the nodes of the current batch.
+//
+// A returned error causes the key to be retried by the worker.
+func (c *Controller) reconcileUpgradeJob(up *applicationv1.UpgradeJob) error {
+	cluster, err := c.getCluster(up.Namespace)
+	if err != nil {
+		log.Errorf("reconcileUpgradeJob get cluster for upgradeJob %s/%s failed: %s", up.Namespace, up.Name, err.Error())
+		return err
+	}
+
+	if up.Spec.AppRefer != "" {
+		app, err := c.client.ApplicationV1().Apps(up.Namespace).Get(context.TODO(), up.Spec.AppRefer, metav1.GetOptions{})
+		if err != nil {
+			log.Errorf("reconcileUpgradeJob get app for upgradejob %s/%s failed: %s", up.Namespace, up.Name, err.Error())
+			return c.setUpgradeJobToCompleteStatus(up, "get app failed")
+		}
+		// TODO: apps installed in the meta cluster are skipped for now; revisit
+		// later. Note that the meta cluster's client is obtained through a
+		// different interface.
+		if app.Labels["application.tkestack.io/meta"] == "true" {
+			return c.setUpgradeJobToCompleteStatus(up, "app is install in meta cluster")
+		}
+	}
+
+	ns, daemonsetName, err := cache.SplitMetaNamespaceKey(up.Spec.Target)
+	if err != nil {
+		log.Errorf("reconcileUpgradeJob parse target for upgradejob %s/%s failed: %s", up.Namespace, up.Name, err.Error())
+		return c.setUpgradeJobToCompleteStatus(up, "parse target failed")
+	}
+	if ns == "" {
+		ns = "default"
+	}
+
+	// BatchNum, MaxFailed and MaxSurge are dereferenced below. A job missing
+	// any of them can never make progress, so finish it with a reason rather
+	// than panicking the worker on a nil pointer.
+	if up.Spec.BatchNum == nil || up.Spec.MaxFailed == nil || up.Spec.MaxSurge == nil {
+		log.Errorf("reconcileUpgradeJob %s/%s spec is missing BatchNum, MaxFailed or MaxSurge", up.Namespace, up.Name)
+		return c.setUpgradeJobToCompleteStatus(up, "spec BatchNum, MaxFailed and MaxSurge must be set")
+	}
+
+	// 1. Fetch the target DaemonSet.
+	daemonSet, err := cluster.client.AppsV1().DaemonSets(ns).Get(context.TODO(), daemonsetName, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return c.setUpgradeJobToCompleteStatus(up, "target daemonset not found")
+		}
+		log.Errorf("find daemonset %s failed: %v", daemonsetName, err)
+		return err
+	}
+	// The rollout works by deleting pods, which only fits the OnDelete strategy.
+	if daemonSet.Spec.UpdateStrategy.Type != appsv1.OnDeleteDaemonSetStrategyType {
+		log.Errorf("DaemonSet %s/%s updateStrategy uncorrect", daemonSet.Namespace, daemonSet.Name)
+		return c.setUpgradeJobToCompleteStatus(up, "target daemonset updateStrategy is uncorrect")
+	}
+
+	pods, err := getPodsForDaemonSet(cluster.client, daemonSet)
+	if err != nil {
+		log.Errorf("getPodsForDaemonSet %s failed: %v", daemonsetName, err)
+		return err
+	}
+
+	// 2. Resolve the hash of the DaemonSet's newest controller revision.
+	hash, err := getCRHashForDaemonSet(cluster.client, daemonSet)
+	if err != nil {
+		log.Errorf("getCRHashForDaemonSet %s failed: %v", daemonsetName, err)
+		return err
+	}
+
+	log.Infof("Get pods count %d, targetVersion %s", len(pods), hash)
+
+	// TODO: for pods that stay in terminating forever, should we try to force-delete them?
+	if fc := getUnHealthPodsNum(pods); fc > int(*up.Spec.MaxFailed) {
+		log.Errorf("%s/%s unhealth pods count %d exceed %d for ds %s", up.Namespace, up.Name, fc, *up.Spec.MaxFailed, daemonsetName)
+		return fmt.Errorf("wait ds %s pod all ready for %s/%s", daemonsetName, up.Namespace, up.Name)
+	}
+
+	// 3. If the upgrade has already finished, move the job to the complete status.
+	// Either every batch has been handled, or processing has not started but
+	// the pods are already up to date (the UpgradeJob was created after the
+	// app was upgraded).
+	if checkAllPodsUpdated(hash, pods) &&
+		(up.Status.BatchCompleteNum >= *up.Spec.BatchNum || up.Status.BatchOrder == 0) {
+		log.Infof("Batch upgrade %s/%s has Completed for %s: clear status %v.", up.Namespace, up.Name, daemonsetName, up.Status)
+		return c.setUpgradeJobToCompleteStatus(up, "")
+	}
+
+	// 4. Start a new batch: the rollout is just beginning or the previous batch has completed.
+	if up.Status.BatchOrder == up.Status.BatchCompleteNum {
+		// For every batch but the first, wait for the batch interval before starting the next one.
+		if up.Status.BatchOrder != 0 {
+			interval := time.Minute
+			if up.Spec.BatchIntervalSeconds != nil {
+				interval = time.Duration(*up.Spec.BatchIntervalSeconds) * time.Second
+			}
+			if !up.Status.BatchCompleteTime.Add(interval).Before(time.Now()) {
+				log.Infof("Batch upgrade %s/%s/%d wait to handle next batch: %v.", up.Namespace, up.Name, up.Status.BatchOrder, up.Status)
+				return nil
+			}
+		}
+
+		batchOrder := up.Status.BatchOrder + 1
+		updatePods := getUpdatePod(calculateBatchSize(len(pods), int(*up.Spec.BatchNum), int(batchOrder)), hash, pods)
+		publisherCopy := up.DeepCopy()
+		publisherCopy.Status.BatchOrder = batchOrder
+		publisherCopy.Status.BatchStartTime = metav1.Time{Time: time.Now()}
+		publisherCopy.Status.BatchCompleteTime = metav1.Time{}
+		publisherCopy.Status.BatchUpdatedNode = getNodesFromPods(updatePods)
+
+		if _, err := c.client.ApplicationV1().UpgradeJobs(up.Namespace).Update(context.TODO(), publisherCopy, metav1.UpdateOptions{}); err != nil {
+			log.Errorf("Batch upgrade %s/%s/%d begin for next batch failed: %v %v", up.Namespace, up.Name, up.Status.BatchOrder, publisherCopy.Status, err)
+			return err
+		}
+		return nil
+	}
+
+	// 5. Handle the current batch.
+	if checkPodsOnNodesUpdate(hash, pods, up.Status.BatchUpdatedNode) {
+		// Every node of the batch runs the target revision: close the batch.
+		publisherCopy := up.DeepCopy()
+		publisherCopy.Status.BatchCompleteTime = metav1.Time{Time: time.Now()}
+		publisherCopy.Status.BatchCompleteNum = publisherCopy.Status.BatchCompleteNum + 1
+		publisherCopy.Status.BatchCompleteNodes = publisherCopy.Status.BatchCompleteNodes + int32(len(up.Status.BatchUpdatedNode))
+
+		if _, err = c.client.ApplicationV1().UpgradeJobs(up.Namespace).Update(context.TODO(), publisherCopy, metav1.UpdateOptions{}); err != nil {
+			log.Errorf("Batch upgrade %s/%s/%d handle failed: %v", up.Namespace, up.Name, up.Status.BatchOrder, err)
+			return err
+		}
+		log.Infof("Batch upgrade %s/%s/%d handle completed for node %v", up.Namespace, up.Name, up.Status.BatchOrder, publisherCopy.Status.BatchUpdatedNode)
+		return nil
+	}
+
+	// healthcheck before action
+	if c.healthcheckClient != nil {
+		if err := checkHealth(c.healthcheckClient, c.region, up.Namespace, up.Spec.AppRefer, up.Status.BatchUpdatedNode); err != nil {
+			log.Errorf("%s/%s healthcheck failed: %v", up.Namespace, up.Name, err)
+			return err
+		}
+		log.Infof("%s/%s healthcheck success", up.Namespace, up.Name)
+	}
+
+	order := up.Status.BatchOrder
+	log.Infof("Batch upgrade %s/%s/%d begin to handle: %v", up.Namespace, up.Name, order, up.Status)
+	ups := getPodsOnNodesToUpdate(hash, pods, up.Status.BatchUpdatedNode)
+	for i, pod := range ups {
+		if err := cluster.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
+			log.Errorf("Batch upgrade %s/%s/%d: delete pod %s/%s on node %s faild: %v", up.Namespace, up.Name, order, pod.Namespace, pod.Name, pod.Spec.NodeName, err)
+			return err
+		}
+		log.Infof("Batch upgrade %s/%s/%d: delete pod %s/%s on node %s", up.Namespace, up.Name, order, pod.Namespace, pod.Name, pod.Spec.NodeName)
+		// Stop once MaxSurge pods have been deleted in this pass. The check
+		// runs after the delete, so at least one pod is always deleted.
+		if int32(i+1) >= *up.Spec.MaxSurge {
+			break
+		}
+	}
+
+	return nil
+}
+
+// setUpgradeJobToCompleteStatus marks up as finished and persists it. The
+// cached cluster entry is dropped, the batch order, nodes and timestamps are
+// reset, and BatchCompleteNum is forced to at least 1 so that it exceeds
+// BatchOrder (0), which is how the informer filter and the garbage collector
+// recognise a finished job. A non-empty failed is stored as the status reason.
+func (c *Controller) setUpgradeJobToCompleteStatus(up *applicationv1.UpgradeJob, failed string) error {
+	// clear cluster cache
+	c.deleteCluster(up.Namespace)
+
+	upCopy := up.DeepCopy()
+	upCopy.Status.BatchOrder = 0
+	if upCopy.Status.BatchCompleteNum == 0 {
+		upCopy.Status.BatchCompleteNum = 1
+	}
+	upCopy.Status.BatchUpdatedNode = nil
+	upCopy.Status.BatchStartTime = metav1.Time{}
+	upCopy.Status.BatchCompleteTime = metav1.Time{}
+	if failed != "" {
+		upCopy.Status.Reason = &failed
+	}
+
+	if _, err := c.client.ApplicationV1().UpgradeJobs(up.Namespace).Update(context.TODO(), upCopy, metav1.UpdateOptions{}); err != nil {
+		log.Errorf("Batch upgrade %s/%s/%d handle failed: %v", up.Namespace, up.Name, up.Status.BatchOrder, err)
+		return err
+	}
+	return nil
+}
+
+// getCluster returns the cached info for cluster cls, building and caching it
+// on a miss by fetching the Cluster object and creating an external client set
+// for it.
+func (c *Controller) getCluster(cls string) (*clsuterInfo, error) {
+	if info, ok := c.clusters.Load(cls); ok {
+		return info.(*clsuterInfo), nil
+	}
+
+	cluster, err := c.platformClient.Clusters().Get(context.TODO(), cls, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	client, err := addon.BuildExternalClientSet(context.TODO(), cluster, c.platformClient)
+	if err != nil {
+		return nil, err
+	}
+
+	info := &clsuterInfo{
+		cluster: cluster,
+		client:  client,
+	}
+	c.clusters.Store(cls, info)
+	return info, nil
+}
+
+// deleteCluster drops the cached info for cluster cls, if any.
+func (c *Controller) deleteCluster(cls string) {
+	c.clusters.Delete(cls)
+}
+
+// garbageCollection removes finished UpgradeJobs every two hours until the
+// controller's stop channel is closed. If the stop channel was never set the
+// loop simply keeps running on the ticker.
+func (c *Controller) garbageCollection() {
+	ticker := time.NewTicker(2 * time.Hour)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.stopCh:
+			return
+		case <-ticker.C:
+			c.collectFinishedUpgradeJobs()
+		}
+	}
+}
+
+// collectFinishedUpgradeJobs deletes every UpgradeJob that is finished
+// (BatchCompleteNum > BatchOrder) and was created more than 24 hours ago.
+// Failures are logged and do not stop the sweep.
+func (c *Controller) collectFinishedUpgradeJobs() {
+	log.Infof("garbageCollection begin")
+	defer log.Infof("garbageCollection end")
+
+	ups, err := c.lister.List(labels.Everything())
+	if err != nil {
+		log.Errorf("garbageCollection list ups failed: %v", err)
+		return
+	}
+
+	for _, up := range ups {
+		finished := up.Status.BatchCompleteNum > up.Status.BatchOrder
+		if !finished || !up.CreationTimestamp.Add(24*time.Hour).Before(time.Now()) {
+			continue
+		}
+		if err := c.client.ApplicationV1().UpgradeJobs(up.Namespace).Delete(context.TODO(), up.Name, metav1.DeleteOptions{}); err != nil {
+			log.Errorf("garbageCollection %s/%s %s failed: %v", up.Namespace, up.Name, up.Spec.Target, err)
+			continue
+		}
+		log.Infof("garbageCollection %s/%s %s", up.Namespace, up.Name, up.Spec.Target)
+	}
+}
+
+// getPodsForDaemonSet lists the pods in the DaemonSet's namespace that match
+// its selector and keeps those with an owner reference to the DaemonSet's UID.
+// Each pod is returned at most once.
+func getPodsForDaemonSet(clientset kubernetes.Interface, daemonSet *appsv1.DaemonSet) ([]corev1.Pod, error) {
+	selector, err := metav1.LabelSelectorAsSelector(daemonSet.Spec.Selector)
+	if err != nil {
+		return nil, err
+	}
+
+	pods, err := clientset.CoreV1().Pods(daemonSet.Namespace).List(context.TODO(), metav1.ListOptions{
+		LabelSelector: selector.String(), ResourceVersion: "0",
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	var filteredPods []corev1.Pod
+	for _, pod := range pods.Items {
+		for _, ownerReference := range pod.OwnerReferences {
+			if ownerReference.UID == daemonSet.UID {
+				filteredPods = append(filteredPods, pod)
+				// One match is enough; continuing could add the pod twice.
+				break
+			}
+		}
+	}
+
+	return filteredPods, nil
+}
+
+// getCRHashForDaemonSet returns the DefaultDaemonSetUniqueLabelKey label of
+// the highest-numbered ControllerRevision matching the DaemonSet's selector.
+// It returns an error when no ControllerRevision is found.
+func getCRHashForDaemonSet(clientset kubernetes.Interface, daemonSet *appsv1.DaemonSet) (string, error) {
+	selector, err := metav1.LabelSelectorAsSelector(daemonSet.Spec.Selector)
+	if err != nil {
+		return "", err
+	}
+
+	controllerRevisions, err := clientset.AppsV1().ControllerRevisions(daemonSet.Namespace).List(context.TODO(), metav1.ListOptions{
+		LabelSelector: selector.String(), ResourceVersion: "0",
+	})
+	if err != nil {
+		return "", err
+	}
+	// Indexing an empty list would panic; report it so the caller retries.
+	if len(controllerRevisions.Items) == 0 {
+		return "", fmt.Errorf("no controller revision found for daemonset %s/%s", daemonSet.Namespace, daemonSet.Name)
+	}
+
+	latest := 0
+	for index, cr := range controllerRevisions.Items {
+		if cr.Revision > controllerRevisions.Items[latest].Revision {
+			latest = index
+		}
+	}
+
+	return controllerRevisions.Items[latest].Labels[appsv1.DefaultDaemonSetUniqueLabelKey], nil
+}
+
+// calculateBatchSize returns how many of total items belong to the batch with
+// the given 1-based batchOrder when they are spread over batchNumber batches.
+// The first total%batchNumber batches get one extra item. A non-positive
+// batchNumber is treated as a single batch holding everything.
+//
+// TODO: if nodes change during the upgrade the size is currently recomputed,
+// which has no impact.
+func calculateBatchSize(total int, batchNumber int, batchOrder int) int {
+	if batchNumber <= 0 {
+		// Avoids a division by zero below.
+		return total
+	}
+
+	batchSize := total / batchNumber
+	remainder := total % batchNumber
+
+	if batchOrder <= remainder {
+		return batchSize + 1
+	}
+
+	return batchSize
+}
+
+// checkPodsOnNodesUpdate reports whether every node in nodes hosts a
+// non-terminating pod labelled with targetVersion.
+func checkPodsOnNodesUpdate(targetVersion string, pods []corev1.Pod, nodes []string) bool {
+	check := make(map[string]bool, len(pods))
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			// skip terminating pod
+			continue
+		}
+
+		if pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] == targetVersion {
+			check[pod.Spec.NodeName] = true
+		}
+	}
+
+	for _, node := range nodes {
+		if !check[node] {
+			return false
+		}
+	}
+	return true
+}
+
+// getPodsOnNodesToUpdate returns the non-terminating pods that run on one of
+// nodes and are not labelled with targetVersion.
+func getPodsOnNodesToUpdate(targetVersion string, pods []corev1.Pod, nodes []string) []corev1.Pod {
+	var ups []corev1.Pod
+	for _, pod := range pods {
+		if pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] == targetVersion || pod.DeletionTimestamp != nil {
+			continue
+		}
+		for _, node := range nodes {
+			if pod.Spec.NodeName == node {
+				ups = append(ups, pod)
+				break
+			}
+		}
+	}
+	return ups
+}
+
+// getUnHealthPodsNum counts the pods whose phase is not Running.
+func getUnHealthPodsNum(pods []corev1.Pod) int {
+	count := 0
+	for _, pod := range pods {
+		if pod.Status.Phase != corev1.PodRunning {
+			count++
+		}
+	}
+	return count
+}
+
+// checkAllPodsUpdated reports whether every non-terminating pod is labelled
+// with targetVersion.
+func checkAllPodsUpdated(targetVersion string, pods []corev1.Pod) bool {
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			// skip terminating pod
+			continue
+		}
+
+		if pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] != targetVersion {
+			return false
+		}
+	}
+	return true
+}
+
+// getNodesFromPods returns the node name of each pod, in order.
+func getNodesFromPods(pods []corev1.Pod) []string {
+	nodes := make([]string, len(pods))
+	for i, pod := range pods {
+		nodes[i] = pod.Spec.NodeName
+	}
+	return nodes
+}
+
+// getUpdatePod returns up to num non-terminating pods that are not labelled
+// with targetVersion, in the order they appear in pods.
+func getUpdatePod(num int, targetVersion string, pods []corev1.Pod) []corev1.Pod {
+	var p []corev1.Pod
+
+	for _, pod := range pods {
+		if len(p) >= num {
+			// Enough pods collected; no need to scan the rest.
+			break
+		}
+		if pod.DeletionTimestamp != nil {
+			// skip terminating pod
+			continue
+		}
+		if pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] != targetVersion {
+			p = append(p, pod)
+		}
+	}
+
+	return p
+}