From a84d512b0161ab8118e2c9c11a0542edc5e977e4 Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Sat, 21 Dec 2024 01:20:28 +0900 Subject: [PATCH 01/10] updated max supported eps --- pkg/loxinet/dpebpf_linux.go | 2 +- pkg/loxinet/rules.go | 21 ++++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go index ad064b861..774feacc1 100644 --- a/pkg/loxinet/dpebpf_linux.go +++ b/pkg/loxinet/dpebpf_linux.go @@ -1060,7 +1060,7 @@ func DpLBRuleMod(w *LBDpWorkQ) int { nxfa.inactive = 1 } - dat.nxfrm = C.uchar(len(w.endPoints)) + dat.nxfrm = C.ushort(len(w.endPoints)) if w.CsumDis { dat.cdis = 1 } else { diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index aa08c735a..54cc0690e 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -78,7 +78,8 @@ const ( // constants const ( - MaxLBEndPoints = 32 + MaxLBEndPoints = 256 + MaxLBEndPointsRR = 32 DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off DflLbaCheckTimeout = 10 // Default timeout for checking LB arms @@ -1554,6 +1555,12 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al return RuleEpCountErr, errors.New("endpoints-range error") } + if (serv.Sel == cmn.LbSelRr || serv.Sel == cmn.LbSelLeastConnections || + serv.Sel == cmn.LbSelPrio || serv.Sel == cmn.LbSelN2 || serv.Sel == cmn.LbSelN3) && + len(servEndPoints) > MaxLBEndPointsRR { + return RuleEpCountErr, errors.New("endpoints-range1 error") + } + // For ICMP service, non-zero port can't be specified if serv.Proto == "icmp" && serv.ServPort != 0 { return RuleUnknownServiceErr, errors.New("malformed-service error") @@ -2854,19 +2861,19 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { if at.sel == cmn.LbSelPrio { j := 0 k := 0 - var small [MaxLBEndPoints]int - var neps [MaxLBEndPoints]ruleLBEp + var small [MaxLBEndPointsRR]int + var neps [MaxLBEndPointsRR]ruleLBEp for i, ep := range at.endPoints { if ep.inActiveEP { continue } oEp := &at.endPoints[i] - sw := (int(ep.weight) * MaxLBEndPoints) / 100 + sw := (int(ep.weight) * MaxLBEndPointsRR) / 100 if sw == 0 { small[k] = i k++ } - for x := 0; x < sw && j < MaxLBEndPoints; x++ { + for x := 0; x < sw && j < MaxLBEndPointsRR; x++ { neps[j].xIP = oEp.xIP neps[j].rIP = oEp.rIP neps[j].xPort = oEp.xPort @@ -2879,12 +2886,12 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { j++ } } - if j < MaxLBEndPoints { + if j < MaxLBEndPointsRR { v := 0 if k == 0 { k = len(at.endPoints) } - for j < MaxLBEndPoints { + for j < MaxLBEndPointsRR { idx := small[v%k] oEp := &at.endPoints[idx] neps[j].xIP = oEp.xIP From 6cd63c935b648ecdff0e2e345e81c6a9ab03a97a Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Sat, 21 Dec 2024 22:56:45 +0900 Subject: [PATCH 02/10] updated max supported eps --- pkg/loxinet/rules.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index 54cc0690e..7a5827d6c 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -78,7 +78,7 @@ const ( // constants const ( - MaxLBEndPoints = 256 + MaxLBEndPoints = 1600 MaxLBEndPointsRR = 32 DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off @@ -91,7 +91,7 @@ const ( LbMaxInactiveTimeout = 24 * 3600 // Maximum inactive timeout for established sessions MaxEndPointCheckers = 4 // Maximum helpers to check endpoint health EndPointCheckerDuration = 2 // Duration at which ep-helpers will run - MaxEndPointSweeps = 20 // Maximum end-point sweeps per round + MaxEndPointSweeps = 40 // Maximum end-point sweeps per round VIPSweepDuration = 30 // Duration of periodic VIP maintenance DefaultPersistTimeOut = 10800 // Default persistent LB session timeout SnatFwMark = 0x80000000 // Snat Marker @@ -905,7 +905,8 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv pType = HostProbeConnectTCP pPort = nep.xPort } else if r.tuples.l4Prot.val == 17 { - pType = HostProbeConnectUDP + //pType = HostProbeConnectUDP + pType = HostProbeConnectTCP // FIXME pPort = nep.xPort } else if r.tuples.l4Prot.val == 1 { pType = HostProbePing @@ -1250,7 +1251,8 @@ func (R *RuleH) syncEPHostState2Rule(rule *ruleEnt, checkNow bool) bool { if rule.tuples.l4Prot.val == 6 { sType = HostProbeConnectTCP } else if rule.tuples.l4Prot.val == 17 { - sType = HostProbeConnectUDP + //sType = HostProbeConnectUDP + sType = HostProbeConnectTCP // FIXME } else if rule.tuples.l4Prot.val == 1 { sType = HostProbePing } else if rule.tuples.l4Prot.val == 132 { From 59fd05f5651ce8f4d1b98a25afd10cc95733010b Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Tue, 24 Dec 2024 01:07:09 +0900 Subject: [PATCH 03/10] fixes to hostonearm --- pkg/loxinet/rules.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index 7a5827d6c..c270ae05d 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -1207,7 +1207,7 @@ func (R *RuleH) mkHostAssocs(r *ruleEnt) bool { } for _, addr := range addrs { - if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && !ipnet.IP.IsUnspecified() { // check if IPv4 or IPv6 is not nil if ipnet.IP.To4() != nil || ipnet.IP.To16() != nil { if tk.IsNetIPv4(ipnet.IP.String()) && r.tuples.l3Dst.addr.IP.String() != ipnet.IP.String() { @@ -2952,8 +2952,10 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { return -1 } - mh.dp.ToDpCh <- nWork - r.VIP2DP(nWork.Work) + if !nWork.ServiceIP.IsUnspecified() { + mh.dp.ToDpCh <- nWork + r.VIP2DP(nWork.Work) + } if mode == cmn.LBModeHostOneArm { for locIP := range r.locIPs { From 8c1e9dccb261a37c57ffaff3b1d9f2e598ef9014 Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Thu, 26 Dec 2024 12:23:59 +0900 Subject: [PATCH 04/10] updates for submodule --- pkg/loxinet/rules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index c270ae05d..67c163381 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -78,7 +78,7 @@ const ( // constants const ( - MaxLBEndPoints = 1600 + MaxLBEndPoints = 1500 MaxLBEndPointsRR = 32 DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off From 04b6afb63c423ae9a0286fb01776316d57a64691 Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Mon, 30 Dec 2024 21:37:54 +0900 Subject: [PATCH 05/10] improved persist --- pkg/loxinet/dpbroker.go | 2 ++ pkg/loxinet/dpebpf_linux.go | 23 ++++++++++++++++++----- pkg/loxinet/loxinet.go | 2 +- pkg/loxinet/xsync_server.go | 5 +++++ 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/loxinet/dpbroker.go b/pkg/loxinet/dpbroker.go index e2b54f131..110227022 100644 --- a/pkg/loxinet/dpbroker.go +++ b/pkg/loxinet/dpbroker.go @@ -513,6 +513,8 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { var ret int var err error + return 0 + dp.SyncMtx.Lock() defer dp.SyncMtx.Unlock() diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go index 774feacc1..89790354c 100644 --- a/pkg/loxinet/dpebpf_linux.go +++ b/pkg/loxinet/dpebpf_linux.go @@ -268,14 +268,27 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) { } // DpEbpfInit - initialize the ebpf dp subsystem -func DpEbpfInit(clusterEn, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { +func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { var cfg C.struct_ebpfcfg - if clusterEn { - cfg.have_mtrace = 1 - } else { - cfg.have_mtrace = 0 + cNodes := strings.Split(clusterNodes, ",") + for i, cNode := range cNodes { + addr := net.ParseIP(cNode) + if addr == nil { + continue + } + if i == 0 { + cfg.cluster1 = C.CString(cNode) + } else if i == 1 { + cfg.cluster2 = C.CString(cNode) + } } + + //if clusterEn { + // cfg.have_mtrace = 1 + //} else { + // cfg.have_mtrace = 0 + //} if egrHooks { cfg.egr_hooks = 1 } else { diff --git a/pkg/loxinet/loxinet.go b/pkg/loxinet/loxinet.go index 779e2a536..ec45e07f8 100644 --- a/pkg/loxinet/loxinet.go +++ b/pkg/loxinet/loxinet.go @@ -285,7 +285,7 @@ func loxiNetInit() { RunCommand(MkMountCG2, false) } // Initialize the ebpf datapath subsystem - mh.dpEbpf = DpEbpfInit(clusterMode, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel) + mh.dpEbpf = DpEbpfInit(opts.Opts.ClusterNodes, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel) mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode) // Initialize the security zone subsystem diff --git a/pkg/loxinet/xsync_server.go b/pkg/loxinet/xsync_server.go index 02fca6508..d4e9ddee7 100644 --- a/pkg/loxinet/xsync_server.go +++ b/pkg/loxinet/xsync_server.go @@ -26,6 +26,7 @@ import ( "net/rpc" "os" "runtime/debug" + "time" opts "github.com/loxilb-io/loxilb/options" tk "github.com/loxilb-io/loxilib" @@ -213,6 +214,10 @@ func LoxiXsyncMain(mode string) { return } + for { + time.Sleep(1 * time.Second) + } + // Stack trace logger defer func() { if e := recover(); e != nil { From 5f221e4b14de17c69bcb54fd9a3325a26a815c0a Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Mon, 30 Dec 2024 23:35:00 +0900 Subject: [PATCH 06/10] improved persist --- pkg/loxinet/rules.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index 67c163381..54b92b11b 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -1563,6 +1563,13 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al return RuleEpCountErr, errors.New("endpoints-range1 error") } + // Validate persist timeout + if serv.Sel == cmn.LbSelRrPersist { + if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 { + serv.PersistTimeout = DefaultPersistTimeOut + } + } + // For ICMP service, non-zero port can't be specified if serv.Proto == "icmp" && serv.ServPort != 0 { return RuleUnknownServiceErr, errors.New("malformed-service error") @@ -1834,14 +1841,8 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al r.bgp = serv.Bgp r.ci = cmn.CIDefault r.privIP = privIP - r.pTO = 0 - if serv.Sel == cmn.LbSelRrPersist { - if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 { - r.pTO = DefaultPersistTimeOut - } else { - r.pTO = serv.PersistTimeout - } - } + r.pTO = serv.PersistTimeout + r.locIPs = make(map[string]struct{}) if !serv.Snat { From bc71e9fd7647a32c94175e6109f9879a3779e29b Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Tue, 31 Dec 2024 00:41:12 +0900 Subject: [PATCH 07/10] chore: fix persist timeout args --- pkg/loxinet/rules.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index 17d8cacb3..db298bb68 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -1571,6 +1571,13 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al return RuleEpCountErr, errors.New("endpoints-range error") } + // Validate persist timeout + if serv.Sel == cmn.LbSelRrPersist { + if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 { + serv.PersistTimeout = DefaultPersistTimeOut + } + } + // For ICMP service, non-zero port can't be specified if serv.Proto == "icmp" && serv.ServPort != 0 { return RuleUnknownServiceErr, errors.New("malformed-service error") @@ -1847,14 +1854,7 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al r.bgp = serv.Bgp r.ci = cmn.CIDefault r.privIP = privIP - r.pTO = 0 - if serv.Sel == cmn.LbSelRrPersist { - if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 { - r.pTO = DefaultPersistTimeOut - } else { - r.pTO = serv.PersistTimeout - } - } + r.pTO = serv.PersistTimeout r.locIPs = make(map[string]struct{}) if !serv.Snat { From 4fcf930144d6d5d85c9cca654f02657eac5653c9 Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Tue, 31 Dec 2024 14:08:55 +0900 Subject: [PATCH 08/10] Add checks for cluster node address --- pkg/loxinet/dpebpf_linux.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go index 89790354c..87b0dbfb4 100644 --- a/pkg/loxinet/dpebpf_linux.go +++ b/pkg/loxinet/dpebpf_linux.go @@ -277,6 +277,9 @@ func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn if addr == nil { continue } + if utils.IsIPHostAddr(cNode) { + continue + } if i == 0 { cfg.cluster1 = C.CString(cNode) } else if i == 1 { From e289ba60076d5e6cfbe30aa4ac03b8132814c67e Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Wed, 1 Jan 2025 11:57:57 +0900 Subject: [PATCH 09/10] support modification of cnodes --- pkg/loxinet/dpbroker.go | 52 ++++++++++++++++++------------ pkg/loxinet/dpebpf_linux.go | 64 +++++++++++++++++++++++++++---------- 2 files changed, 80 insertions(+), 36 deletions(-) diff --git a/pkg/loxinet/dpbroker.go b/pkg/loxinet/dpbroker.go index 110227022..f4b7c6a0e 100644 --- a/pkg/loxinet/dpbroker.go +++ b/pkg/loxinet/dpbroker.go @@ -18,15 +18,13 @@ package loxinet import ( "fmt" + cmn "github.com/loxilb-io/loxilb/common" + tk "github.com/loxilb-io/loxilib" "net" "os" "runtime/debug" "sync" "time" - - tk "github.com/loxilb-io/loxilib" - - cmn "github.com/loxilb-io/loxilb/common" ) // man names constants @@ -43,6 +41,10 @@ const ( MapNameFw4 = "FW4" ) +const ( + UseRPCPeer = false +) + // error codes const ( DpErrBase = iota - 103000 @@ -448,6 +450,8 @@ type DpHookInterface interface { DpCtDel(w *DpCtInfo) int DpSockVIPAdd(w *SockVIPDpWorkQ) int DpSockVIPDel(w *SockVIPDpWorkQ) int + DpCnodeAdd(w *PeerDpWorkQ) int + DpCnodeDel(w *PeerDpWorkQ) int DpTableGC() DpCtGetAsync() DpGetLock() @@ -770,27 +774,35 @@ func (dp *DpH) DpWorkOnSockVIP(vsWq *SockVIPDpWorkQ) DpRetT { // DpWorkOnPeerOp - routine to work on a peer request for clustering func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT { if pWq.Work == DpCreate { - var newPeer DpPeer - for _, pe := range dp.Peers { - if pe.Peer.Equal(pWq.PeerIP) { - return DpCreateErr + if UseRPCPeer { + var newPeer DpPeer + for _, pe := range dp.Peers { + if pe.Peer.Equal(pWq.PeerIP) { + return DpCreateErr + } } + newPeer.Peer = pWq.PeerIP + dp.Peers = append(dp.Peers, newPeer) + tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String()) + return 0 + } else { + return dp.DpHooks.DpCnodeAdd(pWq) } - newPeer.Peer = pWq.PeerIP - dp.Peers = append(dp.Peers, newPeer) - tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String()) - return 0 } else if pWq.Work == DpRemove { - for idx := range dp.Peers { - pe := &dp.Peers[idx] - if pe.Peer.Equal(pWq.PeerIP) { - if pe.Client != nil { - dp.RPC.RPCHooks.RPCClose(pe) + if UseRPCPeer { + for idx := range dp.Peers { + pe := &dp.Peers[idx] + if pe.Peer.Equal(pWq.PeerIP) { + if pe.Client != nil { + dp.RPC.RPCHooks.RPCClose(pe) + } + dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) + tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) + return 0 } - dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) - tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) - return 0 } + } else { + return dp.DpHooks.DpCnodeDel(pWq) } } diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go index 87b0dbfb4..f3a5d2ac0 100644 --- a/pkg/loxinet/dpebpf_linux.go +++ b/pkg/loxinet/dpebpf_linux.go @@ -271,23 +271,23 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) { func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { var cfg C.struct_ebpfcfg - cNodes := strings.Split(clusterNodes, ",") - for i, cNode := range cNodes { - addr := net.ParseIP(cNode) - if addr == nil { - continue - } - if utils.IsIPHostAddr(cNode) { - continue - } - if i == 0 { - cfg.cluster1 = C.CString(cNode) - } else if i == 1 { - cfg.cluster2 = C.CString(cNode) - } - } + //cNodes := strings.Split(clusterNodes, ",") + //for i, cNode := range cNodes { + // addr := net.ParseIP(cNode) + // if addr == nil { + // continue + // } + // if utils.IsIPHostAddr(cNode) { + // continue + // } + // if i == 0 { + // cfg.cluster1 = C.CString(cNode) + // } else if i == 1 { + // cfg.cluster2 = C.CString(cNode) + // } + //} - //if clusterEn { + //if len(clusterEn) > 0 { // cfg.have_mtrace = 1 //} else { // cfg.have_mtrace = 0 @@ -1939,6 +1939,38 @@ func (e *DpEbpfH) DpSockVIPDel(w *SockVIPDpWorkQ) int { return ec } +// DpCnodeAdd - routine to work on adding a cnode +func (e *DpEbpfH) DpCnodeAdd(w *PeerDpWorkQ) int { + cnode := w.PeerIP.String() + + cnodeStr := C.CString(cnode) + defer C.free(unsafe.Pointer(cnodeStr)) + + ec := int(C.llb_add_cnode(cnodeStr)) + if ec != 0 { + *w.Status = DpCreateErr + } else { + *w.Status = 0 + } + return ec +} + +// DpCnodeDel - routine to work on deleting a cnode +func (e *DpEbpfH) DpCnodeDel(w *PeerDpWorkQ) int { + cnode := w.PeerIP.String() + + cnodeStr := C.CString(cnode) + defer C.free(unsafe.Pointer(cnodeStr)) + + ec := int(C.llb_delete_cnode(cnodeStr)) + if ec != 0 { + *w.Status = DpRemoveErr + } else { + *w.Status = 0 + } + return ec +} + //export goMapNotiHandler func goMapNotiHandler(m *mapNoti) { From 94b72ec7ecabbcd95ec9e16ee189425a92ef370b Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Wed, 1 Jan 2025 12:33:06 +0900 Subject: [PATCH 10/10] chore:Merge fixes --- pkg/loxinet/dpbroker.go | 53 ++++++++++++------------------- pkg/loxinet/dpebpf_linux.go | 62 +++++-------------------------------- pkg/loxinet/loxinet.go | 2 +- pkg/loxinet/rules.go | 28 ++++++----------- pkg/loxinet/xsync_server.go | 6 ---- 5 files changed, 37 insertions(+), 114 deletions(-) diff --git a/pkg/loxinet/dpbroker.go b/pkg/loxinet/dpbroker.go index 3865c58dd..3faed762c 100644 --- a/pkg/loxinet/dpbroker.go +++ b/pkg/loxinet/dpbroker.go @@ -18,13 +18,14 @@ package loxinet import ( "fmt" - cmn "github.com/loxilb-io/loxilb/common" - tk "github.com/loxilb-io/loxilib" "net" "os" "runtime/debug" "sync" "time" + + cmn "github.com/loxilb-io/loxilb/common" + tk "github.com/loxilb-io/loxilib" ) // man names constants @@ -41,10 +42,6 @@ const ( MapNameFw4 = "FW4" ) -const ( - UseRPCPeer = false -) - // error codes const ( DpErrBase = iota - 103000 @@ -451,8 +448,6 @@ type DpHookInterface interface { DpCtDel(w *DpCtInfo) int DpSockVIPAdd(w *SockVIPDpWorkQ) int DpSockVIPDel(w *SockVIPDpWorkQ) int - DpCnodeAdd(w *PeerDpWorkQ) int - DpCnodeDel(w *PeerDpWorkQ) int DpTableGC() DpCtGetAsync() DpGetLock() @@ -518,8 +513,6 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { var ret int var err error - return 0 - dp.SyncMtx.Lock() defer dp.SyncMtx.Unlock() @@ -775,35 +768,27 @@ func (dp *DpH) DpWorkOnSockVIP(vsWq *SockVIPDpWorkQ) DpRetT { // DpWorkOnPeerOp - routine to work on a peer request for clustering func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT { if pWq.Work == DpCreate { - if UseRPCPeer { - var newPeer DpPeer - for _, pe := range dp.Peers { - if pe.Peer.Equal(pWq.PeerIP) { - return DpCreateErr - } + var newPeer DpPeer + for _, pe := range dp.Peers { + if pe.Peer.Equal(pWq.PeerIP) { + return DpCreateErr } - newPeer.Peer = pWq.PeerIP - dp.Peers = append(dp.Peers, newPeer) - tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String()) - return 0 - } else { - return dp.DpHooks.DpCnodeAdd(pWq) } + newPeer.Peer = pWq.PeerIP + dp.Peers = append(dp.Peers, newPeer) + tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String()) + return 0 } else if pWq.Work == DpRemove { - if UseRPCPeer { - for idx := range dp.Peers { - pe := &dp.Peers[idx] - if pe.Peer.Equal(pWq.PeerIP) { - if pe.Client != nil { - dp.RPC.RPCHooks.RPCClose(pe) - } - dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) - tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) - return 0 + for idx := range dp.Peers { + pe := &dp.Peers[idx] + if pe.Peer.Equal(pWq.PeerIP) { + if pe.Client != nil { + dp.RPC.RPCHooks.RPCClose(pe) } + dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) + tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) + return 0 } - } else { - return dp.DpHooks.DpCnodeDel(pWq) } } diff --git a/pkg/loxinet/dpebpf_linux.go b/pkg/loxinet/dpebpf_linux.go index 3ac62691c..40e82308a 100644 --- a/pkg/loxinet/dpebpf_linux.go +++ b/pkg/loxinet/dpebpf_linux.go @@ -268,30 +268,14 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) { } // DpEbpfInit - initialize the ebpf dp subsystem -func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { +func DpEbpfInit(clusterEn, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH { var cfg C.struct_ebpfcfg - //cNodes := strings.Split(clusterNodes, ",") - //for i, cNode := range cNodes { - // addr := net.ParseIP(cNode) - // if addr == nil { - // continue - // } - // if utils.IsIPHostAddr(cNode) { - // continue - // } - // if i == 0 { - // cfg.cluster1 = C.CString(cNode) - // } else if i == 1 { - // cfg.cluster2 = C.CString(cNode) - // } - //} - - //if len(clusterEn) > 0 { - // cfg.have_mtrace = 1 - //} else { - // cfg.have_mtrace = 0 - //} + if clusterEn { + cfg.have_mtrace = 1 + } else { + cfg.have_mtrace = 0 + } if egrHooks { cfg.egr_hooks = 1 } else { @@ -1080,7 +1064,7 @@ func DpLBRuleMod(w *LBDpWorkQ) int { nxfa.inactive = 1 } - dat.nxfrm = C.ushort(len(w.endPoints)) + dat.nxfrm = C.uchar(len(w.endPoints)) if w.CsumDis { dat.cdis = 1 } else { @@ -1943,38 +1927,6 @@ func (e *DpEbpfH) DpSockVIPDel(w *SockVIPDpWorkQ) int { return ec } -// DpCnodeAdd - routine to work on adding a cnode -func (e *DpEbpfH) DpCnodeAdd(w *PeerDpWorkQ) int { - cnode := w.PeerIP.String() - - cnodeStr := C.CString(cnode) - defer C.free(unsafe.Pointer(cnodeStr)) - - ec := int(C.llb_add_cnode(cnodeStr)) - if ec != 0 { - *w.Status = DpCreateErr - } else { - *w.Status = 0 - } - return ec -} - -// DpCnodeDel - routine to work on deleting a cnode -func (e *DpEbpfH) DpCnodeDel(w *PeerDpWorkQ) int { - cnode := w.PeerIP.String() - - cnodeStr := C.CString(cnode) - defer C.free(unsafe.Pointer(cnodeStr)) - - ec := int(C.llb_delete_cnode(cnodeStr)) - if ec != 0 { - *w.Status = DpRemoveErr - } else { - *w.Status = 0 - } - return ec -} - //export goMapNotiHandler func goMapNotiHandler(m *mapNoti) { diff --git a/pkg/loxinet/loxinet.go b/pkg/loxinet/loxinet.go index 167d67814..5421c30ce 100644 --- a/pkg/loxinet/loxinet.go +++ b/pkg/loxinet/loxinet.go @@ -292,7 +292,7 @@ func loxiNetInit() { RunCommand(MkMountCG2, false) } // Initialize the ebpf datapath subsystem - mh.dpEbpf = DpEbpfInit(opts.Opts.ClusterNodes, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel) + mh.dpEbpf = DpEbpfInit(clusterMode, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel) mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode) // Initialize the security zone subsystem diff --git a/pkg/loxinet/rules.go b/pkg/loxinet/rules.go index 52e6bea20..b49058178 100644 --- a/pkg/loxinet/rules.go +++ b/pkg/loxinet/rules.go @@ -78,8 +78,7 @@ const ( // constants const ( - MaxLBEndPoints = 1500 - MaxLBEndPointsRR = 32 + MaxLBEndPoints = 24 DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off DflLbaCheckTimeout = 10 // Default timeout for checking LB arms @@ -91,7 +90,7 @@ const ( LbMaxInactiveTimeout = 24 * 3600 // Maximum inactive timeout for established sessions MaxEndPointCheckers = 4 // Maximum helpers to check endpoint health EndPointCheckerDuration = 2 // Duration at which ep-helpers will run - MaxEndPointSweeps = 40 // Maximum end-point sweeps per round + MaxEndPointSweeps = 20 // Maximum end-point sweeps per round VIPSweepDuration = 30 // Duration of periodic VIP maintenance DefaultPersistTimeOut = 10800 // Default persistent LB session timeout SnatFwMark = 0x80000000 // Snat Marker @@ -911,7 +910,7 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv pType = HostProbeConnectTCP pPort = nep.xPort } else if r.tuples.l4Prot.val == 17 { - //pType = HostProbeConnectUDP + pType = HostProbeConnectUDP pType = HostProbeConnectTCP // FIXME pPort = nep.xPort } else if r.tuples.l4Prot.val == 1 { @@ -1261,8 +1260,7 @@ func (R *RuleH) syncEPHostState2Rule(rule *ruleEnt, checkNow bool) bool { if rule.tuples.l4Prot.val == 6 { sType = HostProbeConnectTCP } else if rule.tuples.l4Prot.val == 17 { - //sType = HostProbeConnectUDP - sType = HostProbeConnectTCP // FIXME + sType = HostProbeConnectUDP } else if rule.tuples.l4Prot.val == 1 { sType = HostProbePing } else if rule.tuples.l4Prot.val == 132 { @@ -1574,12 +1572,6 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al return RuleEpCountErr, errors.New("endpoints-range error") } - if (serv.Sel == cmn.LbSelRr || serv.Sel == cmn.LbSelLeastConnections || - serv.Sel == cmn.LbSelPrio || serv.Sel == cmn.LbSelN2 || serv.Sel == cmn.LbSelN3) && - len(servEndPoints) > MaxLBEndPointsRR { - return RuleEpCountErr, errors.New("endpoints-range1 error") - } - // Validate persist timeout if serv.Sel == cmn.LbSelRrPersist { if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 { @@ -2930,19 +2922,19 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { if at.sel == cmn.LbSelPrio { j := 0 k := 0 - var small [MaxLBEndPointsRR]int - var neps [MaxLBEndPointsRR]ruleLBEp + var small [MaxLBEndPoints]int + var neps [MaxLBEndPoints]ruleLBEp for i, ep := range at.endPoints { if ep.inActiveEP { continue } oEp := &at.endPoints[i] - sw := (int(ep.weight) * MaxLBEndPointsRR) / 100 + sw := (int(ep.weight) * MaxLBEndPoints) / 100 if sw == 0 { small[k] = i k++ } - for x := 0; x < sw && j < MaxLBEndPointsRR; x++ { + for x := 0; x < sw && j < MaxLBEndPoints; x++ { neps[j].xIP = oEp.xIP neps[j].rIP = oEp.rIP neps[j].xPort = oEp.xPort @@ -2955,12 +2947,12 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int { j++ } } - if j < MaxLBEndPointsRR { + if j < MaxLBEndPoints { v := 0 if k == 0 { k = len(at.endPoints) } - for j < MaxLBEndPointsRR { + for j < MaxLBEndPoints { idx := small[v%k] oEp := &at.endPoints[idx] neps[j].xIP = oEp.xIP diff --git a/pkg/loxinet/xsync_server.go b/pkg/loxinet/xsync_server.go index d4e9ddee7..98d070e65 100644 --- a/pkg/loxinet/xsync_server.go +++ b/pkg/loxinet/xsync_server.go @@ -26,8 +26,6 @@ import ( "net/rpc" "os" "runtime/debug" - "time" - opts "github.com/loxilb-io/loxilb/options" tk "github.com/loxilb-io/loxilib" "google.golang.org/grpc" @@ -214,10 +212,6 @@ func LoxiXsyncMain(mode string) { return } - for { - time.Sleep(1 * time.Second) - } - // Stack trace logger defer func() { if e := recover(); e != nil {