Skip to content

Commit

Permalink
feat:support nacos-address server endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Jun 30, 2024
1 parent df14a59 commit 6fbcd1f
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 39 deletions.
21 changes: 10 additions & 11 deletions apiserver/eurekaserver/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestCreateInstance(t *testing.T) {
time.Sleep(5 * time.Second)
instanceId := fmt.Sprintf("%s_%s_%d", appId, host, startPort)
code := eurekaSrv.deregisterInstance(context.Background(), namespace, appId, instanceId, false)
assert.Equal(t, api.ExecuteSuccess, code)
assert.Equal(t, api.ExecuteSuccess, code, fmt.Sprintf("%d", code))
time.Sleep(20 * time.Second)

deltaReq := restful.NewRequest(httpRequest)
Expand Down Expand Up @@ -244,18 +244,17 @@ func Test_EurekaWrite(t *testing.T) {
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
})
// 这里是异步注册
eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusNoContent, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))

time.Sleep(5 * time.Second)
_ = discoverSuit.CacheMgr().TestUpdate()
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.NotNil(t, saveIns)

t.Run("UpdateStatus", func(t *testing.T) {
t.Run("StatusUnknown", func(t *testing.T) {
t.Run("01_StatusUnknown", func(t *testing.T) {
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status",
mockIns.AppName, mockIns.InstanceId), nil)
mockReq.PostForm = url.Values{}
Expand All @@ -278,7 +277,7 @@ func Test_EurekaWrite(t *testing.T) {
assert.False(t, saveIns.Isolate())
})

t.Run("StatusDown", func(t *testing.T) {
t.Run("02_StatusDown", func(t *testing.T) {
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status",
mockIns.AppName, mockIns.InstanceId), nil)
mockReq.PostForm = url.Values{}
Expand All @@ -301,7 +300,7 @@ func Test_EurekaWrite(t *testing.T) {
assert.Equal(t, StatusDown, saveIns.Proto.Metadata[InternalMetadataStatus])
})

t.Run("StatusUp", func(t *testing.T) {
t.Run("03_StatusUp", func(t *testing.T) {
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status",
mockIns.AppName, mockIns.InstanceId), nil)
mockReq.PostForm = url.Values{}
Expand All @@ -324,17 +323,17 @@ func Test_EurekaWrite(t *testing.T) {
assert.Equal(t, StatusUp, saveIns.Proto.Metadata[InternalMetadataStatus])
})

t.Run("Polaris_UpdateInstances", func(t *testing.T) {
t.Run("04_Polaris_UpdateInstances", func(t *testing.T) {
defer func() {
rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
rsp := discoverSuit.DiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
{
Id: wrapperspb.String(mockIns.InstanceId),
Isolate: wrapperspb.Bool(false),
},
})
assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue()))
}()
rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
rsp := discoverSuit.DiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
{
Id: wrapperspb.String(mockIns.InstanceId),
Isolate: wrapperspb.Bool(true),
Expand All @@ -349,8 +348,8 @@ func Test_EurekaWrite(t *testing.T) {
assert.Equal(t, StatusOutOfService, saveIns.Proto.Metadata[InternalMetadataStatus])
})

t.Run("Polaris_UpdateInstancesIsolate", func(t *testing.T) {
rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
t.Run("05_Polaris_UpdateInstancesIsolate", func(t *testing.T) {
rsp := discoverSuit.DiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
{
Id: wrapperspb.String(mockIns.InstanceId),
Isolate: wrapperspb.Bool(true),
Expand Down
33 changes: 28 additions & 5 deletions cache/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ type ConfigEntry struct {

// CacheManager
type CacheManager interface {
// GetUpdateCacheInterval .
GetUpdateCacheInterval() time.Duration
// GetReportInterval .
GetReportInterval() time.Duration
// GetCacher
GetCacher(cacheIndex CacheIndex) Cache
// RegisterCacher
Expand Down Expand Up @@ -578,11 +582,13 @@ type BaseCache struct {
lock sync.RWMutex
// firstUpdate Whether the cache is loaded for the first time
// this field can only make value on exec initialize/clean, and set it to false on exec update
firstUpdate bool
s store.Store
lastFetchTime int64
lastMtimes map[string]time.Time
CacheMgr CacheManager
firstUpdate bool
s store.Store
lastFetchTime int64
lastMtimes map[string]time.Time
CacheMgr CacheManager
reportMetrics func()
lastReportMetricsTime time.Time
}

func NewBaseCache(s store.Store, cacheMgr CacheManager) *BaseCache {
Expand All @@ -595,6 +601,17 @@ func NewBaseCache(s store.Store, cacheMgr CacheManager) *BaseCache {
return c
}

func NewBaseCacheWithRepoerMetrics(s store.Store, cacheMgr CacheManager, reportMetrics func()) *BaseCache {
c := &BaseCache{
s: s,
CacheMgr: cacheMgr,
reportMetrics: reportMetrics,
}

c.initialize()
return c
}

func (bc *BaseCache) initialize() {
bc.lock.Lock()
defer bc.lock.Unlock()
Expand Down Expand Up @@ -698,6 +715,12 @@ func (bc *BaseCache) DoCacheUpdate(name string, executor func() (map[string]time
if total >= 0 {
metrics.RecordCacheUpdateCost(time.Since(start), name, total)
}
if bc.reportMetrics != nil {
if time.Since(bc.lastReportMetricsTime) >= bc.CacheMgr.GetReportInterval() {
bc.reportMetrics()
bc.lastReportMetricsTime = start
}
}
bc.firstUpdate = false
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
UpdateCacheInterval = 1 * time.Second
)

var (
ReportInterval = 1 * time.Second
)

// CacheManager 名字服务缓存
type CacheManager struct {
storage store.Store
Expand Down Expand Up @@ -165,6 +169,11 @@ func (nc *CacheManager) GetUpdateCacheInterval() time.Duration {
return UpdateCacheInterval
}

// GetReportInterval 获取当前cache的更新间隔
func (nc *CacheManager) GetReportInterval() time.Duration {
return ReportInterval
}

// Service 获取Service缓存信息
func (nc *CacheManager) Service() types.ServiceCache {
return nc.caches[types.CacheService].(types.ServiceCache)
Expand Down
2 changes: 2 additions & 0 deletions cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import "time"
type Config struct {
// DiffTime 设置拉取时间范围, [T1 - abs(DiffTime), T1]
DiffTime time.Duration `yaml:"diffTime"`
// ReportInterval 监控数据上报周期
ReportInterval time.Duration `yaml:"reportInterval"`
}

var (
Expand Down
6 changes: 3 additions & 3 deletions cache/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ type fileCache struct {

// NewConfigFileCache 创建文件缓存
func NewConfigFileCache(storage store.Store, cacheMgr types.CacheManager) types.ConfigFileCache {
cache := &fileCache{
BaseCache: types.NewBaseCache(storage, cacheMgr),
fc := &fileCache{
storage: storage,

Check failure on line 69 in cache/config/config_file.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21.5)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/polarismesh/polaris) --custom-order (gci)
}
return cache
fc.BaseCache = types.NewBaseCacheWithRepoerMetrics(storage, cacheMgr, fc.reportMetricsInfo)
return fc
}

// Initialize
Expand Down
8 changes: 3 additions & 5 deletions cache/config/config_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ type configGroupCache struct {

// NewConfigGroupCache 创建文件缓存
func NewConfigGroupCache(storage store.Store, cacheMgr types.CacheManager) types.ConfigGroupCache {
cache := &configGroupCache{
BaseCache: types.NewBaseCache(storage, cacheMgr),
gc := &configGroupCache{
storage: storage,

Check failure on line 50 in cache/config/config_group.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21.5)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/polarismesh/polaris) --custom-order (gci)
}
return cache
gc.BaseCache = types.NewBaseCacheWithRepoerMetrics(storage, cacheMgr, gc.reportMetricsInfo)
return gc
}

// Initialize
Expand Down Expand Up @@ -145,8 +145,6 @@ func (fc *configGroupCache) postProcessUpdatedGroups(affect map[string]struct{})
continue
}
count := nsBucket.Len()
fc.reportMetricsInfo(ns, count)

revisions := make([]string, 0, count)
nsBucket.Range(func(key string, val *model.ConfigFileGroup) {
revisions = append(revisions, val.Revision)
Expand Down
26 changes: 16 additions & 10 deletions cache/config/config_group_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ package config

import (
"github.com/polarismesh/polaris/common/metrics"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/plugin"
)

func (fc *configGroupCache) reportMetricsInfo(ns string, count int) {
reportValue := metrics.ConfigMetrics{
Type: metrics.ConfigGroupMetric,
Total: int64(count),
Release: 0,
Labels: map[string]string{
metrics.LabelNamespace: ns,
},
}
plugin.GetStatis().ReportConfigMetrics(reportValue)
func (fc *configGroupCache) reportMetricsInfo() {
fc.name2groups.Range(func(ns string, val *utils.SyncMap[string, *model.ConfigFileGroup]) {
count := val.Len()
reportValue := metrics.ConfigMetrics{
Type: metrics.ConfigGroupMetric,
Total: int64(count),
Release: 0,
Labels: map[string]string{
metrics.LabelNamespace: ns,
},
}
plugin.GetStatis().ReportConfigMetrics(reportValue)
})

}
29 changes: 29 additions & 0 deletions cache/mock/cache_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions cache/service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ type instanceCache struct {

// NewInstanceCache 新建一个instanceCache
func NewInstanceCache(storage store.Store, cacheMgr types.CacheManager) types.InstanceCache {
return &instanceCache{
BaseCache: types.NewBaseCache(storage, cacheMgr),
ic := &instanceCache{
storage: storage,
singleFlight: new(singleflight.Group),
}

ic.BaseCache = types.NewBaseCacheWithRepoerMetrics(storage, cacheMgr, ic.reportMetricsInfo)
return ic
}

// Initialize 初始化函数
Expand Down Expand Up @@ -161,7 +163,6 @@ func (ic *instanceCache) realUpdate() (map[string]time.Time, int64, error) {
for i := range instanceChangeEvents {
_ = eventhub.Publish(eventhub.CacheInstanceEventTopic, instanceChangeEvents[i])
}
ic.reportMetricsInfo()
}()

if err := tx.CreateReadView(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func (s *Server) DeleteInstances(ctx context.Context, req []*apiservice.Instance

// DeleteInstance 删除单个服务实例
func (s *Server) DeleteInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response {
insId, _ := utils.CheckInstanceTetrad(req)
req.Id = wrapperspb.String(insId)
ins := *req // 防止污染外部的req
ins.ServiceToken = utils.NewStringValue(parseInstanceReqToken(ctx, req))
return s.deleteInstance(ctx, req, &ins)
Expand Down
4 changes: 2 additions & 2 deletions service/interceptor/paramcheck/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *Server) RegisterInstance(ctx context.Context, req *apiservice.Instance)
if err := checkMetadata(req.GetMetadata()); err != nil {
return api.NewInstanceResponse(apimodel.Code_InvalidMetadata, req)
}
instanceID, rsp := checkReviseInstance(req)
instanceID, rsp := checkCreateInstance(req)
if rsp != nil {
return rsp
}
Expand All @@ -62,7 +62,7 @@ func (s *Server) RegisterInstance(ctx context.Context, req *apiservice.Instance)

// DeregisterInstance delete onr instance by client
func (s *Server) DeregisterInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response {
instanceID, resp := checkCreateInstance(req)
instanceID, resp := checkReviseInstance(req)
if resp != nil {
return resp
}
Expand Down

0 comments on commit 6fbcd1f

Please sign in to comment.