diff --git a/plugin/healthchecker/leader/peer.go b/plugin/healthchecker/leader/peer.go index f10d9b119..c08e789fa 100644 --- a/plugin/healthchecker/leader/peer.go +++ b/plugin/healthchecker/leader/peer.go @@ -41,9 +41,16 @@ var ( ErrorLeaderNotAlive = errors.New("leader not alive") ) +type ( + // 仅支持测试场景塞入即可 + ConnectFuncContextKey struct{} + ConnectPeerFunc func(*RemotePeer) error +) + var ( NewLocalPeerFunc = newLocalPeer NewRemotePeerFunc = newRemotePeer + ConnectPeer = doConnect ) func newLocalPeer() Peer { @@ -123,7 +130,7 @@ type RemotePeer struct { // 这里采用多个 Putter Client 创建多个 Stream puters []*beatSender // Cache data storage - Cache BeatRecordCache + cache BeatRecordCache // cancel . cancel context.CancelFunc // conf . @@ -138,16 +145,20 @@ func (p *RemotePeer) Initialize(conf Config) { p.conf = conf } -func (p *RemotePeer) Serve(_ context.Context, checker *LeaderHealthChecker, +func (p *RemotePeer) Serve(ctx context.Context, checker *LeaderHealthChecker, listenIP string, listenPort uint32) error { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel p.host = listenIP p.port = listenPort - if err := p.doConnect(); err != nil { - return err + + connectPeer := ConnectPeer + if val := ctx.Value(ConnectFuncContextKey{}); val != nil { + // 正常情况下只是为了测试场景使用 + connectPeer = val.(ConnectPeerFunc) } - p.Cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc, p.Ping) + connectPeer(p) + p.cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc, p.Ping) go p.checkLeaderAlive(ctx) return nil } @@ -243,7 +254,7 @@ func (p *RemotePeer) DelFunc(req *apiservice.DelHeartbeatsRequest) error { } func (p *RemotePeer) Storage() BeatRecordCache { - return p.Cache + return p.cache } // Close close peer life @@ -307,7 +318,7 @@ func (p *RemotePeer) doClose() { } } -func (p *RemotePeer) doConnect() error { +func doConnect(p *RemotePeer) error { p.conns = make([]*grpc.ClientConn, 0, streamNum) p.puters = make([]*beatSender, 0, streamNum) for i := 0; i < streamNum; i++ { diff --git a/plugin/healthchecker/leader/peer_test.go b/plugin/healthchecker/leader/peer_test.go index 69cdd6674..ad73b83a4 100644 --- a/plugin/healthchecker/leader/peer_test.go +++ b/plugin/healthchecker/leader/peer_test.go @@ -165,8 +165,6 @@ func TestLocalPeer(t *testing.T) { } func TestRemotePeer(t *testing.T) { - t.SkipNow() - // close old event hub eventhub.InitEventHub() ctrl := gomock.NewController(t) mockStore := mock.NewMockStore(ctrl) @@ -185,7 +183,6 @@ func TestRemotePeer(t *testing.T) { assert.NoError(t, err) t.Cleanup(func() { _ = checker.Destroy() - eventhub.InitEventHub() ctrl.Finish() })