Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:修复资源清理逻辑以及配置中心的bug #1397

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
309dda9
docs:add error code desc
chuntaojun Sep 25, 2022
3145271
fix:调整license-checker的触发
chuntaojun Oct 19, 2022
fa3b1cf
fix:调整license-checker的触发
chuntaojun Oct 19, 2022
f700ebe
hotfix:修复鉴权interceptor遗漏请求来源
chuntaojun Jun 27, 2023
49a157a
refactor:鉴权能力优化调整
chuntaojun Oct 15, 2024
8b0d10a
feat:支持自定义初始化管理员帐户
chuntaojun Oct 23, 2024
897c1ac
feat:支持自定义初始化管理员帐户
chuntaojun Oct 23, 2024
450d53c
feat:支持自定义初始化管理员帐户
chuntaojun Oct 23, 2024
a800d12
fix:修复defer用法不当导致内存泄漏问题
chuntaojun Oct 24, 2024
123775b
feat:支持初始化管理员帐户
chuntaojun Nov 6, 2024
d8f48fe
feat:支持初始化管理员帐户
chuntaojun Nov 18, 2024
e8e9fb7
feat:支持初始化管理员帐户
chuntaojun Nov 21, 2024
8980b08
feat:支持初始化管理员帐户
chuntaojun Nov 27, 2024
0b00ebb
feat:支持初始化管理员帐户
chuntaojun Nov 27, 2024
db26d9d
feat:支持初始化管理员帐户
chuntaojun Nov 27, 2024
722ebbf
fix:服务列表支持服务可见性&修复服务可见性优先级判断
chuntaojun Nov 29, 2024
48e8510
fix:服务列表支持服务可见性&修复服务可见性优先级判断
chuntaojun Nov 29, 2024
46387bf
fix:服务列表支持服务可见性&修复服务可见性优先级判断
chuntaojun Nov 29, 2024
a09ba84
fix:服务列表支持服务可见性&修复服务可见性优先级判断
chuntaojun Nov 29, 2024
1a2334b
fix:服务列表支持服务可见性&修复服务可见性优先级判断
chuntaojun Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions admin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ type AdminOperateServer interface {
ReleaseLeaderElection(ctx context.Context, electKey string) error
// GetCMDBInfo get cmdb info
GetCMDBInfo(ctx context.Context) ([]model.LocationView, error)
// InitMainUser
InitMainUser(ctx context.Context, user apisecurity.User) error
// HasMainUser .
HasMainUser(ctx context.Context) *apiservice.Response
// InitMainUser .
InitMainUser(ctx context.Context, user *apisecurity.User) *apiservice.Response
// GetServerFunctions Get server functions
GetServerFunctions(ctx context.Context) []authcommon.ServerFunctionGroup
}
14 changes: 10 additions & 4 deletions admin/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"

"github.com/polarismesh/polaris/admin/job"
"github.com/polarismesh/polaris/auth"
"github.com/polarismesh/polaris/cache"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/healthcheck"
Expand All @@ -48,13 +49,15 @@ func RegisterServerProxy(name string, factor ServerProxyFactory) error {

// Initialize 初始化
func Initialize(ctx context.Context, cfg *Config, namingService service.DiscoverServer,
healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store) error {
healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store,
userSvr auth.UserServer, policySvr auth.StrategyServer) error {

if finishInit {
return nil
}

proxySvr, actualSvr, err := InitServer(ctx, cfg, namingService, healthCheckServer, cacheMgn, storage)
proxySvr, actualSvr, err := InitServer(ctx, cfg, namingService, healthCheckServer, cacheMgn,
storage, userSvr, policySvr)
if err != nil {
return err
}
Expand All @@ -66,13 +69,16 @@ func Initialize(ctx context.Context, cfg *Config, namingService service.Discover
}

func InitServer(ctx context.Context, cfg *Config, namingService service.DiscoverServer,
healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store) (AdminOperateServer, *Server, error) {
healthCheckServer *healthcheck.Server, cacheMgn *cache.CacheManager, storage store.Store,
userSvr auth.UserServer, policySvr auth.StrategyServer) (AdminOperateServer, *Server, error) {

actualSvr := new(Server)

actualSvr.userSvr = userSvr
actualSvr.policySvr = policySvr
actualSvr.namingServer = namingService
actualSvr.healthCheckServer = healthCheckServer
actualSvr.cacheMgn = cacheMgn
actualSvr.cacheMgr = cacheMgn
actualSvr.storage = storage

maintainJobs := job.NewMaintainJobs(namingService, cacheMgn, storage)
Expand Down
8 changes: 4 additions & 4 deletions admin/interceptor/auth/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ func (svr *Server) collectMaintainAuthContext(ctx context.Context, resourceOp au
)
}

func (s *Server) HasMainUser(ctx context.Context) (bool, error) {
return false, nil
func (s *Server) HasMainUser(ctx context.Context) *apiservice.Response {
return s.nextSvr.HasMainUser(ctx)
}

func (s *Server) InitMainUser(ctx context.Context, user apisecurity.User) error {
return nil
func (s *Server) InitMainUser(ctx context.Context, user *apisecurity.User) *apiservice.Response {
return s.nextSvr.InitMainUser(ctx, user)
}

func (svr *Server) GetServerConnections(ctx context.Context, req *admincommon.ConnReq) (*admincommon.ConnCountResp, error) {
Expand Down
79 changes: 75 additions & 4 deletions admin/job/clean_deleted_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

var cleanFuncMapping = map[string]func(timeout time.Duration, job *cleanDeletedResourceJob){
"instance": cleanDeletedInstances,
"service": cleanDeletedServices,
"clients": cleanDeletedClients,
"instance": cleanDeletedInstances,
"service": cleanDeletedServices,
"clients": cleanDeletedClients,
"service_contract": cleanDeletedServiceContracts,
"circuitbreaker_rule": func(timeout time.Duration, job *cleanDeletedResourceJob) {
cleanDeletedRules("circuitbreaker_rule", timeout, job)
},
Expand All @@ -48,6 +49,51 @@ var cleanFuncMapping = map[string]func(timeout time.Duration, job *cleanDeletedR
"config_file_release": cleanDeletedConfigFiles,
}

var defaultCleanDeletedResourceConfig = CleandeletedResourceConf{
Resources: []CleanDeletedResource{
{
Resource: "instance",
Enable: true,
},
{
Resource: "service",
Enable: true,
},
{
Resource: "service_contract",
Enable: true,
},
{
Resource: "clients",
Enable: true,
},
{
Resource: "circuitbreaker_rule",
Enable: true,
},
{
Resource: "ratelimit_rule",
Enable: true,
},
{
Resource: "router_rule",
Enable: true,
},
{
Resource: "faultdetect_rule",
Enable: true,
},
{
Resource: "lane_rule",
Enable: true,
},
{
Resource: "config_file_release",
Enable: true,
},
},
}

type CleanDeletedResource struct {
// Resource 记录需要清理的资源类型
Resource string `mapstructure:"resource"`
Expand All @@ -59,7 +105,7 @@ type CleanDeletedResource struct {

type CleandeletedResourceConf struct {
// ResourceTimeout 记录资源的额外超时时间,用户可自定义
Resources []CleanDeletedResource `json:"resourceTimeout"`
Resources []CleanDeletedResource `json:"resources" mapstructure:"resources"`
// Timeout 记录清理资源的超时时间,默认20分钟
Timeout time.Duration `mapstructure:"timeout"`
}
Expand Down Expand Up @@ -90,6 +136,16 @@ func (job *cleanDeletedResourceJob) init(raw map[string]interface{}) error {
cfg.Timeout = 2 * time.Minute
}
job.cfg = cfg

if len(cfg.Resources) == 0 {
job.cfg.Resources = defaultCleanDeletedResourceConfig.Resources
for i := range job.cfg.Resources {
if job.cfg.Resources[i].Timeout == nil {
job.cfg.Resources[i].Timeout = &job.cfg.Timeout
}
}
}

return nil
}

Expand Down Expand Up @@ -182,6 +238,21 @@ func cleanDeletedInstances(timeout time.Duration, job *cleanDeletedResourceJob)
}
}

func cleanDeletedServiceContracts(timeout time.Duration, job *cleanDeletedResourceJob) {
batchSize := uint32(100)
for {
count, err := job.storage.BatchCleanDeletedClients(timeout, batchSize)
if err != nil {
log.Errorf("[Maintain][Job][CleanDeletedClients] batch clean deleted client, err: %v", err)
break
}
log.Infof("[Maintain][Job][CleanDeletedClients] clean deleted client count %d", count)
if count < batchSize {
break
}
}
}

func cleanDeletedRules(rule string, timeout time.Duration, job *cleanDeletedResourceJob) {
batchSize := uint32(100)
for {
Expand Down
28 changes: 24 additions & 4 deletions admin/maintain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
apisecurity "github.com/polarismesh/specification/source/go/api/v1/security"
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/wrapperspb"

api "github.com/polarismesh/polaris/common/api/v1"
connlimit "github.com/polarismesh/polaris/common/conn/limit"
Expand All @@ -39,12 +40,31 @@ import (
"github.com/polarismesh/polaris/plugin"
)

func (s *Server) HasMainUser(ctx context.Context, user apisecurity.User) (bool, error) {
return false, nil
// HasMainUser 判断是否存在主用户
func (s *Server) HasMainUser(ctx context.Context) *apiservice.Response {
mainUser, err := s.storage.GetMainUser()
if err != nil {
log.Error("check hash main user", zap.Error(err), utils.RequestID(ctx))
return api.NewResponse(apimodel.Code_ExecuteException)
}
if mainUser == nil {
return api.NewResponse(apimodel.Code_NotFoundResource)
}
ret := mainUser.ToSpec()
ret.AuthToken = wrapperspb.String("")
return api.NewUserResponse(apimodel.Code_ExecuteSuccess, ret)
}

func (s *Server) InitMainUser(ctx context.Context, user apisecurity.User) error {
return nil
// InitMainUser 初始化主用户
func (s *Server) InitMainUser(_ context.Context, user *apisecurity.User) *apiservice.Response {
if user.GetSource().GetValue() == "" {
user.Source = utils.NewStringValue("Polaris")
}
ctx := context.WithValue(context.Background(), authcommon.ContextKeyInitMainUser{}, true)
rsp := s.userSvr.CreateUsers(ctx, []*apisecurity.User{
user,
})
return rsp.Responses[0]
}

func (s *Server) GetServerConnections(_ context.Context, req *admin.ConnReq) (*admin.ConnCountResp, error) {
Expand Down
5 changes: 4 additions & 1 deletion admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package admin
import (
"sync"

"github.com/polarismesh/polaris/auth"
"github.com/polarismesh/polaris/cache"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/healthcheck"
Expand All @@ -32,8 +33,10 @@ type Server struct {
mu sync.Mutex
namingServer service.DiscoverServer
healthCheckServer *healthcheck.Server
cacheMgn *cache.CacheManager
cacheMgr *cache.CacheManager
storage store.Store
userSvr auth.UserServer
policySvr auth.StrategyServer
}

func GetChainOrder() []string {
Expand Down
108 changes: 60 additions & 48 deletions apiserver/grpcserver/config/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
func (g *ConfigGRPCServer) UpsertAndPublishConfigFile(ctx context.Context,
req *apiconfig.ConfigFilePublishInfo) (*apiconfig.ConfigClientResponse, error) {
ctx = utils.ConvertGRPCContext(ctx)
response := g.configServer.CasUpsertAndReleaseConfigFileFromClient(ctx, req)
response := g.configServer.UpsertAndReleaseConfigFileFromClient(ctx, req)
return &apiconfig.ConfigClientResponse{
Code: response.Code,
Info: response.Info,
Expand Down Expand Up @@ -182,55 +182,67 @@
continue
}

var out *apiconfig.ConfigDiscoverResponse
var action string
startTime := commontime.CurrentMillisecond()
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: action,
ClientIP: utils.ParseClientAddress(ctx),
Namespace: in.GetConfigFile().GetNamespace().GetValue(),
Resource: metrics.ResourceOfConfigFile(in.GetConfigFile().GetGroup().GetValue(), in.GetConfigFile().GetFileName().GetValue()),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Revision: out.GetRevision(),
Success: out.GetCode() > uint32(apimodel.Code_DataNoChange),
})
}()

switch in.Type {
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE:
action = metrics.ActionGetConfigFile
ret := g.configServer.GetConfigFileWithCache(ctx, &apiconfig.ClientConfigFileInfo{})
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
out.ConfigFile = ret.GetConfigFile()
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE
out.Revision = strconv.Itoa(int(out.GetConfigFile().GetVersion().GetValue()))
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_Names:
action = metrics.ActionListConfigFiles
ret := g.configServer.GetConfigFileNamesWithCache(ctx, &apiconfig.ConfigFileGroupRequest{
Revision: wrapperspb.String(in.GetRevision()),
ConfigFileGroup: &apiconfig.ConfigFileGroup{
Namespace: in.GetConfigFile().GetNamespace(),
Name: in.GetConfigFile().GetGroup(),
},
})
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
out.ConfigFileNames = ret.GetConfigFileInfos()
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE_Names
out.Revision = ret.GetRevision().GetValue()
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_GROUPS:
action = metrics.ActionListConfigGroups
req := in.GetConfigFile()
req.Md5 = wrapperspb.String(in.GetRevision())
out = g.configServer.GetConfigGroupsWithCache(ctx, req)
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE_GROUPS
default:
out = api.NewConfigDiscoverResponse(apimodel.Code_InvalidDiscoverResource)
}

out := g.handleDiscoverRequest(ctx, in)
if err := svr.Send(out); err != nil {
return err
}
}
}

func (g *ConfigGRPCServer) handleDiscoverRequest(ctx context.Context, in *apiconfig.ConfigDiscoverRequest) *apiconfig.ConfigDiscoverResponse {

Check failure on line 192 in apiserver/grpcserver/config/client_access.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21.5)

the line is 142 characters long, which exceeds the maximum of 140 characters. (lll)
var out *apiconfig.ConfigDiscoverResponse
var action string
startTime := commontime.CurrentMillisecond()
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: action,
ClientIP: utils.ParseClientAddress(ctx),
Namespace: in.GetConfigFile().GetNamespace().GetValue(),
Resource: metrics.ResourceOfConfigFile(in.GetConfigFile().GetGroup().GetValue(), in.GetConfigFile().GetFileName().GetValue()),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Revision: out.GetRevision(),
Success: out.GetCode() > uint32(apimodel.Code_DataNoChange),
})
}()

switch in.Type {
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE:
action = metrics.ActionGetConfigFile
version, _ := strconv.ParseUint(in.GetRevision(), 10, 64)
ret := g.configServer.GetConfigFileWithCache(ctx, &apiconfig.ClientConfigFileInfo{
Namespace: in.GetConfigFile().GetNamespace(),
Group: in.GetConfigFile().GetGroup(),
FileName: in.GetConfigFile().GetFileName(),
Version: wrapperspb.UInt64(version),
PublicKey: in.GetConfigFile().GetPublicKey(),
})
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
out.ConfigFile = ret.GetConfigFile()
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE
out.Revision = strconv.Itoa(int(out.GetConfigFile().GetVersion().GetValue()))
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_Names:
action = metrics.ActionListConfigFiles
ret := g.configServer.GetConfigFileNamesWithCache(ctx, &apiconfig.ConfigFileGroupRequest{
Revision: wrapperspb.String(in.GetRevision()),
ConfigFileGroup: &apiconfig.ConfigFileGroup{
Namespace: in.GetConfigFile().GetNamespace(),
Name: in.GetConfigFile().GetGroup(),
},
})
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
out.ConfigFileNames = ret.GetConfigFileInfos()
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE_Names
out.Revision = ret.GetRevision().GetValue()
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_GROUPS:
action = metrics.ActionListConfigGroups
req := in.GetConfigFile()
req.Md5 = wrapperspb.String(in.GetRevision())
out = g.configServer.GetConfigGroupsWithCache(ctx, req)
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE_GROUPS
default:
out = api.NewConfigDiscoverResponse(apimodel.Code_InvalidDiscoverResource)
}

return out
}
Loading
Loading