Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:nacos-client query service list not found and can't clean invalid client beat info #1326

Merged
merged 15 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apiserver/eurekaserver/eureka_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
_ "github.com/polarismesh/polaris/plugin/healthchecker/redis"
_ "github.com/polarismesh/polaris/plugin/history/logger"
_ "github.com/polarismesh/polaris/plugin/password"
_ "github.com/polarismesh/polaris/plugin/ratelimit/lrurate"
_ "github.com/polarismesh/polaris/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris/plugin/statis/logger"
_ "github.com/polarismesh/polaris/plugin/statis/prometheus"
Expand Down
1 change: 1 addition & 0 deletions apiserver/nacosserver/v1/discover/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (n *DiscoverServer) GetClientServer() (*restful.WebService, error) {
ws.Path("/nacos/v1/ns").Consumes(restful.MIME_JSON, model.MIME).Produces(restful.MIME_JSON)
n.addInstanceAccess(ws)
n.addSystemAccess(ws)
n.AddServiceAccess(ws)
return ws, nil
}

Expand Down
2 changes: 2 additions & 0 deletions apiserver/xdsserverv3/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
rawbuffer "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/raw_buffer/v3"
tlstrans "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -155,6 +156,7 @@
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
ServiceName: name,
EdsConfig: &core.ConfigSource{
ResourceApiVersion: resourcev3.DefaultAPIVersion,

Check warning on line 159 in apiserver/xdsserverv3/cds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/cds.go#L159

Added line #L159 was not covered by tests
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
Expand Down
102 changes: 68 additions & 34 deletions apiserver/xdsserverv3/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@

// EDSBuilder .
type EDSBuilder struct {
svr service.DiscoverServer
}

func (eds *EDSBuilder) Init(svr service.DiscoverServer) {
eds.svr = svr
}

func (eds *EDSBuilder) Generate(option *resource.BuildOption) (interface{}, error) {
Expand Down Expand Up @@ -74,48 +72,84 @@
continue
}

var lbEndpoints []*endpoint.LbEndpoint
var lbEndpoints []*endpoint.LocalityLbEndpoints

Check warning on line 75 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L75

Added line #L75 was not covered by tests
if !option.ForceDelete {
for _, instance := range serviceInfo.Instances {
// 处于隔离状态或者权重为0的实例不进行下发
if !resource.IsNormalEndpoint(instance) {
continue
}
ep := &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: instance.Host.Value,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: instance.Port.Value,
},
},
lbEndpoints = eds.buildServiceEndpoint(serviceInfo)
}

Check warning on line 78 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L77-L78

Added lines #L77 - L78 were not covered by tests

cla := &endpoint.ClusterLoadAssignment{
ClusterName: resource.MakeServiceName(svcKey, direction, option),
Endpoints: lbEndpoints,
}
clusterLoads = append(clusterLoads, cla)

Check warning on line 84 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L80-L84

Added lines #L80 - L84 were not covered by tests
}
return clusterLoads

Check warning on line 86 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L86

Added line #L86 was not covered by tests
}

func (eds *EDSBuilder) buildServiceEndpoint(serviceInfo *resource.ServiceInfo) []*endpoint.LocalityLbEndpoints {
locality := map[string]map[string]map[string][]*endpoint.LbEndpoint{}
for _, instance := range serviceInfo.Instances {
// 处于隔离状态或者权重为0的实例不进行下发
if !resource.IsNormalEndpoint(instance) {
continue

Check warning on line 94 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L89-L94

Added lines #L89 - L94 were not covered by tests
}
region := instance.GetLocation().GetRegion().GetValue()
zone := instance.GetLocation().GetZone().GetValue()
campus := instance.GetLocation().GetCampus().GetValue()
if _, ok := locality[region]; !ok {
locality[region] = map[string]map[string][]*endpoint.LbEndpoint{}
}
if _, ok := locality[region][zone]; !ok {
locality[region][zone] = map[string][]*endpoint.LbEndpoint{}
}
if _, ok := locality[region][zone][campus]; !ok {
locality[region][zone][campus] = make([]*endpoint.LbEndpoint, 0, 32)
}
ep := &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: instance.Host.Value,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: instance.Port.Value,

Check warning on line 117 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L96-L117

Added lines #L96 - L117 were not covered by tests
},
},
},
},
HealthStatus: resource.FormatEndpointHealth(instance),
LoadBalancingWeight: utils.NewUInt32Value(instance.GetWeight().GetValue()),
Metadata: resource.GenEndpointMetaFromPolarisIns(instance),
}
lbEndpoints = append(lbEndpoints, ep)
}
HealthCheckConfig: &endpoint.Endpoint_HealthCheckConfig{
DisableActiveHealthCheck: true,
},
},
},
HealthStatus: resource.FormatEndpointHealth(instance),
LoadBalancingWeight: utils.NewUInt32Value(instance.GetWeight().GetValue()),
Metadata: resource.GenEndpointMetaFromPolarisIns(instance),

Check warning on line 129 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L122-L129

Added lines #L122 - L129 were not covered by tests
}
locality[region][zone][campus] = append(locality[region][zone][campus], ep)

Check warning on line 131 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L131

Added line #L131 was not covered by tests
}

cla := &endpoint.ClusterLoadAssignment{
ClusterName: resource.MakeServiceName(svcKey, direction, option),
Endpoints: []*endpoint.LocalityLbEndpoints{
{
retVal := make([]*endpoint.LocalityLbEndpoints, 0, len(serviceInfo.Instances))

for region := range locality {
for zone := range locality[region] {
for campus := range locality[region][zone] {
lbEndpoints := locality[region][zone][campus]
localityLbEndpoints := &endpoint.LocalityLbEndpoints{
Locality: &core.Locality{
Region: region,
Zone: zone,
SubZone: campus,
},

Check warning on line 145 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L134-L145

Added lines #L134 - L145 were not covered by tests
LbEndpoints: lbEndpoints,
},
},
}
retVal = append(retVal, localityLbEndpoints)
}

Check warning on line 149 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L147-L149

Added lines #L147 - L149 were not covered by tests
}
clusterLoads = append(clusterLoads, cla)
}
return clusterLoads
return retVal

Check warning on line 152 in apiserver/xdsserverv3/eds.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/eds.go#L152

Added line #L152 was not covered by tests
}

func (eds *EDSBuilder) makeSelfEndpoint(option *resource.BuildOption) []types.Resource {
Expand Down
20 changes: 20 additions & 0 deletions apiserver/xdsserverv3/resource/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,26 @@ type ServiceInfo struct {
FaultDetectRevision string
}

func (s *ServiceInfo) Equal(o *ServiceInfo) bool {
// 通过 revision 判断
if s.SvcInsRevision != o.SvcInsRevision {
return false
}
if s.SvcRoutingRevision != o.SvcRoutingRevision {
return false
}
if s.SvcRateLimitRevision != o.SvcRateLimitRevision {
return false
}
if s.CircuitBreakerRevision != o.CircuitBreakerRevision {
return false
}
if s.FaultDetectRevision != o.FaultDetectRevision {
return false
}
return true
}

func (s *ServiceInfo) MatchService(ns, name string) bool {
if s.Namespace == ns && s.Name == name {
return true
Expand Down
64 changes: 20 additions & 44 deletions apiserver/xdsserverv3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,16 @@
continue
}

cacheServiceInfos := curRegistryInfo[ns]

Check warning on line 264 in apiserver/xdsserverv3/server.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/server.go#L264

Added line #L264 was not covered by tests
// 命名空间存在,但是命名空间下的服务有删除情况,需要找出来
for _, info := range infos {
cacheServiceInfos := curRegistryInfo[ns]
if _, ok := cacheServiceInfos[info.ServiceKey]; !ok {
if _, ok := needRemove[ns]; !ok {
needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
}
needRemove[ns][info.ServiceKey] = info
if _, ok := cacheServiceInfos[info.ServiceKey]; ok {

Check warning on line 267 in apiserver/xdsserverv3/server.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/server.go#L267

Added line #L267 was not covered by tests
continue
}
if _, ok := needRemove[ns]; !ok {
needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
}
needRemove[ns][info.ServiceKey] = info

Check warning on line 273 in apiserver/xdsserverv3/server.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/server.go#L270-L273

Added lines #L270 - L273 were not covered by tests
}
}

Expand All @@ -281,10 +281,20 @@
needPush[ns] = infos
continue
}

// 判断当前这个空间,是否需要更新配置
if x.checkUpdate(infos, cacheServiceInfos) {
needPush[ns] = infos
for _, info := range infos {
oldSvc, exist := cacheServiceInfos[info.ServiceKey]
// 如果原来的 cache 不存在,直接就是需要推送
showPush := !exist
if exist {
// 如果原来的 cache 存在,这需要在比对下数据是否出现变化
showPush = !info.Equal(oldSvc)
}
if showPush {
if _, ok := needPush[ns]; !ok {
needPush[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
}
needPush[ns][info.ServiceKey] = info

Check warning on line 296 in apiserver/xdsserverv3/server.go

View check run for this annotation

Codecov / codecov/patch

apiserver/xdsserverv3/server.go#L284-L296

Added lines #L284 - L296 were not covered by tests
}
}
}

Expand Down Expand Up @@ -438,40 +448,6 @@
x.resourceGenerator.Generate(versionLocal, needPush, needRemove)
}

func (x *XDSServer) checkUpdate(curServiceInfo, cacheServiceInfo map[model.ServiceKey]*resource.ServiceInfo) bool {
if len(curServiceInfo) != len(cacheServiceInfo) {
return true
}
for _, info := range curServiceInfo {
find := false
for _, serviceInfo := range cacheServiceInfo {
if info.Name == serviceInfo.Name {
// 通过 revision 判断
if info.SvcInsRevision != serviceInfo.SvcInsRevision {
return true
}
if info.SvcRoutingRevision != serviceInfo.SvcRoutingRevision {
return true
}
if info.SvcRateLimitRevision != serviceInfo.SvcRateLimitRevision {
return true
}
if info.CircuitBreakerRevision != serviceInfo.CircuitBreakerRevision {
return true
}
if info.FaultDetectRevision != serviceInfo.FaultDetectRevision {
return true
}
find = true
}
}
if !find {
return true
}
}
return false
}

func (x *XDSServer) DebugHandlers() []model.DebugHandler {
return []model.DebugHandler{
{
Expand Down
1 change: 0 additions & 1 deletion auth/defaultauth/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
_ "github.com/polarismesh/polaris/plugin/healthchecker/redis"
_ "github.com/polarismesh/polaris/plugin/history/logger"
_ "github.com/polarismesh/polaris/plugin/password"
_ "github.com/polarismesh/polaris/plugin/ratelimit/lrurate"
_ "github.com/polarismesh/polaris/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris/plugin/statis/logger"
_ "github.com/polarismesh/polaris/plugin/statis/prometheus"
Expand Down
4 changes: 4 additions & 0 deletions cache/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,10 @@ func (bc *BaseCache) Close() error {
return nil
}

func (bc *BaseCache) RefreshInterval() time.Duration {
return time.Second
}

type (
// GrayCache 灰度 Cache 接口
GrayCache interface {
Expand Down
37 changes: 22 additions & 15 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (nc *CacheManager) OpenResourceCache(entries ...types.ConfigEntry) error {
return nil
}

// update 缓存更新
func (nc *CacheManager) update() error {
// warmUp 缓存更新
func (nc *CacheManager) warmUp() error {
var wg sync.WaitGroup
entries := nc.needLoad.ToSlice()
for i := range entries {
Expand Down Expand Up @@ -124,25 +124,32 @@ func (nc *CacheManager) Start(ctx context.Context) error {

// 启动的时候,先更新一版缓存
log.Infof("[Cache] cache update now first time")
if err := nc.update(); err != nil {
if err := nc.warmUp(); err != nil {
return err
}
log.Infof("[Cache] cache update done")

// 启动协程,开始定时更新缓存数据
go func() {
ticker := time.NewTicker(nc.GetUpdateCacheInterval())
defer ticker.Stop()

for {
select {
case <-ticker.C:
_ = nc.update()
case <-ctx.Done():
return
}
entries := nc.needLoad.ToSlice()
for i := range entries {
name := entries[i]
index, exist := cacheSet[name]
if !exist {
return fmt.Errorf("cache resource %s not exists", name)
}
}()
go func(c types.Cache) {
ticker := time.NewTicker(nc.GetUpdateCacheInterval())
for {
select {
case <-ticker.C:
_ = c.Update()
case <-ctx.Done():
ticker.Stop()
return
}
}
}(nc.caches[index])
}

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions cache/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ func (fc *fileCache) Initialize(opt map[string]interface{}) error {
fc.metricsReleaseCount = utils.NewSyncMap[string, *utils.SyncMap[string, uint64]]()
fc.preMetricsFiles = utils.NewAtomicValue[map[string]map[string]struct{}](map[string]map[string]struct{}{})
fc.lastReportTime = utils.NewAtomicValue[time.Time](time.Time{})
valueCache, err := openBoltCache(opt)
valueCache, err := fc.openBoltCache(opt)
if err != nil {
return err
}
fc.valueCache = valueCache
return nil
}

func openBoltCache(opt map[string]interface{}) (*bbolt.DB, error) {
func (fc *fileCache) openBoltCache(opt map[string]interface{}) (*bbolt.DB, error) {
path, _ := opt["cachePath"].(string)
if path == "" {
path = "./data/cache/config"
Expand Down
2 changes: 1 addition & 1 deletion cache/test_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ var (

// TestUpdate only for test
func (nc *CacheManager) TestUpdate() error {
return nc.update()
return nc.warmUp()
}
Loading
Loading