diff --git a/core/chanrpc/chanrpc.go b/core/chanrpc/chanrpc.go index 2a5593e..537720c 100644 --- a/core/chanrpc/chanrpc.go +++ b/core/chanrpc/chanrpc.go @@ -77,8 +77,6 @@ func (s *Server) Register(id interface{}, f interface{}) { panic(fmt.Sprintf("function id %v: already registered", id)) } - //log.Debug("chanrpc", "注册,id=%v,f=%v", id, f) - s.functions[id] = f } diff --git a/core/conf/apollo/apollo.go b/core/conf/apollo/apollo.go index 9c3807c..fa72a23 100644 --- a/core/conf/apollo/apollo.go +++ b/core/conf/apollo/apollo.go @@ -61,7 +61,6 @@ func SetNetAgent(a network.AgentServer) { }(timer) } -//router断开 func CenterDisconnect() { regSubList = make(map[ConfKey]*ConfValue) } @@ -117,9 +116,8 @@ func ProcessReq(cmd *network.TCPCommand, data []byte) error { return nil } -// 读取配置中心的配置,找不到时,返回空字符串 func GetConfig(key, defaultValue string) string { - nsKey := ConfKey{Key: key, AppType: conf.AppType, AppId: conf.AppID} + nsKey := ConfKey{Key: key, AppType: conf.AppInfo.AppType, AppId: conf.AppInfo.AppID} mutexConfig.Lock() defer mutexConfig.Unlock() if item, ok := configValues[nsKey]; ok { @@ -128,7 +126,6 @@ func GetConfig(key, defaultValue string) string { return defaultValue } -// 读取配置中心的配置,找不到或出错时,返回0 func GetConfigAsInt64(key string, defaultValue int64) int64 { v, _ := strconv.ParseInt(GetConfig(key, strconv.FormatInt(defaultValue, 10)), 10, 64) return v @@ -146,11 +143,9 @@ func RegisterConfig(key string, reqAppType, reqAppId uint32, cb cbNotify) { mxRegSub.Unlock() log.Info("Apollo", "注册Apollo订阅,%v", nsKey) - //发起一次订阅 SendSubscribeReq(nsKey, false) } -//发送订阅 func SendSubscribeReq(k ConfKey, cancel bool) { if netAgent == nil { return @@ -163,8 +158,8 @@ func SendSubscribeReq(k ConfKey, cancel bool) { } var req config.ApolloCfgReq - req.AppType = proto.Uint32(conf.AppType) - req.AppId = proto.Uint32(conf.AppID) + req.AppType = proto.Uint32(conf.AppInfo.AppType) + req.AppId = proto.Uint32(conf.AppInfo.AppID) req.SubAppType = proto.Uint32(k.AppType) req.SubAppId = proto.Uint32(k.AppId) req.KeyName = proto.String(k.Key) @@ -182,7 +177,6 @@ func SendSubscribeReq(k ConfKey, cancel bool) { netAgent.SendMessage(bm) } -//注册回调 func RegPublicCB(cb cbNotify) { if cb == nil { return diff --git a/core/conf/conf.go b/core/conf/conf.go index a845e12..f895525 100644 --- a/core/conf/conf.go +++ b/core/conf/conf.go @@ -1,13 +1,16 @@ package conf +import ( + "encoding/json" + "fmt" + "io/ioutil" + "strings" + "xlddz/core/util" +) + var ( LenStackBuf = 4096 - // console - ConsolePort int = 2021 - ConsolePrompt string = "Leaf$:" - ProfilePath string - // skeleton conf GoLen = 10000 TimerDispatcherLen = 10000 @@ -15,9 +18,39 @@ var ( ChanRPCLen = 10000 //服务基础属性 + AppInfo BaseInfo + //AppName string + //AppID uint32 + //AppType uint32 + //ListenOnAddress string + //CenterAddr string +) + +type BaseInfo struct { AppName string AppID uint32 AppType uint32 ListenOnAddress string CenterAddr string -) +} + +func ParseCmdArgs() { + if AppInfo.AppName != "" { + data, err := ioutil.ReadFile(fmt.Sprintf("conf/%v.json", AppInfo.AppName)) + if err == nil { + err = json.Unmarshal(data, &AppInfo) + } + } + if v, ok := util.ParseArgs("/AppID"); ok { + AppInfo.AppID = v + } + if v, ok := util.ParseArgs("/AppType"); ok { + AppInfo.AppType = v + } + if v, ok := util.ParseArgs("/DockerRun"); ok && v == 1 { + addr := strings.Split(AppInfo.CenterAddr, ":") + if len(addr) == 2 { + AppInfo.CenterAddr = "center:" + addr[1] + } + } +} diff --git a/core/console/command.go b/core/console/command.go deleted file mode 100644 index 1a35fc2..0000000 --- a/core/console/command.go +++ /dev/null @@ -1,245 +0,0 @@ -package console - -import ( - "fmt" - "os" - "path" - "runtime/pprof" - "time" - "xlddz/core/chanrpc" - "xlddz/core/conf" - "xlddz/core/log" - "xlddz/core/util" -) - -var commands = []Command{ - new(CommandHelp), - new(CommandCPUProf), - new(CommandProf), - new(CommandShowLog), -} - -type Command interface { - // must goroutine safe - name() string - // must goroutine safe - help() string - // must goroutine safe - run(args []string) string -} - -type ExternalCommand struct { - _name string - _help string - server *chanrpc.Server -} - -func (c *ExternalCommand) name() string { - return c._name -} - -func (c *ExternalCommand) help() string { - return c._help -} - -func (c *ExternalCommand) run(_args []string) string { - args := make([]interface{}, len(_args)) - for i, v := range _args { - args[i] = v - } - - ret, err := c.server.Call1(c._name, args...) - if err != nil { - return err.Error() - } - output, ok := ret.(string) - if !ok { - return "invalid output type" - } - - return output -} - -// you must call the function before calling console.Init -// goroutine not safe -func Register(name string, help string, f interface{}, server *chanrpc.Server) { - for _, c := range commands { - if c.name() == name { - log.Fatal("command %v is already registered", name) - } - } - - server.Register(name, f) - - c := new(ExternalCommand) - c._name = name - c._help = help - c.server = server - commands = append(commands, c) -} - -// help -type CommandHelp struct{} - -func (c *CommandHelp) name() string { - return "help" -} - -func (c *CommandHelp) help() string { - return "this help text" -} - -func (c *CommandHelp) run([]string) string { - output := "Commands:\r\n" - for _, c := range commands { - output += c.name() + " - " + c.help() + "\r\n" - } - output += "quit - exit console" - - return output -} - -// cpuprof -type CommandCPUProf struct{} - -func (c *CommandCPUProf) name() string { - return "cpuprof" -} - -func (c *CommandCPUProf) help() string { - return "CPU profiling for the current process" -} - -func (c *CommandCPUProf) usage() string { - return "cpuprof writes runtime profiling data in the format expected by \r\n" + - "the pprof visualization tool\r\n\r\n" + - "Usage: cpuprof start|stop\r\n" + - " start - enables CPU profiling\r\n" + - " stop - stops the current CPU profile" -} - -func (c *CommandCPUProf) run(args []string) string { - if len(args) == 0 { - return c.usage() - } - - switch args[0] { - case "start": - fn := profileName() + ".cpuprof" - f, err := os.Create(fn) - if err != nil { - return err.Error() - } - err = pprof.StartCPUProfile(f) - if err != nil { - f.Close() - return err.Error() - } - return fn - case "stop": - pprof.StopCPUProfile() - return "" - default: - return c.usage() - } -} - -func profileName() string { - now := time.Now() - return path.Join(conf.ProfilePath, - fmt.Sprintf("%d%02d%02d_%02d_%02d_%02d", - now.Year(), - now.Month(), - now.Day(), - now.Hour(), - now.Minute(), - now.Second())) -} - -// prof -type CommandProf struct{} - -func (c *CommandProf) name() string { - return "prof" -} - -func (c *CommandProf) help() string { - return "writes a pprof-formatted snapshot" -} - -func (c *CommandProf) usage() string { - return "prof writes runtime profiling data in the format expected by \r\n" + - "the pprof visualization tool\r\n\r\n" + - "Usage: prof goroutine|heap|thread|block\r\n" + - " goroutine - stack traces of all current goroutines\r\n" + - " heap - a sampling of all heap allocations\r\n" + - " thread - stack traces that led to the creation of new OS threads\r\n" + - " block - stack traces that led to blocking on synchronization primitives" -} - -func (c *CommandProf) run(args []string) string { - if len(args) == 0 { - return c.usage() - } - - var ( - p *pprof.Profile - fn string - ) - switch args[0] { - case "goroutine": - p = pprof.Lookup("goroutine") - fn = profileName() + ".gprof" - case "heap": - p = pprof.Lookup("heap") - fn = profileName() + ".hprof" - case "thread": - p = pprof.Lookup("threadcreate") - fn = profileName() + ".tprof" - case "block": - p = pprof.Lookup("block") - fn = profileName() + ".bprof" - default: - return c.usage() - } - - f, err := os.Create(fn) - if err != nil { - return err.Error() - } - defer f.Close() - err = p.WriteTo(f, 0) - if err != nil { - return err.Error() - } - - return fn -} - -// 打印日志 -type CommandShowLog struct{} - -func (c *CommandShowLog) name() string { - return "showlog" -} - -func (c *CommandShowLog) help() string { - return util.StrToGBK("显示日志,关闭:showlog 0,打开:showlog 1") -} - -func (c *CommandShowLog) run(args []string) string { - - if len(args) == 0 { - return "参数非法,关闭:showlog 0,打开:showlog 1" - } - output := "开始打印" - print := 1 - if args[0] == "0" { - output = "关闭打印" - print = 0 - } - - log.SetScreenPrint(print) - - return util.StrToGBK(output) -} diff --git a/core/console/console.go b/core/console/console.go deleted file mode 100644 index 831288c..0000000 --- a/core/console/console.go +++ /dev/null @@ -1,106 +0,0 @@ -package console - -import ( - "bufio" - "math" - "net" - "strconv" - "strings" - "xlddz/core/conf" - "xlddz/core/network" - - "github.com/golang/protobuf/proto" -) - -var server *network.TCPServer - -func Init() { - if conf.ConsolePort == 0 { - return - } - - // log.Debug("Console", "Console localhost:=%v", conf.ConsolePort) - - server = new(network.TCPServer) - server.Addr = "localhost:" + strconv.Itoa(conf.ConsolePort) - server.MaxConnNum = int(math.MaxInt32) - server.PendingWriteNum = 100 - server.NewAgent = newAgent - - server.Start() -} - -func Destroy() { - if server != nil { - server.Close() - } -} - -type Agent struct { - conn *network.TCPConn - reader *bufio.Reader -} - -func newAgent(conn *network.TCPConn, id uint64) network.AgentClient { - a := new(Agent) - a.conn = conn - a.reader = bufio.NewReader(conn) - return a -} - -func (a *Agent) Run() { - for { - if conf.ConsolePrompt != "" { - a.conn.Write([]byte(conf.ConsolePrompt)) - } - - line, err := a.reader.ReadString('\n') - if err != nil { - break - } - line = strings.TrimSuffix(line[:len(line)-1], "\r") - - args := strings.Fields(line) - if len(args) == 0 { - continue - } - if args[0] == "quit" { - break - } - - var c Command - for _, _c := range commands { - if _c.name() == args[0] { - c = _c - break - } - } - if c == nil { - a.conn.Write([]byte("command not found, try `help` for help\r\n")) - continue - } - output := c.run(args[1:]) - if output != "" { - a.conn.Write([]byte(output + "\r\n")) - } - } -} - -func (a *Agent) OnClose() {} -func (a *Agent) SendMessage(bm network.BaseMessage) {} -func (a *Agent) SendMessage2App(destAppType, destAppid uint32, bm network.BaseMessage) {} -func (a *Agent) SendMessage2Client(bm network.BaseMessage, userID, gateConnID, sessionID uint64) {} -func (a *Agent) SendData(mainCmdID, subCmdID uint32, m proto.Message) {} -func (a *Agent) AgentInfo() network.BaseAgentInfo { - return network.BaseAgentInfo{} -} -func (a *Agent) SendData2App(DestApptype uint32, DestAppid uint32, mainCmdID uint32, subCmdID uint32, m proto.Message) { -} -func (a *Agent) LocalAddr() net.Addr { - return a.conn.LocalAddr() -} -func (a *Agent) RemoteAddr() net.Addr { - return a.conn.RemoteAddr() -} -func (a *Agent) Close() {} -func (a *Agent) Destroy() {} diff --git a/core/core.go b/core/core.go index e08138a..77dc14f 100644 --- a/core/core.go +++ b/core/core.go @@ -3,41 +3,23 @@ package core import ( "os" "os/signal" - "strings" "xlddz/core/conf" - "xlddz/core/conf/apollo" "xlddz/core/gate" "xlddz/core/log" - "xlddz/core/network" - "xlddz/core/util" ) -func Start() { +func Start(appName string) { + conf.AppInfo.AppName = appName // logger - logger, err := log.New(conf.AppName) + logger, err := log.New(conf.AppInfo.AppName) if err != nil { panic(err) } log.Export(logger) defer logger.Close() - if v, ok := util.ParseArgs("/AppID"); ok { - conf.AppID = v - } - if v, ok := util.ParseArgs("/AppType"); ok { - conf.AppType = v - } - if v, ok := util.ParseArgs("/DockerRun"); ok { - if v == 1 && conf.AppType != network.AppCenter { - addr := strings.Split(conf.CenterAddr, ":") - if len(addr) == 2 { - conf.CenterAddr = "center:" + addr[1] - } - } - } - if conf.AppType == network.AppCenter { - apollo.RegisterConfig("", conf.AppType, conf.AppID, nil) - } + //args + conf.ParseCmdArgs() gate.Start() diff --git a/core/gate/clients.go b/core/gate/clients.go index 063b2c6..89f770c 100644 --- a/core/gate/clients.go +++ b/core/gate/clients.go @@ -21,24 +21,21 @@ type agentClient struct { } func (a *agentClient) Run() { - for { bm, msgData, err := a.conn.ReadMsg() if err != nil { log.Warning("agentClient", "异常,网关读取消息失败,id=%v,err=%v", a.id, err) break } - if Processor == nil { + if processor == nil { log.Error("agentClient", "异常,解析器为nil断开连接,cmd=%v", &bm.Cmd) break } - if bm.Cmd.MainCmdID == uint16(n.CMDConfig) && bm.Cmd.SubCmdID == uint16(config.CMDID_Config_IDApolloCfgRsp) { apollo.ProcessReq(&bm.Cmd, msgData) continue } - - if conf.AppType != n.AppCenter && bm.Cmd.MainCmdID == uint16(n.CMDCenter) { + if conf.AppInfo.AppType != n.AppCenter && bm.Cmd.MainCmdID == uint16(n.CMDCenter) { if bm.Cmd.SubCmdID == uint16(center.CMDID_Center_IDAppRegReq) { var m center.RegisterAppReq _ = proto.Unmarshal(msgData, &m) @@ -47,13 +44,12 @@ func (a *agentClient) Run() { } else if bm.Cmd.SubCmdID == uint16(center.CMDID_Center_IDPulseNotify) { } - continue } unmarshalCmd := bm.Cmd var cmd, msg, dataReq interface{} - if bm.Cmd.MainCmdID == uint16(n.CMDGate) && bm.Cmd.SubCmdID == uint16(gate.CMDID_Gate_IDTransferDataReq) && conf.AppType != n.AppGate { + if bm.Cmd.MainCmdID == uint16(n.CMDGate) && bm.Cmd.SubCmdID == uint16(gate.CMDID_Gate_IDTransferDataReq) && conf.AppInfo.AppType != n.AppGate { var m gate.TransferDataReq _ = proto.Unmarshal(msgData, &m) unmarshalCmd = n.TCPCommand{MainCmdID: uint16(m.GetDataCmdKind()), SubCmdID: uint16(m.GetDataCmdSubid())} @@ -61,14 +57,12 @@ func (a *agentClient) Run() { dataReq = &m } - cmd, msg, err = Processor.Unmarshal(unmarshalCmd.MainCmdID, unmarshalCmd.SubCmdID, msgData) + cmd, msg, err = processor.Unmarshal(unmarshalCmd.MainCmdID, unmarshalCmd.SubCmdID, msgData) if err != nil { log.Error("agentClient", "unmarshal message,headCmd=%v,error: %v", bm.Cmd, err) continue } - - baseMsg := n.BaseMessage{MyMessage: msg, TraceId: bm.TraceId} - err = Processor.Route(baseMsg, a, cmd, dataReq) + err = processor.Route(n.BaseMessage{MyMessage: msg, TraceId: bm.TraceId}, a, cmd, dataReq) if err != nil { log.Error("agentClient", "client agentClient route message error: %v,cmd=%v", err, cmd) continue @@ -77,8 +71,8 @@ func (a *agentClient) Run() { } func (a *agentClient) OnClose() { - if AgentChanRPC != nil { - err := AgentChanRPC.Call0(Disconnect, a, a.id) + if agentChanRPC != nil { + err := agentChanRPC.Call0(Disconnect, a, a.id) if err != nil { log.Error("agentClient", "chanrpc error: %v", err) } diff --git a/core/gate/gate.go b/core/gate/gate.go index f4917d4..3a7ff90 100644 --- a/core/gate/gate.go +++ b/core/gate/gate.go @@ -21,10 +21,11 @@ import ( //网络事件 const ( - ConnectSuccess string = "ConnectSuccess" - Disconnect string = "Disconnect" - CenterConnected string = "CenterConnected" - CenterRegResult string = "CenterRegResult" + ConnectSuccess string = "ConnectSuccess" + Disconnect string = "Disconnect" + CenterConnected string = "CenterConnected" + CenterDisconnect string = "CenterDisconnect" + CenterRegResult string = "CenterRegResult" ) const ( @@ -34,12 +35,13 @@ const ( var ( cbCenterDisconnect []func() - tcpLog *n.TCPClient + tcpLog *n.TCPClient = nil mxServers sync.Mutex + wg sync.WaitGroup servers map[uint64]*agentServer = make(map[uint64]*agentServer) - AgentChanRPC *chanrpc.Server + agentChanRPC *chanrpc.Server Skeleton = module.NewSkeleton(conf.GoLen, conf.TimerDispatcherLen, conf.AsynCallLen, conf.ChanRPCLen) - Processor = protobuf.NewProcessor() + processor = protobuf.NewProcessor() MaxConnNum int PendingWriteNum int MaxMsgLen uint32 @@ -51,39 +53,46 @@ var ( KeyFile string // tcp - LenMsgLen int - LittleEndian bool - closeSig chan bool + LenMsgLen int + closeSig chan bool ) func init() { - if tcpLog == nil { - tcpLog = new(n.TCPClient) - } + tcpLog = new(n.TCPClient) cbCenterDisconnect = append(cbCenterDisconnect, apollo.CenterDisconnect) apollo.RegPublicCB(ApolloNotify) - AgentChanRPC = Skeleton.ChanRPCServer + agentChanRPC = Skeleton.ChanRPCServer closeSig = make(chan bool, 1) } func Start() { + + if conf.AppInfo.AppType == n.AppCenter { + apollo.RegisterConfig("", conf.AppInfo.AppType, conf.AppInfo.AppID, nil) + } + + wg.Add(2) go func() { Skeleton.Run() + wg.Done() }() go func() { Run() + wg.Done() }() } func Stop() { + defer util.TryE(conf.AppInfo.AppName) Skeleton.Close() closeSig <- true + wg.Wait() } func MsgRegister(m proto.Message, mainCmdId uint32, subCmdId uint16, f interface{}) { chanRPC := Skeleton.ChanRPCServer - Processor.Register(m, mainCmdId, subCmdId, chanRPC) + processor.Register(m, mainCmdId, subCmdId, chanRPC) chanRPC.Register(reflect.TypeOf(m), f) } @@ -93,7 +102,7 @@ func EventRegister(id interface{}, f interface{}) { func Run() { - log.Debug("", "Run,TCPAddr=%v", conf.ListenOnAddress) + log.Debug("", "Run,ListenOnAddress=%v", conf.AppInfo.ListenOnAddress) var wsServer *n.WSServer if WSAddr != "" { @@ -107,34 +116,33 @@ func Run() { wsServer.KeyFile = KeyFile wsServer.NewAgent = func(conn *n.WSConn) n.AgentClient { a := &agentClient{conn: conn} - if AgentChanRPC != nil { - AgentChanRPC.Go(ConnectSuccess, a) + if agentChanRPC != nil { + agentChanRPC.Go(ConnectSuccess, a) } return a } } var tcpServer *n.TCPServer - if conf.ListenOnAddress != "" { + if conf.AppInfo.ListenOnAddress != "" { tcpServer = new(n.TCPServer) - tcpServer.Addr = conf.ListenOnAddress + tcpServer.Addr = conf.AppInfo.ListenOnAddress tcpServer.MaxConnNum = MaxConnNum tcpServer.PendingWriteNum = PendingWriteNum tcpServer.LenMsgLen = LenMsgLen tcpServer.MaxMsgLen = MaxMsgLen - tcpServer.LittleEndian = LittleEndian tcpServer.GetConfig = apollo.GetConfigAsInt64 tcpServer.NewAgent = func(conn *n.TCPConn, agentId uint64) n.AgentClient { a := &agentClient{id: agentId, conn: conn, info: n.BaseAgentInfo{AgentType: n.NormalUser}} - if AgentChanRPC != nil { - AgentChanRPC.Go(ConnectSuccess, a, agentId) + if agentChanRPC != nil { + agentChanRPC.Go(ConnectSuccess, a, agentId) } return a } } - if conf.CenterAddr != "" && conf.AppType != n.AppCenter { - newServerItem(n.BaseAgentInfo{AgentType: n.CommonServer, AppName: "center", AppType: n.AppCenter, ListenOnAddress: conf.CenterAddr}, true, PendingWriteNum) + if conf.AppInfo.CenterAddr != "" && conf.AppInfo.AppType != n.AppCenter { + newServerItem(n.BaseAgentInfo{AgentType: n.CommonServer, AppName: "center", AppType: n.AppCenter, ListenOnAddress: conf.AppInfo.CenterAddr}, true, PendingWriteNum) } if wsServer != nil { @@ -157,8 +165,7 @@ func Run() { } func ApolloNotify(k apollo.ConfKey, v apollo.ConfValue) { - //得到日志服务 - if conf.AppType != n.AppLogger && k.Key == "日志服务器地址" && v.Value != "" && + if conf.AppInfo.AppType != n.AppLogger && k.Key == "日志服务器地址" && v.Value != "" && v.RspCount == 1 && tcpLog != nil && !tcpLog.IsRunning() { logAddr := v.Value if v, ok := util.ParseArgs("/DockerRun"); ok && v == 1 { @@ -186,13 +193,13 @@ func ApolloNotify(k apollo.ConfKey, v apollo.ConfValue) { var logReq logger.LogReq logReq.FileName = proto.String(i.File) logReq.LineNo = proto.Uint32(uint32(i.Line)) - logReq.SrcApptype = proto.Uint32(conf.AppType) - logReq.SrcAppid = proto.Uint32(conf.AppID) + logReq.SrcApptype = proto.Uint32(conf.AppInfo.AppType) + logReq.SrcAppid = proto.Uint32(conf.AppInfo.AppID) logReq.Content = []byte(i.LogStr) logReq.ClassName = []byte(i.Classname) logReq.LogLevel = proto.Uint32(uint32(i.Level)) logReq.TimeMs = proto.Uint64(i.TimeMs) - logReq.SrcAppname = proto.String(conf.AppName) + logReq.SrcAppname = proto.String(conf.AppInfo.AppName) cmd := n.TCPCommand{MainCmdID: uint16(n.CMDLogger), SubCmdID: uint16(logger.CMDID_Logger_IDLogReq)} bm := n.BaseMessage{MyMessage: &logReq, Cmd: cmd} a.SendMessage(bm) @@ -208,14 +215,14 @@ func ApolloNotify(k apollo.ConfKey, v apollo.ConfValue) { func sendRegAppReq(a *agentServer) { var registerReq center.RegisterAppReq registerReq.AuthKey = proto.String("GoldBaby") - registerReq.AppName = proto.String(conf.AppName) - registerReq.AppType = proto.Uint32(conf.AppType) - registerReq.AppId = proto.Uint32(conf.AppID) - myAddress := conf.ListenOnAddress + registerReq.AppName = proto.String(conf.AppInfo.AppName) + registerReq.AppType = proto.Uint32(conf.AppInfo.AppType) + registerReq.AppId = proto.Uint32(conf.AppInfo.AppID) + myAddress := conf.AppInfo.ListenOnAddress if v, ok := util.ParseArgs("/DockerRun"); ok && v == 1 { - addr := strings.Split(conf.ListenOnAddress, ":") + addr := strings.Split(conf.AppInfo.ListenOnAddress, ":") if len(addr) == 2 { - myAddress = conf.AppName + ":" + addr[1] + myAddress = conf.AppInfo.AppName + ":" + addr[1] } } registerReq.MyAddress = proto.String(myAddress) diff --git a/core/gate/servers.go b/core/gate/servers.go index 4944366..fdc9356 100644 --- a/core/gate/servers.go +++ b/core/gate/servers.go @@ -48,7 +48,7 @@ func newServerItem(info n.BaseAgentInfo, autoReconnect bool, pendingWriteNum int if n.AppConfig == info.AppType { apollo.SetNetAgent(a) - apollo.RegisterConfig("", conf.AppType, conf.AppID, nil) + apollo.RegisterConfig("", conf.AppInfo.AppType, conf.AppInfo.AppID, nil) } mxServers.Lock() @@ -90,7 +90,7 @@ func (a *agentServer) Run() { mxServers.Lock() _, ok := servers[uint64(m.GetAppType())<<32|uint64(m.GetAppId())] mxServers.Unlock() - if !(conf.AppType == m.GetAppType() && conf.AppID == m.GetAppId()) && !ok { + if !(conf.AppInfo.AppType == m.GetAppType() && conf.AppInfo.AppID == m.GetAppId()) && !ok { if m.GetAppAddress() != "" { info := n.BaseAgentInfo{AgentType: n.CommonServer, AppName: m.GetAppName(), AppType: m.GetAppType(), AppID: m.GetAppId(), ListenOnAddress: m.GetAppAddress()} newServerItem(info, false, 0) @@ -100,7 +100,7 @@ func (a *agentServer) Run() { } } - if conf.AppType == n.AppConfig { + if conf.AppInfo.AppType == n.AppConfig { mxServers.Lock() if _, ok := servers[uint64(n.AppCenter)<<32|uint64(0)]; ok { servers[uint64(n.AppCenter)<<32|uint64(0)].info.AppID = m.GetCenterId() @@ -110,8 +110,8 @@ func (a *agentServer) Run() { } else { log.Warning("agentServer", "注册失败,RouterId=%v,原因=%v", m.GetCenterId(), m.GetReregToken()) } - if AgentChanRPC != nil { - AgentChanRPC.Call0(CenterRegResult, m.GetRegResult(), m.GetCenterId()) + if agentChanRPC != nil { + agentChanRPC.Call0(CenterRegResult, m.GetRegResult(), m.GetCenterId()) } case uint16(center.CMDID_Center_IDAppState): //app状态改变 var m center.AppStateNotify diff --git a/core/module/module.go b/core/module/module.go deleted file mode 100644 index a3bab34..0000000 --- a/core/module/module.go +++ /dev/null @@ -1,73 +0,0 @@ -package module - -import ( - "runtime" - "sync" - "xlddz/core/conf" - "xlddz/core/log" -) - -type Module interface { - OnInit() - OnDestroy() - Run(closeSig chan bool) -} - -type module struct { - mi Module - closeSig chan bool - wg sync.WaitGroup -} - -var mods []*module - -func Register(mi Module) { - m := new(module) - m.mi = mi - m.closeSig = make(chan bool, 1) - - mods = append(mods, m) -} - -func Init() { - for i := 0; i < len(mods); i++ { - mods[i].mi.OnInit() - } - - for i := 0; i < len(mods); i++ { - m := mods[i] - m.wg.Add(1) - go run(m) - } - -} - -func Destroy() { - for i := len(mods) - 1; i >= 0; i-- { - m := mods[i] - m.closeSig <- true - m.wg.Wait() - destroy(m) - } -} - -func run(m *module) { - m.mi.Run(m.closeSig) - m.wg.Done() -} - -func destroy(m *module) { - defer func() { - if r := recover(); r != nil { - if conf.LenStackBuf > 0 { - buf := make([]byte, conf.LenStackBuf) - l := runtime.Stack(buf, false) - log.Error("module", "%v: %s", r, buf[:l]) - } else { - log.Error("module", "%v", r) - } - } - }() - - m.mi.OnDestroy() -} diff --git a/core/module/skeleton.go b/core/module/skeleton.go index 1a8ea8e..7e197be 100644 --- a/core/module/skeleton.go +++ b/core/module/skeleton.go @@ -3,7 +3,6 @@ package module import ( "time" "xlddz/core/chanrpc" - "xlddz/core/console" "xlddz/core/timer" g "xlddz/core/go" @@ -133,7 +132,3 @@ func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) { s.server.Register(id, f) } - -func (s *Skeleton) RegisterCommand(name string, help string, f interface{}) { - console.Register(name, help, f, s.commandServer) -} diff --git a/core/network/protobuf/protobuf.go b/core/network/protobuf/protobuf.go index e08f078..e780556 100644 --- a/core/network/protobuf/protobuf.go +++ b/core/network/protobuf/protobuf.go @@ -1,7 +1,6 @@ package protobuf import ( - "errors" "fmt" "github.com/golang/protobuf/proto" "math" @@ -15,16 +14,14 @@ import ( // | id | protobuf message | // ------------------------- type Processor struct { - littleEndian bool - msgInfo map[network.TCPCommand]*MsgInfo - msgID map[reflect.Type]network.TCPCommand - defaultMsgRouter *chanrpc.Server + littleEndian bool + msgInfo map[network.TCPCommand]*MsgInfo + msgID map[reflect.Type]network.TCPCommand } type MsgInfo struct { - msgType reflect.Type - msgRouter *chanrpc.Server - msgCallBack func(args []interface{}) + msgType reflect.Type + msgRouter *chanrpc.Server } func NewProcessor() *Processor { @@ -32,7 +29,6 @@ func NewProcessor() *Processor { p.littleEndian = false p.msgInfo = make(map[network.TCPCommand]*MsgInfo) p.msgID = make(map[reflect.Type]network.TCPCommand) - p.defaultMsgRouter = nil return p } @@ -64,33 +60,6 @@ func (p *Processor) Register(msg proto.Message, mainCmdID uint32, subCmdID uint1 p.msgInfo[command] = i } -// 同步回调 -func (p *Processor) RegHandle(msg proto.Message, mainCmdID uint32, subCmdID uint16, msgCallBack func(args []interface{})) { - msgType := reflect.TypeOf(msg) - if msgType == nil || msgType.Kind() != reflect.Ptr { - log.Fatal("proto", "protobuf message pointer required") - } - if len(p.msgInfo) >= math.MaxUint16 { - log.Fatal("proto", "too many protobuf messages (max = %v)", math.MaxUint16) - } - - //协议命令 - command := network.TCPCommand{MainCmdID: uint16(mainCmdID), SubCmdID: subCmdID} - if _, ok := p.msgInfo[command]; ok { - log.Fatal("proto", "message %s,cmd=%v is already registered", msgType, command) - } - - i := new(MsgInfo) - i.msgType = msgType - i.msgCallBack = msgCallBack - p.msgInfo[command] = i -} - -//设置默认路由 -func (p *Processor) SetDefaultRouter(msgRouter *chanrpc.Server) { - p.defaultMsgRouter = msgRouter -} - // It's dangerous to call the method on routing or marshaling (unmarshaling) func (p *Processor) SetRouter(id network.TCPCommand, msgRouter *chanrpc.Server) { _, ok := p.msgInfo[id] @@ -103,48 +72,24 @@ func (p *Processor) SetRouter(id network.TCPCommand, msgRouter *chanrpc.Server) // goroutine safe func (p *Processor) Route(args ...interface{}) error { - //最少三个参数 - if len(args) < 3 { - return fmt.Errorf("路由消息参数过少,%v", len(args)) + if len(args) < network.MinRouteArgsCount { + return fmt.Errorf("路由消息参数过少,MinRouteArgsCount=%v,len(args)=%v", len(args), network.MinRouteArgsCount) } - //注册处理 - id := *args[network.CMD_INDEX].(*network.TCPCommand) + id := *args[network.CMDIndex].(*network.TCPCommand) i, ok := p.msgInfo[id] - if ok { - //使用rpc异步投递 - if i.msgRouter != nil { - i.msgRouter.Go(i.msgType, args...) - return nil - } - - //使用回调同步回调 - if i.msgCallBack != nil { - i.msgCallBack(args) - return nil - } - } - - //默认处理 - if p.defaultMsgRouter != nil { - p.defaultMsgRouter.Go(network.DefaultNetMsgFuncId, args...) + if ok && i.msgRouter != nil { + i.msgRouter.Go(i.msgType, args...) return nil } - - log.Error("proto", "protobuf.go Route nil,id=%v ", id) - return errors.New("异常,protobuf.go Route nil") + return fmt.Errorf("异常,protobuf.go Route nil,ok=%v,id=%v", ok, id) } // goroutine safe -func (p *Processor) Unmarshal(mainCmdID uint16, subCmdID uint16, data []byte) (interface{}, interface{}, error) { - +func (p *Processor) Unmarshal(mainCmdID, subCmdID uint16, data []byte) (interface{}, interface{}, error) { id := network.TCPCommand{MainCmdID: mainCmdID, SubCmdID: subCmdID} - - //是否为注册消息 if _, ok := p.msgInfo[id]; !ok { - log.Error("proto", "protobuf Unmarshal木有找到ID=%v", id) - errInfo := fmt.Sprintf("解析时木找到注册id=%v", id) - return &id, nil, errors.New(errInfo) + return &id, nil, fmt.Errorf("protobuf Unmarshal木有找到ID=%v", id) } // msg diff --git a/core/network/tcp_client.go b/core/network/tcp_client.go index 25a189a..ba7a055 100644 --- a/core/network/tcp_client.go +++ b/core/network/tcp_client.go @@ -91,7 +91,6 @@ func (client *TCPClient) init() { // msg parser msgParser := NewMsgParser() msgParser.SetMsgLen(client.LenMsgLen, client.MinMsgLen, client.MaxMsgLen) - msgParser.SetByteOrder(client.LittleEndian) client.msgParser = msgParser } diff --git a/core/network/tcp_msg.go b/core/network/tcp_msg.go index 3bd26c7..f9a9823 100644 --- a/core/network/tcp_msg.go +++ b/core/network/tcp_msg.go @@ -3,9 +3,9 @@ package network import ( "bytes" "encoding/binary" + "fmt" "io" "math" - "xlddz/core/log" ) //appid类型 @@ -39,16 +39,16 @@ const ( AppActionData uint32 = 73 ) -//回调参数列表下标 const ( - DATA_INDEX = 0 //数据 - AGENT_INDEX = 1 //网络代理 - CMD_INDEX = 2 //TCPCommon - OTHER_INDEX = 3 //其他 + DataIndex = 0 //数据 + AgentIndex = 1 //网络代理 + CMDIndex = 2 //TCPCommon + OtherIndex = 3 //其他 ) -//默认网络消息rpc处理Id -const DefaultNetMsgFuncId string = "DefaultNetMsgFuncId" +const ( + MinRouteArgsCount = 3 +) //网络命令 type TCPCommand struct { @@ -62,7 +62,6 @@ const ( TraceIdLen = 16 ) -// PackageHeader 比赛架构下的消息头 type PackageHeader struct { version uint8 encrypt uint8 @@ -83,18 +82,16 @@ type BaseMessage struct { // | 4bit(msgSize) | 2bit(headSize) | 1bit(version) + 1bit(encrypt) + 2bit(CmdKind) + 2bit(CmdId) + Xbit(other) | msgData // -------------- type MsgParser struct { - lenMsgLen int - minMsgLen uint32 - maxMsgLen uint32 - littleEndian bool + lenMsgLen int + minMsgLen uint32 + maxMsgLen uint32 } func NewMsgParser() *MsgParser { p := new(MsgParser) p.lenMsgLen = 2 p.minMsgLen = 8 - p.maxMsgLen = 4096 - p.littleEndian = false + p.maxMsgLen = 8 * 1024 return p } @@ -128,32 +125,27 @@ func (p *MsgParser) SetMsgLen(lenMsgLen int, minMsgLen uint32, maxMsgLen uint32) } } -// It's dangerous to call the method on reading or writing -func (p *MsgParser) SetByteOrder(littleEndian bool) { - p.littleEndian = littleEndian -} - // | msgSize | headSize | header | msgData // |4bit(msgSize)| 2bit(headSize) | 1bit(version) + 1bit(encrypt) + 2bit(CmdKind) + 2bit(CmdId) + Xbit(other) | msgData func (p *MsgParser) Read(conn *TCPConn) (BaseMessage, []byte, error) { - //读取消息头 msgSizeBuf := make([]byte, 4) if _, err := io.ReadFull(conn, msgSizeBuf); err != nil { - log.Warning("MsgParser", "消息头读取失败,%v", err) - return BaseMessage{}, nil, err + return BaseMessage{}, nil, fmt.Errorf("消息头读取失败,%v", err) } var msgSize uint32 = 0 if err := binary.Read(bytes.NewBuffer(msgSizeBuf), binary.BigEndian, &msgSize); err != nil { - log.Error("MsgParser", "消息体长度读取失败,%v", err) - return BaseMessage{}, nil, err + return BaseMessage{}, nil, fmt.Errorf("消息体长度读取失败,%v", err) + } + + if msgSize < p.minMsgLen || msgSize > p.maxMsgLen { + return BaseMessage{}, nil, fmt.Errorf("消息长度有问题,msgSize=%v,minMsgLen=%d,maxMsgLen=%d", msgSize, p.minMsgLen, p.maxMsgLen) } // data allData := make([]byte, msgSize) if _, err := io.ReadFull(conn, allData); err != nil { - log.Error("MsgParser", "消息体内容读取失败,%v", err) - return BaseMessage{}, nil, err + return BaseMessage{}, nil, fmt.Errorf("消息体内容读取失败,%v", err) } var headSize uint16 = 0 diff --git a/core/network/tcp_server.go b/core/network/tcp_server.go index b10033f..f458346 100644 --- a/core/network/tcp_server.go +++ b/core/network/tcp_server.go @@ -25,11 +25,10 @@ type TCPServer struct { agentId uint64 // msg parser - LenMsgLen int - MinMsgLen uint32 - MaxMsgLen uint32 - LittleEndian bool - msgParser *MsgParser + LenMsgLen int + MinMsgLen uint32 + MaxMsgLen uint32 + msgParser *MsgParser } func (server *TCPServer) Start() { @@ -61,7 +60,6 @@ func (server *TCPServer) init() { // msg parser msgParser := NewMsgParser() msgParser.SetMsgLen(server.LenMsgLen, server.MinMsgLen, server.MaxMsgLen) - msgParser.SetByteOrder(server.LittleEndian) server.msgParser = msgParser } diff --git a/core/util/debugInfo.go b/core/util/debugInfo.go index 8786ec2..ac2b12f 100644 --- a/core/util/debugInfo.go +++ b/core/util/debugInfo.go @@ -6,10 +6,9 @@ import ( "path" "runtime/debug" "time" - "xlddz/core/conf" ) -func TryE() { +func TryE(appName string) { errs := recover() if errs == nil { return @@ -20,7 +19,7 @@ func TryE() { pathname := "" curPath, err := GetCurrentPath() if err == nil { - pathname = curPath + "log/" + conf.AppName + "/" + pathname = curPath + "log/" + appName + "/" _, err := os.Stat(pathname) if err != nil && os.IsNotExist(err) { err = os.MkdirAll(pathname, os.ModePerm) diff --git a/servers/center/business/business.go b/servers/center/business/business.go index 26802c0..68d9437 100644 --- a/servers/center/business/business.go +++ b/servers/center/business/business.go @@ -12,7 +12,6 @@ import ( "xlddz/core/log" n "xlddz/core/network" "xlddz/protocol/center" - "xlddz/servers/center/conf" ) var ( @@ -81,7 +80,7 @@ func disconnect(args []interface{}) { func configChangeNotify(k apollo.ConfKey, v apollo.ConfValue) { - key := apollo.ConfKey{AppType: lconf.AppType, AppId: lconf.AppID, Key: "服务维护"} + key := apollo.ConfKey{AppType: lconf.AppInfo.AppType, AppId: lconf.AppInfo.AppID, Key: "服务维护"} if k == key { type appInfo struct { AppType uint32 @@ -106,9 +105,9 @@ func configChangeNotify(k apollo.ConfKey, v apollo.ConfValue) { } func handleRegisterAppReq(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*center.RegisterAppReq) - a := args[n.AGENT_INDEX].(n.AgentClient) + a := args[n.AgentIndex].(n.AgentClient) //连接存在判断 if _, ok := appConnData[a]; !ok { @@ -133,7 +132,7 @@ func handleRegisterAppReq(args []interface{}) { var rsp center.RegisterAppRsp rsp.RegResult = proto.Uint32(1) rsp.ReregToken = proto.String(resultMsg) - rsp.CenterId = proto.Uint32(conf.Server.AppID) + rsp.CenterId = proto.Uint32(lconf.AppInfo.AppID) a.SendData(n.CMDCenter, uint32(center.CMDID_Center_IDAppRegRsp), &rsp) a.Close() @@ -156,7 +155,7 @@ func handleRegisterAppReq(args []interface{}) { var rsp center.RegisterAppRsp rsp.RegResult = proto.Uint32(0) rsp.ReregToken = proto.String(token) - rsp.CenterId = proto.Uint32(conf.Server.AppID) + rsp.CenterId = proto.Uint32(lconf.AppInfo.AppID) rsp.AppName = proto.String(i.appName) rsp.AppType = proto.Uint32(i.appType) rsp.AppId = proto.Uint32(i.appId) @@ -179,9 +178,9 @@ func handleAppStateNotify(args []interface{}) { } func handleAppPulseNotify(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*center.AppPulseNotify) - a := args[n.AGENT_INDEX].(n.AgentClient) + a := args[n.AgentIndex].(n.AgentClient) //非法判断 if _, ok := appConnData[a]; !ok { @@ -219,7 +218,7 @@ func broadcastAppState(appType, appId uint32) { } var rsp center.AppStateNotify rsp.AppState = proto.Uint32(uint32(center.AppStateNotify_OffLine)) - rsp.CenterId = proto.Uint32(conf.Server.AppID) + rsp.CenterId = proto.Uint32(lconf.AppInfo.AppID) rsp.AppType = proto.Uint32(appType) rsp.AppId = proto.Uint32(appId) a.SendData(n.CMDCenter, uint32(center.CMDID_Center_IDAppState), &rsp) diff --git a/servers/center/conf/center.json b/servers/center/conf/center.json index 9fec778..dad6398 100644 --- a/servers/center/conf/center.json +++ b/servers/center/conf/center.json @@ -1,5 +1,7 @@ { "AppName": "center", + "AppType": 6, "AppID": 10, - "TCPAddr": "0.0.0.0:10001" + "ListenOnAddress": "0.0.0.0:10001", + "CenterAddr": "" } \ No newline at end of file diff --git a/servers/center/conf/conf.go b/servers/center/conf/conf.go deleted file mode 100644 index 6bf0214..0000000 --- a/servers/center/conf/conf.go +++ /dev/null @@ -1,24 +0,0 @@ -package conf - -import ( - "log" - "time" -) - -var ( - // log conf - LogFlag = log.LstdFlags - - // gate conf - PendingWriteNum = 2000 - MaxMsgLen uint32 = 4096 - HTTPTimeout = 10 * time.Second - LenMsgLen = 2 - LittleEndian = false - - // skeleton conf - GoLen = 10000 - TimerDispatcherLen = 10000 - AsynCallLen = 10000 - ChanRPCLen = 10000 -) diff --git a/servers/center/conf/json.go b/servers/center/conf/json.go deleted file mode 100644 index 881e3d9..0000000 --- a/servers/center/conf/json.go +++ /dev/null @@ -1,32 +0,0 @@ -package conf - -import ( - "encoding/json" - "io/ioutil" - lconf "xlddz/core/conf" - "xlddz/core/log" - n "xlddz/core/network" -) - -var Server struct { - AppName string - AppID uint32 - TCPAddr string -} - -func init() { - data, err := ioutil.ReadFile("conf/center.json") - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - err = json.Unmarshal(data, &Server) - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - - lconf.AppName = Server.AppName - lconf.AppType = n.AppCenter - lconf.AppID = Server.AppID - lconf.ListenOnAddress = Server.TCPAddr - log.Info("jsonconf", "配置文件载入成功%v", Server) -} diff --git a/servers/center/main.go b/servers/center/main.go index db9568b..f1b382a 100644 --- a/servers/center/main.go +++ b/servers/center/main.go @@ -3,9 +3,8 @@ package main import ( "xlddz/core" _ "xlddz/servers/center/business" - _ "xlddz/servers/center/conf" ) func main() { - core.Start() + core.Start("center") } diff --git a/servers/config/business/business.go b/servers/config/business/business.go index 831c0b8..2091dc4 100644 --- a/servers/config/business/business.go +++ b/servers/config/business/business.go @@ -44,7 +44,7 @@ func loadConfigs() { //是否使用Apollo if conf.Server.UseApollo { c := &conf.Server.Config - listenerConf := conf.ApolloConfig{ServerType: lconf.AppType, ServerId: conf.Server.AppID} + listenerConf := conf.ApolloConfig{ServerType: lconf.AppInfo.AppType, ServerId: conf.Server.AppID} listener := newListener(c, listenerConf) agollo.SetLogger(&DefaultLogger{}) client, _ := agollo.StartWithConfig(func() (*aConfig.AppConfig, error) { @@ -368,7 +368,7 @@ func (d *DefaultLogger) Error(v ...interface{}) { } func handleApolloCfgReq(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*config.ApolloCfgReq) log.Debug("配置", "收到配置请求,AppType=%v,AppId=%v,KeyName=%v,SubAppType=%v,SubAppId=%v,Subscribe=%v", diff --git a/servers/config/common-server-center.json b/servers/config/common-server-center.json index b4fb095..1c44cfc 100644 --- a/servers/config/common-server-center.json +++ b/servers/config/common-server-center.json @@ -1 +1,12 @@ -{"appId":"common-server","cluster":"default","namespaceName":"router","releaseKey":"20211125144134-b9ac8ee6517284e1","configurations":{"LogScreenPrint":"1","title":"路由服务","日志服务器地址":"127.0.0.1:20001","服务维护":"{\"AppType\":7,\"AppId\":31,\"OpType\":3}"}} +{ + "appId": "common-server", + "cluster": "default", + "namespaceName": "router", + "releaseKey": "20211125144134-b9ac8ee6517284e1", + "configurations": { + "LogScreenPrint": "1", + "title": "路由服务", + "日志服务器地址": "127.0.0.1:20001", + "服务维护": "{\"AppType\":7,\"AppId\":31,\"OpType\":3}" + } +} \ No newline at end of file diff --git a/servers/config/common-server-login.json b/servers/config/common-server-login.json index 829ae65..dc32db0 100644 --- a/servers/config/common-server-login.json +++ b/servers/config/common-server-login.json @@ -1 +1,11 @@ -{"appId":"common-server","cluster":"default","namespaceName":"login","releaseKey":"20211122134543-11cc8ee6517284d7","configurations":{"LogScreenPrint":"1","title":"登录服务","日志服务器地址":"127.0.0.1:20001"}} +{ + "appId": "common-server", + "cluster": "default", + "namespaceName": "login", + "releaseKey": "20211122134543-11cc8ee6517284d7", + "configurations": { + "LogScreenPrint": "1", + "title": "登录服务", + "日志服务器地址": "127.0.0.1:20001" + } +} \ No newline at end of file diff --git a/servers/config/conf/conf.go b/servers/config/conf/conf.go deleted file mode 100644 index 6bf0214..0000000 --- a/servers/config/conf/conf.go +++ /dev/null @@ -1,24 +0,0 @@ -package conf - -import ( - "log" - "time" -) - -var ( - // log conf - LogFlag = log.LstdFlags - - // gate conf - PendingWriteNum = 2000 - MaxMsgLen uint32 = 4096 - HTTPTimeout = 10 * time.Second - LenMsgLen = 2 - LittleEndian = false - - // skeleton conf - GoLen = 10000 - TimerDispatcherLen = 10000 - AsynCallLen = 10000 - ChanRPCLen = 10000 -) diff --git a/servers/config/conf/config.json b/servers/config/conf/config.json index 8c64c05..1809b3f 100644 --- a/servers/config/conf/config.json +++ b/servers/config/conf/config.json @@ -1,8 +1,9 @@ { "AppName": "config", + "AppType": 2, "AppID": 1, - "TCPClientAddr": "127.0.0.1:10001", - "TCPAddr": "0.0.0.0:10011", + "CenterAddr": "127.0.0.1:10001", + "ListenOnAddress": "0.0.0.0:10011", "MaxConnNum": 20000, "ConsolePort": 0, "UseApollo": false, diff --git a/servers/config/conf/json.go b/servers/config/conf/json.go index 59ffd78..238f074 100644 --- a/servers/config/conf/json.go +++ b/servers/config/conf/json.go @@ -3,23 +3,21 @@ package conf import ( "encoding/json" "io/ioutil" - lconf "xlddz/core/conf" "xlddz/core/log" - n "xlddz/core/network" aConfig "xlddz/servers/config/agollo/env/config" ) var Server struct { - TCPClientAddr string - TCPAddr string - AppID uint32 - AppName string - MaxConnNum int - ConsolePort int - ScreenPrint bool - UseApollo bool `default:"false" json:"UseApollo"` - Config aConfig.AppConfig - CommonServers []ApolloConfig + CenterAddr string + ListenOnAddress string + AppID uint32 + AppName string + MaxConnNum int + ConsolePort int + ScreenPrint bool + UseApollo bool `default:"false" json:"UseApollo"` + Config aConfig.AppConfig + CommonServers []ApolloConfig } type ApolloConfig struct { @@ -41,11 +39,5 @@ func init() { log.Fatal("jsonconf", "%v", err) } - lconf.ConsolePort = Server.ConsolePort - lconf.AppName = Server.AppName - lconf.AppID = Server.AppID - lconf.AppType = n.AppConfig - lconf.ListenOnAddress = Server.TCPAddr - lconf.CenterAddr = Server.TCPClientAddr log.Info("jsonconf", "配置文件载入成功%v", Server) } diff --git a/servers/config/main.go b/servers/config/main.go index 237f207..52e9a65 100644 --- a/servers/config/main.go +++ b/servers/config/main.go @@ -7,5 +7,5 @@ import ( ) func main() { - core.Start() + core.Start("config") } diff --git a/servers/config/xlddz-apollo-application.json b/servers/config/xlddz-apollo-application.json index 2e75948..cebdb74 100644 --- a/servers/config/xlddz-apollo-application.json +++ b/servers/config/xlddz-apollo-application.json @@ -1 +1,10 @@ -{"appId":"xlddz-apollo","cluster":"default","namespaceName":"application","releaseKey":"20211123145121-02048ee6517284da","configurations":{"common-server":"[{\"appid\":\"common-server\",\"cluster\":\"dev\",\"namespaceName\":\"router\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":6,\"serverid\":0},{\"appid\":\"common-server\",\"cluster\":\"dev\",\"namespaceName\":\"login\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":7,\"serverid\":0},{\"appid\":\"xlddz-gateway\",\"cluster\":\"10100\",\"namespaceName\":\"ddz.gateway\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":5,\"serverid\":100},{\"appid\":\"xlddz-gateway\",\"cluster\":\"10101\",\"namespaceName\":\"ddz.gateway\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":5,\"serverid\":101},{\"appid\":\"xlddz-gateway\",\"cluster\":\"10102\",\"namespaceName\":\"ddz.gateway\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":5,\"serverid\":102}]","日志服务器地址":"127.0.0.1:20001"}} +{ + "appId": "xlddz-apollo", + "cluster": "default", + "namespaceName": "application", + "releaseKey": "20211123145121-02048ee6517284da", + "configurations": { + "common-server": "[{\"appid\":\"common-server\",\"cluster\":\"dev\",\"namespaceName\":\"router\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":6,\"serverid\":0},{\"appid\":\"common-server\",\"cluster\":\"dev\",\"namespaceName\":\"login\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":7,\"serverid\":0},{\"appid\":\"xlddz-gateway\",\"cluster\":\"10100\",\"namespaceName\":\"ddz.gateway\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":5,\"serverid\":100},{\"appid\":\"xlddz-gateway\",\"cluster\":\"10101\",\"namespaceName\":\"ddz.gateway\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":5,\"serverid\":101},{\"appid\":\"xlddz-gateway\",\"cluster\":\"10102\",\"namespaceName\":\"ddz.gateway\",\"ip\":\"http://192.168.22.47:8080\",\"servertype\":5,\"serverid\":102}]", + "日志服务器地址": "127.0.0.1:20001" + } +} \ No newline at end of file diff --git a/servers/config/xlddz-gateway-ddz.gateway-100.json b/servers/config/xlddz-gateway-ddz.gateway-100.json index f8a4dab..de7522a 100644 --- a/servers/config/xlddz-gateway-ddz.gateway-100.json +++ b/servers/config/xlddz-gateway-ddz.gateway-100.json @@ -1 +1,18 @@ -{"appId":"xlddz-gateway","cluster":"10100","namespaceName":"ddz.gateway","releaseKey":"20211119184737-6c788ee6517284d3+20211119180205-a4528ee6517284cb","configurations":{"LogScreenPrint":"1","appid":"100","router地址":"","title":"网关0","内存限制":"0","开始监控连接数量":"5000","心跳间隔":"600","日志服务器地址":"127.0.0.1:20001","监听地址":"","监控间隔":"1000"}} \ No newline at end of file +{ + "appId": "xlddz-gateway", + "cluster": "10100", + "namespaceName": "ddz.gateway", + "releaseKey": "20211119184737-6c788ee6517284d3+20211119180205-a4528ee6517284cb", + "configurations": { + "LogScreenPrint": "1", + "appid": "100", + "router地址": "", + "title": "网关0", + "内存限制": "0", + "开始监控连接数量": "5000", + "心跳间隔": "600", + "日志服务器地址": "127.0.0.1:20001", + "监听地址": "", + "监控间隔": "1000" + } +} \ No newline at end of file diff --git a/servers/config/xlddz-gateway-ddz.gateway-101.json b/servers/config/xlddz-gateway-ddz.gateway-101.json index d3633dc..d848945 100644 --- a/servers/config/xlddz-gateway-ddz.gateway-101.json +++ b/servers/config/xlddz-gateway-ddz.gateway-101.json @@ -1 +1,18 @@ -{"appId":"xlddz-gateway","cluster":"10101","namespaceName":"ddz.gateway","releaseKey":"20211119184737-6c788ee6517284d3+20211119180205-a4528ee6517284cb","configurations":{"LogScreenPrint":"1","appid":"101","router地址":"","title":"网关1","内存限制":"0","开始监控连接数量":"5000","心跳间隔":"600","日志服务器地址":"127.0.0.1:20001","监听地址":"","监控间隔":"1000"}} \ No newline at end of file +{ + "appId": "xlddz-gateway", + "cluster": "10101", + "namespaceName": "ddz.gateway", + "releaseKey": "20211119184737-6c788ee6517284d3+20211119180205-a4528ee6517284cb", + "configurations": { + "LogScreenPrint": "1", + "appid": "101", + "router地址": "", + "title": "网关1", + "内存限制": "0", + "开始监控连接数量": "5000", + "心跳间隔": "600", + "日志服务器地址": "127.0.0.1:20001", + "监听地址": "", + "监控间隔": "1000" + } +} \ No newline at end of file diff --git a/servers/config/xlddz-gateway-ddz.gateway-102.json b/servers/config/xlddz-gateway-ddz.gateway-102.json index 68da544..9225f48 100644 --- a/servers/config/xlddz-gateway-ddz.gateway-102.json +++ b/servers/config/xlddz-gateway-ddz.gateway-102.json @@ -1 +1,18 @@ -{"appId":"xlddz-gateway","cluster":"10102","namespaceName":"ddz.gateway","releaseKey":"20211119184737-6c788ee6517284d3+20211119180205-a4528ee6517284cb","configurations":{"LogScreenPrint":"1","appid":"102","router地址":"","title":"网关2","内存限制":"0","开始监控连接数量":"5000","心跳间隔":"600","日志服务器地址":"127.0.0.1:20001","监听地址":"","监控间隔":"1000"}} \ No newline at end of file +{ + "appId": "xlddz-gateway", + "cluster": "10102", + "namespaceName": "ddz.gateway", + "releaseKey": "20211119184737-6c788ee6517284d3+20211119180205-a4528ee6517284cb", + "configurations": { + "LogScreenPrint": "1", + "appid": "102", + "router地址": "", + "title": "网关2", + "内存限制": "0", + "开始监控连接数量": "5000", + "心跳间隔": "600", + "日志服务器地址": "127.0.0.1:20001", + "监听地址": "", + "监控间隔": "1000" + } +} \ No newline at end of file diff --git a/servers/gateway/business/business.go b/servers/gateway/business/business.go index d54987d..a64bf9c 100644 --- a/servers/gateway/business/business.go +++ b/servers/gateway/business/business.go @@ -4,13 +4,13 @@ import ( "errors" "github.com/golang/protobuf/proto" "time" + "xlddz/core/conf" "xlddz/core/conf/apollo" g "xlddz/core/gate" "xlddz/core/log" n "xlddz/core/network" "xlddz/protocol/client" gcmd "xlddz/protocol/gate" - "xlddz/servers/gateway/conf" ) var ( @@ -33,7 +33,7 @@ func init() { g.MsgRegister(&gcmd.HelloReq{}, n.CMDGate, uint16(gcmd.CMDID_Gate_IDHelloReq), handleHelloReq) g.EventRegister(g.ConnectSuccess, connectSuccess) g.EventRegister(g.Disconnect, disconnect) - g.EventRegister(g.CenterConnected, routerConnected) + g.EventRegister(g.CenterConnected, centerConnected) g.Skeleton.AfterFunc(30*time.Second, checkConnectionAlive) } @@ -72,12 +72,12 @@ func disconnect(args []interface{}) { } } -func routerConnected(args []interface{}) { +func centerConnected(args []interface{}) { } func handlePulseReq(args []interface{}) { - //m := args[n.DATA_INDEX].(*gcmd.PulseReq) - a := args[n.AGENT_INDEX].(n.AgentClient) + //m := args[n.DataIndex].(*gcmd.PulseReq) + a := args[n.AgentIndex].(n.AgentClient) connData, err := getUserConnData(a) if err != nil { @@ -91,9 +91,9 @@ func handlePulseReq(args []interface{}) { } func handleTransferDataReq(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*gcmd.TransferDataReq) - a := args[n.AGENT_INDEX].(n.AgentClient) + a := args[n.AgentIndex].(n.AgentClient) connData, err := getUserConnData(a) if err != nil { @@ -115,7 +115,7 @@ func handleTransferDataReq(args []interface{}) { } a.SendData(n.CMDGate, uint32(gcmd.CMDID_Gate_IDTransferDataReq), m) } else { - m.Gateid = proto.Uint32(conf.Server.AppID) + m.Gateid = proto.Uint32(conf.AppInfo.AppID) m.Gateconnid = proto.Uint64(makeGateConnID(connData.connId)) g.SendData2App(m.GetAttApptype(), m.GetAttAppid(), n.CMDGate, uint32(gcmd.CMDID_Gate_IDTransferDataReq), m) } @@ -127,9 +127,9 @@ func handleAuthInfo(args []interface{}) { } func handleHelloReq(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*gcmd.HelloReq) - a := args[n.AGENT_INDEX].(n.AgentClient) + a := args[n.AgentIndex].(n.AgentClient) connData, err := getUserConnData(a) if err != nil { @@ -152,7 +152,7 @@ func handleHelloReq(args []interface{}) { } func makeGateConnID(connId uint64) uint64 { - return uint64(conf.Server.AppID)<<32 + connId + return uint64(conf.AppInfo.AppID)<<32 + connId } func getUserConnData(a n.AgentClient) (*connectionData, error) { diff --git a/servers/gateway/conf/conf.go b/servers/gateway/conf/conf.go deleted file mode 100644 index 6bf0214..0000000 --- a/servers/gateway/conf/conf.go +++ /dev/null @@ -1,24 +0,0 @@ -package conf - -import ( - "log" - "time" -) - -var ( - // log conf - LogFlag = log.LstdFlags - - // gate conf - PendingWriteNum = 2000 - MaxMsgLen uint32 = 4096 - HTTPTimeout = 10 * time.Second - LenMsgLen = 2 - LittleEndian = false - - // skeleton conf - GoLen = 10000 - TimerDispatcherLen = 10000 - AsynCallLen = 10000 - ChanRPCLen = 10000 -) diff --git a/servers/gateway/conf/gateway.json b/servers/gateway/conf/gateway.json index 05e83b7..25399e9 100644 --- a/servers/gateway/conf/gateway.json +++ b/servers/gateway/conf/gateway.json @@ -1,6 +1,7 @@ { - "AppName": "gateway", - "AppID": 102, - "TCPAddr": "0.0.0.0:10102", - "TCPClientAddr": "127.0.0.1:10001" + "AppName": "gateway", + "AppType": 5, + "AppID": 102, + "ListenOnAddress": "0.0.0.0:10102", + "CenterAddr": "127.0.0.1:10001" } \ No newline at end of file diff --git a/servers/gateway/conf/json.go b/servers/gateway/conf/json.go deleted file mode 100644 index 8c03479..0000000 --- a/servers/gateway/conf/json.go +++ /dev/null @@ -1,34 +0,0 @@ -package conf - -import ( - "encoding/json" - "io/ioutil" - lconf "xlddz/core/conf" - "xlddz/core/log" - n "xlddz/core/network" -) - -var Server struct { - AppName string - AppID uint32 - TCPAddr string - TCPClientAddr string -} - -func init() { - data, err := ioutil.ReadFile("conf/gateway.json") - if err != nil { - log.Fatal("jsonconf", "QQQ%v", err) - } - err = json.Unmarshal(data, &Server) - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - - lconf.AppName = Server.AppName - lconf.AppType = n.AppGate - lconf.AppID = Server.AppID - lconf.ListenOnAddress = Server.TCPAddr - lconf.CenterAddr = Server.TCPClientAddr - log.Info("jsonconf", "配置文件载入成功%v", Server) -} diff --git a/servers/gateway/main.go b/servers/gateway/main.go index e6aa151..31e71c1 100644 --- a/servers/gateway/main.go +++ b/servers/gateway/main.go @@ -6,7 +6,6 @@ import ( "xlddz/core" "xlddz/core/log" _ "xlddz/servers/gateway/business" - _ "xlddz/servers/gateway/conf" ) func main() { @@ -18,6 +17,6 @@ func main() { os.Exit(0) }() - core.Start() + core.Start("gateway") } diff --git a/servers/logger/business/business.go b/servers/logger/business/business.go index 8b18000..8e32133 100644 --- a/servers/logger/business/business.go +++ b/servers/logger/business/business.go @@ -76,10 +76,10 @@ func disconnect(args []interface{}) { } func handleLogReq(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*logger.LogReq) - //m := args[n.DATA_INDEX].(*logger.LogReq) - a := args[n.AGENT_INDEX].(n.AgentClient) + //m := args[n.DataIndex].(*logger.LogReq) + a := args[n.AgentIndex].(n.AgentClient) //连接存在判断 if _, ok := appConnData[a]; !ok { diff --git a/servers/logger/conf/conf.go b/servers/logger/conf/conf.go deleted file mode 100644 index 6bf0214..0000000 --- a/servers/logger/conf/conf.go +++ /dev/null @@ -1,24 +0,0 @@ -package conf - -import ( - "log" - "time" -) - -var ( - // log conf - LogFlag = log.LstdFlags - - // gate conf - PendingWriteNum = 2000 - MaxMsgLen uint32 = 4096 - HTTPTimeout = 10 * time.Second - LenMsgLen = 2 - LittleEndian = false - - // skeleton conf - GoLen = 10000 - TimerDispatcherLen = 10000 - AsynCallLen = 10000 - ChanRPCLen = 10000 -) diff --git a/servers/logger/conf/json.go b/servers/logger/conf/json.go deleted file mode 100644 index fded967..0000000 --- a/servers/logger/conf/json.go +++ /dev/null @@ -1,32 +0,0 @@ -package conf - -import ( - "encoding/json" - "io/ioutil" - lconf "xlddz/core/conf" - "xlddz/core/log" - n "xlddz/core/network" -) - -var Server struct { - AppName string - AppID uint32 - TCPAddr string -} - -func init() { - data, err := ioutil.ReadFile("conf/logger.json") - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - err = json.Unmarshal(data, &Server) - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - - lconf.AppName = Server.AppName - lconf.AppType = n.AppLogger - lconf.AppID = Server.AppID - lconf.ListenOnAddress = Server.TCPAddr - log.Info("jsonconf", "配置文件载入成功%v", Server) -} diff --git a/servers/logger/conf/logger.json b/servers/logger/conf/logger.json index a7768bb..2aad390 100644 --- a/servers/logger/conf/logger.json +++ b/servers/logger/conf/logger.json @@ -1,5 +1,7 @@ { "AppName": "logger", + "AppType": 8, "AppID": 0, - "TCPAddr": "0.0.0.0:20001" + "ListenOnAddress": "0.0.0.0:20001", + "CenterAddr": "" } \ No newline at end of file diff --git a/servers/logger/main.go b/servers/logger/main.go index 7de84ad..287ba91 100644 --- a/servers/logger/main.go +++ b/servers/logger/main.go @@ -3,9 +3,8 @@ package main import ( "xlddz/core" _ "xlddz/servers/logger/business" - _ "xlddz/servers/logger/conf" ) func main() { - core.Start() + core.Start("logger") } diff --git a/servers/login/business/business.go b/servers/login/business/business.go index 2a84e6c..a292892 100644 --- a/servers/login/business/business.go +++ b/servers/login/business/business.go @@ -26,10 +26,10 @@ func disconnect(args []interface{}) { } func handleLoginReq(args []interface{}) { - //a := args[n.AGENT_INDEX].(n.AgentClient) - b := args[n.DATA_INDEX].(n.BaseMessage) + //a := args[n.AgentIndex].(n.AgentClient) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*client.LoginReq) - srcData := args[n.OTHER_INDEX].(*gate.TransferDataReq) + srcData := args[n.OtherIndex].(*gate.TransferDataReq) log.Debug("登录", "收到登录,主渠道=%d,子渠道=%d", m.GetChannelId(), m.GetSiteId()) @@ -37,7 +37,7 @@ func handleLoginReq(args []interface{}) { } func handleLogoutReq(args []interface{}) { - b := args[n.DATA_INDEX].(n.BaseMessage) + b := args[n.DataIndex].(n.BaseMessage) m := (b.MyMessage).(*client.LogoutReq) log.Debug("注销", "注销请求,userId=%v", m.GetUserId()) } diff --git a/servers/login/conf/conf.go b/servers/login/conf/conf.go deleted file mode 100644 index 6bf0214..0000000 --- a/servers/login/conf/conf.go +++ /dev/null @@ -1,24 +0,0 @@ -package conf - -import ( - "log" - "time" -) - -var ( - // log conf - LogFlag = log.LstdFlags - - // gate conf - PendingWriteNum = 2000 - MaxMsgLen uint32 = 4096 - HTTPTimeout = 10 * time.Second - LenMsgLen = 2 - LittleEndian = false - - // skeleton conf - GoLen = 10000 - TimerDispatcherLen = 10000 - AsynCallLen = 10000 - ChanRPCLen = 10000 -) diff --git a/servers/login/conf/json.go b/servers/login/conf/json.go deleted file mode 100644 index 3554cb3..0000000 --- a/servers/login/conf/json.go +++ /dev/null @@ -1,34 +0,0 @@ -package conf - -import ( - "encoding/json" - "io/ioutil" - lconf "xlddz/core/conf" - "xlddz/core/log" -) - -var Server struct { - TCPClientAddr string - TCPAddr string - AppType uint32 - AppID uint32 - AppName string -} - -func init() { - data, err := ioutil.ReadFile("conf/login.json") - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - err = json.Unmarshal(data, &Server) - if err != nil { - log.Fatal("jsonconf", "%v", err) - } - - lconf.AppName = Server.AppName - lconf.AppType = Server.AppType - lconf.AppID = Server.AppID - lconf.ListenOnAddress = Server.TCPAddr - lconf.CenterAddr = Server.TCPClientAddr - log.Info("jsonconf", "配置文件载入成功%v", Server) -} diff --git a/servers/login/conf/login.json b/servers/login/conf/login.json index e652779..c8e2377 100644 --- a/servers/login/conf/login.json +++ b/servers/login/conf/login.json @@ -2,6 +2,6 @@ "AppName": "login", "AppType": 7, "AppID": 30, - "TCPClientAddr": "127.0.0.1:10001", - "TCPAddr": "0.0.0.0:10010" + "CenterAddr": "127.0.0.1:10001", + "ListenOnAddress": "0.0.0.0:10010" } \ No newline at end of file diff --git a/servers/login/main.go b/servers/login/main.go index d4259b2..6b19c2a 100644 --- a/servers/login/main.go +++ b/servers/login/main.go @@ -3,13 +3,8 @@ package main import ( "xlddz/core" _ "xlddz/servers/login/business" - _ "xlddz/servers/login/conf" ) func main() { - core.Start() - //core.Run( - // new(business.Gate), - // new(business.Module), - //) + core.Start("login") }