Skip to content

Commit

Permalink
config support gray (#1271)
Browse files Browse the repository at this point in the history
  • Loading branch information
skywli authored Nov 15, 2023
1 parent 3df1d28 commit 5f207e2
Show file tree
Hide file tree
Showing 30 changed files with 750 additions and 75 deletions.
4 changes: 0 additions & 4 deletions apiserver/httpserver/config/console_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,16 +291,12 @@ func (h *HTTPServer) PublishConfigFile(req *restful.Request, rsp *restful.Respon

configFile := &apiconfig.ConfigFileRelease{}
ctx, err := handler.Parse(configFile)
requestId := ctx.Value(utils.StringContext("request-id"))

if err != nil {
configLog.Error("[Config][HttpServer] parse config file release from request error.",
zap.String("requestId", requestId.(string)),
zap.String("error", err.Error()))
handler.WriteHeaderAndProto(api.NewConfigFileReleaseResponseWithMessage(apimodel.Code_ParseException, err.Error()))
return
}

handler.WriteHeaderAndProto(h.configServer.PublishConfigFile(ctx, configFile))
}

Expand Down
3 changes: 2 additions & 1 deletion apiserver/nacosserver/v1/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/config"
commonmodel "github.com/polarismesh/polaris/common/model"
)

func (n *ConfigServer) handlePublishConfig(ctx context.Context, req *model.ConfigFile) (bool, error) {
Expand Down Expand Up @@ -170,7 +171,7 @@ func (n *ConfigServer) diffChangeFiles(listenCtx *config_manage.ClientWatchConfi
dataId := item.GetFileName().GetValue()
mdval := item.GetMd5().GetValue()

active := n.cacheSvr.ConfigFile().GetActiveRelease(namespace, group, dataId)
active := n.cacheSvr.ConfigFile().GetActiveRelease(namespace, group, dataId, commonmodel.ReleaseTypeFull)
if (active == nil && mdval != "") || (active != nil && active.Md5 != mdval) {
changeKeys = append(changeKeys, &model.ConfigListenItem{
Tenant: model.ToNacosConfigNamespace(namespace),
Expand Down
3 changes: 2 additions & 1 deletion apiserver/nacosserver/v2/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/polarismesh/polaris/apiserver/nacosserver/v2/remote"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/config"
commonmodel "github.com/polarismesh/polaris/common/model"
)

const (
Expand Down Expand Up @@ -166,7 +167,7 @@ func (h *ConfigServer) handleWatchConfigRequest(ctx context.Context, req nacospb
dataId := item.GetFileName().GetValue()
mdval := item.GetMd5().GetValue()

active := h.cacheSvr.ConfigFile().GetActiveRelease(namespace, group, dataId)
active := h.cacheSvr.ConfigFile().GetActiveRelease(namespace, group, dataId, commonmodel.ReleaseTypeFull)
// 如果 client 过来的 MD5 是一个空字符串
if (active == nil && mdval != "") || (active != nil && active.Md5 != mdval) {
listenResp.ChangedConfigs = append(listenResp.ChangedConfigs, nacospb.ConfigContext{
Expand Down
17 changes: 15 additions & 2 deletions cache/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apisecurity "github.com/polarismesh/specification/source/go/api/v1/security"
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"

"github.com/polarismesh/polaris/common/metrics"
"github.com/polarismesh/polaris/common/model"
Expand Down Expand Up @@ -64,6 +65,8 @@ const (
StrategyRuleName = "strategyRule"
// ServiceContractName service contract config name
ServiceContractName = "serviceContract"
// GrayName gray config name
GrayName = "gray"
)

type CacheIndex int
Expand All @@ -85,6 +88,7 @@ const (
CacheFaultDetector
CacheConfigGroup
CacheServiceContract
CacheGray

CacheLast
)
Expand Down Expand Up @@ -403,6 +407,8 @@ type (
ReleaseName string
// OnlyActive
OnlyActive bool
// IncludeGray 是否包含灰度文件,默认不包括
IncludeGray bool
// Metadata
Metadata map[string]string
// NoPage
Expand Down Expand Up @@ -440,8 +446,7 @@ type (
Cache
// GetActiveRelease
GetGroupActiveReleases(namespace, group string) ([]*model.ConfigFileRelease, string)
// GetActiveRelease
GetActiveRelease(namespace, group, fileName string) *model.ConfigFileRelease
GetActiveRelease(namespace, group, fileName string, typ model.ReleaseType) *model.ConfigFileRelease
// GetRelease
GetRelease(key model.ConfigFileReleaseKey) *model.ConfigFileRelease
// QueryReleases
Expand Down Expand Up @@ -647,3 +652,11 @@ func (bc *BaseCache) Clear() {
func (bc *BaseCache) Close() error {
return nil
}

type (
// GrayCache 灰度 Cache 接口
GrayCache interface {
Cache
GetGrayRule(name string) *apimodel.MatchTerm
}
)
5 changes: 5 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ func (nc *CacheManager) ConfigGroup() types.ConfigGroupCache {
return nc.caches[types.CacheConfigGroup].(types.ConfigGroupCache)
}

// Gray get Gray cache information
func (nc *CacheManager) Gray() types.GrayCache {
return nc.caches[types.CacheGray].(types.GrayCache)
}

// GetCacher get types.Cache impl
func (nc *CacheManager) GetCacher(cacheIndex types.CacheIndex) types.Cache {
return nc.caches[cacheIndex]
Expand Down
55 changes: 33 additions & 22 deletions cache/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (fc *fileCache) setReleases(releases []*model.ConfigFileRelease) (map[strin
oldVal, _ := fc.releases.Get(item.Id)
if !item.Valid {
del++
if err := fc.handleDeleteRelease(oldVal, item); err != nil {
if err := fc.handleDeleteRelease(oldVal); err != nil {
return nil, update, del, err
}
} else {
Expand Down Expand Up @@ -204,6 +204,13 @@ func (fc *fileCache) sendEvent(item *model.ConfigFileRelease) {

// handleUpdateRelease
func (fc *fileCache) handleUpdateRelease(oldVal *model.SimpleConfigFileRelease, item *model.ConfigFileRelease) error {
// 如果ReleaseType类型变更, 先删除再保存
if oldVal != nil && oldVal.Typ != item.Typ {
if err := fc.handleDeleteRelease(oldVal); err != nil {
return err
}
}

fc.releases.Put(item.Id, item.SimpleConfigFileRelease)
func() {
// 记录 namespace -> group -> file_name -> []SimpleRelease 信息
Expand Down Expand Up @@ -252,47 +259,47 @@ func (fc *fileCache) handleUpdateRelease(oldVal *model.SimpleConfigFileRelease,
}

// handleDeleteRelease
func (fc *fileCache) handleDeleteRelease(oldVal *model.SimpleConfigFileRelease, item *model.ConfigFileRelease) error {
fc.releases.Del(item.Id)
func (fc *fileCache) handleDeleteRelease(release *model.SimpleConfigFileRelease) error {
if release == nil {
return nil
}
fc.releases.Del(release.Id)
func() {
// 记录 namespace -> group -> file_name -> []SimpleRelease 信息
if _, ok := fc.name2release.Load(item.Namespace); !ok {
if _, ok := fc.name2release.Load(release.Namespace); !ok {
return
}
namespace, _ := fc.name2release.Load(item.Namespace)
if _, ok := namespace.Load(item.Group); !ok {
namespace, _ := fc.name2release.Load(release.Namespace)
if _, ok := namespace.Load(release.Group); !ok {
return
}
group, _ := namespace.Load(item.Group)
if _, ok := group.Load(item.FileName); !ok {
group, _ := namespace.Load(release.Group)
if _, ok := group.Load(release.FileName); !ok {
return
}

files, _ := group.Load(item.FileName)
files.Delete(item.Name)
files, _ := group.Load(release.FileName)
files.Delete(release.Name)

if files.Len() == 0 {
group.Delete(item.FileName)
group.Delete(release.FileName)
}
}()

if oldVal == nil {
return nil
}
if !oldVal.Active {
if !release.Active {
return nil
}
if namespace, ok := fc.activeReleases.Load(item.Namespace); ok {
if group, ok := namespace.Load(item.Group); ok {
group.Delete(item.ActiveKey())
if namespace, ok := fc.activeReleases.Load(release.Namespace); ok {
if group, ok := namespace.Load(release.Group); ok {
group.Delete(release.ActiveKey())
}
}
if err := fc.valueCache.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(item.OwnerKey()))
bucket := tx.Bucket([]byte(release.OwnerKey()))
if bucket == nil {
return nil
}
return bucket.Delete([]byte(item.ActiveKey()))
return bucket.Delete([]byte(release.ActiveKey()))
}); err != nil {
return errors.Join(err, errors.New("remove config_file content fail"))
}
Expand Down Expand Up @@ -371,7 +378,7 @@ func (fc *fileCache) Name() string {
return types.ConfigFileCacheName
}

// GetActiveRelease
// GetGroupActiveReleases
func (fc *fileCache) GetGroupActiveReleases(namespace, group string) ([]*model.ConfigFileRelease, string) {
nsBucket, ok := fc.activeReleases.Load(namespace)
if !ok {
Expand All @@ -396,7 +403,7 @@ func (fc *fileCache) GetGroupActiveReleases(namespace, group string) ([]*model.C
}

// GetActiveRelease
func (fc *fileCache) GetActiveRelease(namespace, group, fileName string) *model.ConfigFileRelease {
func (fc *fileCache) GetActiveRelease(namespace, group, fileName string, typ model.ReleaseType) *model.ConfigFileRelease {
nsBucket, ok := fc.activeReleases.Load(namespace)
if !ok {
return nil
Expand All @@ -409,6 +416,7 @@ func (fc *fileCache) GetActiveRelease(namespace, group, fileName string) *model.
Namespace: namespace,
Group: group,
FileName: fileName,
Typ: typ,
}
simple, ok := groupBucket.Load(searchKey.ActiveKey())
if !ok {
Expand Down Expand Up @@ -479,6 +487,9 @@ func (fc *fileCache) QueryReleases(args *types.ConfigReleaseArgs) (uint32, []*mo
if args.ReleaseName != "" && utils.IsWildNotMatch(item.Name, args.ReleaseName) {
return
}
if !args.IncludeGray && item.Typ == model.ReleaseTypeGray {
return
}
if args.OnlyActive && !item.Active {
return
}
Expand Down
3 changes: 3 additions & 0 deletions cache/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
cacheconfig "github.com/polarismesh/polaris/cache/config"
cachens "github.com/polarismesh/polaris/cache/namespace"
cachesvc "github.com/polarismesh/polaris/cache/service"
cachegray "github.com/polarismesh/polaris/cache/gray"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/store"
)
Expand All @@ -47,6 +48,7 @@ func init() {
RegisterCache(types.StrategyRuleName, types.CacheAuthStrategy)
RegisterCache(types.ClientName, types.CacheClient)
RegisterCache(types.ServiceContractName, types.CacheServiceContract)
RegisterCache(types.GrayName, types.CacheGray)
}

var (
Expand Down Expand Up @@ -104,6 +106,7 @@ func newCacheManager(ctx context.Context, cacheOpt *Config, storage store.Store)
mgr.RegisterCacher(types.CacheAuthStrategy, cacheauth.NewStrategyCache(storage, mgr))
// 北极星SDK Client
mgr.RegisterCacher(types.CacheClient, cacheclient.NewClientCache(storage, mgr))
mgr.RegisterCacher(types.CacheGray, cachegray.NewGrayCache(storage, mgr))

if len(mgr.caches) != int(types.CacheLast) {
return nil, errors.New("some Cache implement not loaded into CacheManager")
Expand Down
129 changes: 129 additions & 0 deletions cache/gray/gray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package gray

import (
"bytes"
"time"

"go.uber.org/zap"
"golang.org/x/sync/singleflight"

"github.com/golang/protobuf/jsonpb"
types "github.com/polarismesh/polaris/cache/api"
"github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/store"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
)

var (
_ types.GrayCache = (*grayCache)(nil)
)

type grayCache struct {
*types.BaseCache
storage store.Store
grayResources *utils.SyncMap[string, *apimodel.MatchTerm]
updater *singleflight.Group
}

// NewGrayCache create gray cache obj
func NewGrayCache(storage store.Store, cacheMgr types.CacheManager) types.GrayCache {
return &grayCache{
BaseCache: types.NewBaseCache(storage, cacheMgr),
storage: storage,
}
}

// Initialize init gray cache
func (gc *grayCache) Initialize(opt map[string]interface{}) error {
gc.grayResources = utils.NewSyncMap[string, *apimodel.MatchTerm]()
gc.updater = &singleflight.Group{}
return nil
}

// Update update cache
func (gc *grayCache) Update() error {
// 多个线程竞争,只有一个线程进行更新
_, err, _ := gc.updater.Do(gc.Name(), func() (interface{}, error) {
return nil, gc.DoCacheUpdate(gc.Name(), gc.realUpdate)
})
return err
}

func (gc *grayCache) realUpdate() (map[string]time.Time, int64, error) {
grayResources, err := gc.storage.GetMoreGrayResouces(gc.IsFirstUpdate(), gc.LastFetchTime())

if err != nil {
log.Error("[Cache][Gray] get storage more", zap.Error(err))
return nil, -1, err
}
if len(grayResources) == 0 {
return nil, 0, nil
}
lastMtimes := gc.setGrayResources(grayResources)
log.Info("[Cache][Gray] get more gray resource",
zap.Int("total", len(grayResources)))
return lastMtimes, int64(len(grayResources)), nil
}

func (gc *grayCache) setGrayResources(grayResources []*model.GrayResource) map[string]time.Time {
lastMtime := gc.LastMtime(gc.Name()).Unix()
for _, grayResource := range grayResources {
modifyUnix := grayResource.ModifyTime.Unix()
if modifyUnix > lastMtime {
lastMtime = modifyUnix
}
grayRule := &apimodel.MatchTerm{}
reader := bytes.NewReader([]byte(grayResource.MatchRule))
err := jsonpb.Unmarshal(reader, grayRule)
if err != nil {
log.Error("[Cache][Gray] setGrayResources unmarshal gray rule fail.",
zap.String("name", grayResource.Name), zap.Error(err))
continue
}
gc.grayResources.Store(grayResource.Name, grayRule)
}

return map[string]time.Time{
gc.Name(): time.Unix(lastMtime, 0),
}
}

// Clear clear cache
func (gc *grayCache) Clear() error {
gc.BaseCache.Clear()
gc.grayResources = utils.NewSyncMap[string, *apimodel.MatchTerm]()
return nil
}

// Name return gray name
func (gc *grayCache) Name() string {
return types.GrayName
}

// GetGrayRule get gray rule
func (gc *grayCache) GetGrayRule(name string) *apimodel.MatchTerm {
val, ok := gc.grayResources.Load(name)
if !ok {
return nil
}
return val
}
Loading

0 comments on commit 5f207e2

Please sign in to comment.