Skip to content

Commit

Permalink
fix minor issues and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
mehrdadrad committed Apr 1, 2021
1 parent a8cf298 commit 79067e8
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 31 deletions.
7 changes: 3 additions & 4 deletions vflow/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func NewIPFIX() *IPFIX {
port: opts.IPFIXPort,
addr: opts.IPFIXAddr,
workers: opts.IPFIXWorkers,
pool: make(chan chan struct{}, maxWorkers),
}
}

Expand All @@ -96,6 +95,8 @@ func (i *IPFIX) run() {
return
}

i.pool = make(chan chan struct{}, maxWorkers)

hostPort := net.JoinHostPort(i.addr, strconv.Itoa(i.port))
udpAddr, _ := net.ResolveUDPAddr("udp", hostPort)

Expand Down Expand Up @@ -130,11 +131,10 @@ func (i *IPFIX) run() {

go func() {
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}
p := producer.NewProducer(opts.MQName)

p := producer.NewProducer(opts.MQName)
p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
p.MQErrorCount = &i.stats.MQErrorCount
p.Logger = logger
Expand Down Expand Up @@ -171,7 +171,6 @@ func (i *IPFIX) run() {
func (i *IPFIX) shutdown() {
// exit if the ipfix is disabled
if !opts.IPFIXEnabled {
logger.Println("ipfix disabled")
return
}

Expand Down
12 changes: 6 additions & 6 deletions vflow/netflow_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sync/atomic"
"time"

"github.com/VerizonDigital/vflow/netflow/v5"
netflow5 "github.com/VerizonDigital/vflow/netflow/v5"
"github.com/VerizonDigital/vflow/producer"
)

Expand Down Expand Up @@ -77,18 +77,20 @@ var (
func NewNetflowV5() *NetflowV5 {
return &NetflowV5{
port: opts.NetflowV5Port,
addr: opts.NetflowV5Addr,
workers: opts.NetflowV5Workers,
pool: make(chan chan struct{}, maxWorkers),
}
}

func (i *NetflowV5) run() {
// exit if the netflow v5 is disabled
if !opts.NetflowV5Enabled {
logger.Println("netflowv5 has been disabled")
logger.Println("netflow v5 has been disabled")
return
}

i.pool = make(chan chan struct{}, maxWorkers)

hostPort := net.JoinHostPort(i.addr, strconv.Itoa(i.port))
udpAddr, _ := net.ResolveUDPAddr("udp", hostPort)

Expand All @@ -110,11 +112,10 @@ func (i *NetflowV5) run() {

go func() {
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}
p := producer.NewProducer(opts.MQName)

p := producer.NewProducer(opts.MQName)
p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
p.MQErrorCount = &i.stats.MQErrorCount
p.Logger = logger
Expand Down Expand Up @@ -151,7 +152,6 @@ func (i *NetflowV5) run() {
func (i *NetflowV5) shutdown() {
// exit if the netflow v5 is disabled
if !opts.NetflowV5Enabled {
logger.Println("netflow v5 disabled")
return
}

Expand Down
13 changes: 6 additions & 7 deletions vflow/netflow_v9.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sync/atomic"
"time"

"github.com/VerizonDigital/vflow/netflow/v9"
netflow9 "github.com/VerizonDigital/vflow/netflow/v9"
"github.com/VerizonDigital/vflow/producer"
)

Expand Down Expand Up @@ -79,19 +79,20 @@ var (
func NewNetflowV9() *NetflowV9 {
return &NetflowV9{
port: opts.NetflowV9Port,
addr: opts.NetflowV9Addr,
workers: opts.NetflowV9Workers,
pool: make(chan chan struct{}, maxWorkers),
}
}

func (i *NetflowV9) run() {
//TODO
// exit if the netflow v9 is disabled
if !opts.NetflowV9Enabled {
logger.Println("netflowv9 has been disabled")
logger.Println("netflow v9 has been disabled")
return
}

i.pool = make(chan chan struct{}, maxWorkers)

hostPort := net.JoinHostPort(i.addr, strconv.Itoa(i.port))
udpAddr, _ := net.ResolveUDPAddr("udp", hostPort)

Expand All @@ -115,11 +116,10 @@ func (i *NetflowV9) run() {

go func() {
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}
p := producer.NewProducer(opts.MQName)

p := producer.NewProducer(opts.MQName)
p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
p.MQErrorCount = &i.stats.MQErrorCount
p.Logger = logger
Expand Down Expand Up @@ -156,7 +156,6 @@ func (i *NetflowV9) run() {
func (i *NetflowV9) shutdown() {
// exit if the netflow v9 is disabled
if !opts.NetflowV9Enabled {
logger.Println("netflow v9 disabled")
return
}

Expand Down
13 changes: 8 additions & 5 deletions vflow/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Options struct {
// sFlow options
SFlowEnabled bool `yaml:"sflow-enabled"`
SFlowPort int `yaml:"sflow-port"`
SFlowAddr string `yaml:"sflow-addr"`
SFlowUDPSize int `yaml:"sflow-udp-size"`
SFlowWorkers int `yaml:"sflow-workers"`
SFlowTopic string `yaml:"sflow-topic"`
Expand All @@ -89,20 +90,22 @@ type Options struct {
// Netflow V5
NetflowV5Enabled bool `yaml:"netflow5-enabled"`
NetflowV5Port int `yaml:"netflow5-port"`
NetflowV5Addr string `yaml:"netflow5-addr"`
NetflowV5UDPSize int `yaml:"netflow5-udp-size"`
NetflowV5Workers int `yaml:"netflow5-workers"`
NetflowV5Topic string `yaml:"netflow5-topic"`

// Netflow
NetflowV9Enabled bool `yaml:"netflow9-enabled"`
NetflowV9Port int `yaml:"netflow9-port"`
NetflowV9Addr string `yaml:"netflow9-addr"`
NetflowV9UDPSize int `yaml:"netflow9-udp-size"`
NetflowV9Workers int `yaml:"netflow9-workers"`
NetflowV9Topic string `yaml:"netflow9-topic"`
NetflowV9TplCacheFile string `yaml:"netflow9-tpl-cache-file"`

// producer
ProducerEnabled bool `yaml:producer-enabled"`
ProducerEnabled bool `yaml:"producer-enabled"`
MQName string `yaml:"mq-name"`
MQConfigFile string `yaml:"mq-config-file"`

Expand Down Expand Up @@ -243,11 +246,8 @@ func (opts Options) vFlowIsRunning() bool {

cmd := exec.Command("kill", "-0", string(b))
_, err = cmd.Output()
if err != nil {
return false
}

return true
return err == nil
}

func (opts Options) printVersion() {
Expand Down Expand Up @@ -324,6 +324,7 @@ func (opts *Options) flagSet() {
// sflow options
flag.BoolVar(&opts.SFlowEnabled, "sflow-enabled", opts.SFlowEnabled, "enable/disable sflow listener")
flag.IntVar(&opts.SFlowPort, "sflow-port", opts.SFlowPort, "sflow port number")
flag.StringVar(&opts.SFlowAddr, "sflow-addr", opts.SFlowAddr, "sflow IP address to bind to")
flag.IntVar(&opts.SFlowUDPSize, "sflow-max-udp-size", opts.SFlowUDPSize, "sflow maximum UDP size")
flag.IntVar(&opts.SFlowWorkers, "sflow-workers", opts.SFlowWorkers, "sflow workers number")
flag.StringVar(&opts.SFlowTopic, "sflow-topic", opts.SFlowTopic, "sflow topic name")
Expand All @@ -348,13 +349,15 @@ func (opts *Options) flagSet() {
// netflow version 5
flag.BoolVar(&opts.NetflowV5Enabled, "netflow5-enabled", opts.NetflowV5Enabled, "enable/disable netflow version 5 listener")
flag.IntVar(&opts.NetflowV5Port, "netflow5-port", opts.NetflowV5Port, "Netflow Version 5 port number")
flag.StringVar(&opts.NetflowV5Addr, "netflow5-addr", opts.NetflowV5Addr, "Netflow 5 IP address to bind to")
flag.IntVar(&opts.NetflowV5UDPSize, "netflow5-max-udp-size", opts.NetflowV5UDPSize, "Netflow version 5 maximum UDP size")
flag.IntVar(&opts.NetflowV5Workers, "netflow5-workers", opts.NetflowV5Workers, "Netflow version 5 workers number")
flag.StringVar(&opts.NetflowV5Topic, "netflow5-topic", opts.NetflowV5Topic, "Netflow version 5 topic name")

// netflow version 9
flag.BoolVar(&opts.NetflowV9Enabled, "netflow9-enabled", opts.NetflowV9Enabled, "enable/disable netflow version 9 listener")
flag.IntVar(&opts.NetflowV9Port, "netflow9-port", opts.NetflowV9Port, "Netflow Version 9 port number")
flag.StringVar(&opts.NetflowV9Addr, "netflow9-addr", opts.NetflowV9Addr, "Netflow 9 IP address to bind to")
flag.IntVar(&opts.NetflowV9UDPSize, "netflow9-max-udp-size", opts.NetflowV9UDPSize, "Netflow version 9 maximum UDP size")
flag.IntVar(&opts.NetflowV9Workers, "netflow9-workers", opts.NetflowV9Workers, "Netflow version 9 workers number")
flag.StringVar(&opts.NetflowV9Topic, "netflow9-topic", opts.NetflowV9Topic, "Netflow version 9 topic name")
Expand Down
15 changes: 11 additions & 4 deletions vflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ var (
func NewSFlow() *SFlow {
return &SFlow{
port: opts.SFlowPort,
addr: opts.SFlowAddr,
workers: opts.SFlowWorkers,
pool: make(chan chan struct{}, maxWorkers),
}
}

Expand All @@ -95,6 +95,8 @@ func (s *SFlow) run() {
return
}

s.pool = make(chan chan struct{}, maxWorkers)

hostPort := net.JoinHostPort(s.addr, strconv.Itoa(s.port))
udpAddr, _ := net.ResolveUDPAddr("udp", hostPort)

Expand All @@ -117,12 +119,11 @@ func (s *SFlow) run() {
logger.Printf("sFlow is running (UDP: listening on [::]:%d workers#: %d)", s.port, s.workers)

go func() {
p := producer.NewProducer(opts.MQName)
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}

p := producer.NewProducer(opts.MQName)
p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
p.MQErrorCount = &s.stats.MQErrorCount
p.Logger = logger
Expand Down Expand Up @@ -157,11 +158,17 @@ func (s *SFlow) run() {
}

func (s *SFlow) shutdown() {
// exit if the sFlow v5 is disabled
if !opts.SFlowEnabled {
return
}

// stop reading from UDP listener
s.stop = true
logger.Println("stopping sflow service gracefully ...")
time.Sleep(1 * time.Second)
s.conn.Close()
logger.Println("vFlow has been shutdown")
logger.Println("sFlow has been shutdown")
close(sFlowUDPCh)
}

Expand Down
9 changes: 4 additions & 5 deletions vflow/vflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,11 @@ func main() {
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
logger = opts.Logger

sFlow := NewSFlow()
ipfix := NewIPFIX()
netflow5 := NewNetflowV5()
netflow9 := NewNetflowV9()
if !opts.ProducerEnabled {
logger.Println("producer message queue has been disabled")
}

protos := []proto{sFlow, ipfix, netflow5, netflow9}
protos := []proto{NewSFlow(), NewIPFIX(), NewNetflowV5(), NewNetflowV9()}

for _, p := range protos {
wg.Add(1)
Expand Down

0 comments on commit 79067e8

Please sign in to comment.