Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Support gnmi set, add some testcases and fix some bugs #14

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d75ef11
Fix testdata file name invalid in windows OS
Mar 13, 2018
b151df5
Fix dialout_client subscription paths update error
Mar 15, 2018
ed69aeb
Support wildcard field for virtual db path
Mar 19, 2018
2656f4e
Support Set RPC in gNMI to modify data in configDB
Mar 20, 2018
e65399e
Fix single port counter always get all fields value
Mar 20, 2018
e034478
Add unittest for virtual_db
Mar 21, 2018
27ccce5
Fix stream get multiple fields error
Mar 22, 2018
3f079bd
Add test case for gNMI Set
Mar 22, 2018
b298ca0
Add device name for dialout publish message
Mar 26, 2018
df8e3dd
Add option to disable TLS
Mar 29, 2018
b99bc6b
Add support for RPC Set json value
Mar 29, 2018
dcb76f8
Separate path module from db_client and add testcase
Apr 4, 2018
c079dbd
Add glide dependency file to finish go dependency management
Apr 8, 2018
ae5503b
Add db target check when get db table path
Apr 10, 2018
d2c5ab7
Separate system module from non_db_client
May 16, 2018
45be005
Add support for system version
May 16, 2018
fad9214
Add support for system ntp and shutdown state
May 22, 2018
44b24a5
Change system memory and down message format
May 24, 2018
84241e3
[telemetry]add grpc requirement
shupengzhang May 28, 2018
07e6ad3
[telemetry]add grpc show interface rxtx rate
shupengzhang May 28, 2018
3dc8f0b
[telemetry]add grpc set configdb
shupengzhang May 30, 2018
246a62d
Temporarily remove return error when table path check fail
May 31, 2018
a59e0c3
[telemetry]change bpg summary key name
shupengzhang Jun 1, 2018
2cc321e
[telemetry]fix stream data without key
shupengzhang Jun 4, 2018
6c8016e
[telemetry]fix peridic mode send package time and set configdb with json
shupengzhang Jun 5, 2018
de19489
Add allowNotFound for stream mode and fix test fail
Jun 5, 2018
49420b2
[telemetry]fix stream data without key
shupengzhang Jun 6, 2018
d319d87
[telemetry]add grpc test case
shupengzhang Jun 6, 2018
955fc1b
[telemetry]fix dialout mode when server disconnect client closed
shupengzhang Jun 13, 2018
c9d5445
[telemetry]add replace db with json file
shupengzhang Jun 13, 2018
6519561
[telemetry]fix delete not work
shupengzhang Jun 25, 2018
f199331
[telemetry]fix set configdb with unused key error
shupengzhang Aug 7, 2018
22df8bf
[telemetry]align code
shupengzhang Aug 8, 2018
39a5403
Update glide file for jipanyang/gnmi package
Aug 20, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 64 additions & 44 deletions dialout/dialout_client/dialout_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"sync"
"time"
"os"
)

const (
Expand Down Expand Up @@ -94,7 +95,6 @@ func (d Destination) Validate() error {

// Global config for all clients
type ClientConfig struct {
SrcIp string
RetryInterval time.Duration
Encoding gpb.Encoding
Unidirectional bool // by default, no reponse from remote server
Expand Down Expand Up @@ -127,6 +127,7 @@ type clientSubscription struct {
sendMsg uint64
recvMsg uint64
errors uint64
restart bool //whether need restart connect to server flag
}

// Client handles execution of the telemetry publish service.
Expand Down Expand Up @@ -263,9 +264,14 @@ func newClient(ctx context.Context, dest Destination) (*Client, error) {
opts := []grpc.DialOption{
grpc.WithBlock(),
}

if clientCfg.TLS != nil {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(clientCfg.TLS)))
} else {
opts = append(opts, grpc.WithInsecure())
log.V(2).Infof("gRPC without TLS")
}

conn, err := grpc.DialContext(ctx, dest.Addrs, opts...)
if err != nil {
return nil, fmt.Errorf("Dial to (%s, timeout %v): %v", dest, timeout, err)
Expand Down Expand Up @@ -297,6 +303,7 @@ restart: //Remote server might go down, in that case we restart with next destin
cs.q = queue.NewPriorityQueue(1, false)
cs.opened = true
cs.client = nil
cs.restart = false
cs.cMu.Unlock()

cs.conTryCnt++
Expand Down Expand Up @@ -337,48 +344,54 @@ restart: //Remote server might go down, in that case we restart with next destin

switch cs.reportType {
case Periodic:
ticker := time.NewTicker(cs.interval)
for {
select {
default:
spbValues, err := cs.dc.Get(nil)
if err != nil {
// TODO: need to inform
log.V(2).Infof("Data read error %v for %v", err, cs)
continue
//return nil, status.Error(codes.NotFound, err.Error())
}
var updates []*gpb.Update
var spbValue *spb.Value
for _, spbValue = range spbValues {
update := &gpb.Update{
Path: spbValue.GetPath(),
Val: spbValue.GetVal(),
case <-ticker.C:
go func() {
spbValues, err := cs.dc.Get(nil)
if err != nil {
// TODO: need to inform
log.V(2).Infof("Data read error %v for %v", err, cs)
//continue
//return nil, status.Error(codes.NotFound, err.Error())
return
}
updates = append(updates, update)
}
rs := &gpb.SubscribeResponse_Update{
Update: &gpb.Notification{
Timestamp: spbValue.GetTimestamp(),
Prefix: cs.prefix,
Update: updates,
},
}
response := &gpb.SubscribeResponse{Response: rs}

log.V(6).Infof("cs %s sending \n\t%v \n To %s", cs.name, response, dest)
err = pub.Send(response)
if err != nil {
log.V(1).Infof("Client %v pub Send error:%v, cs.conTryCnt %v", cs.name, err, cs.conTryCnt)
cs.Close()
// Retry
goto restart
}
log.V(6).Infof("cs %s to %s done", cs.name, dest)
cs.sendMsg++
c.sendMsg++
var updates []*gpb.Update
var spbValue *spb.Value
for _, spbValue = range spbValues {
update := &gpb.Update{
Path: spbValue.GetPath(),
Val: spbValue.GetVal(),
}
updates = append(updates, update)
}
rs := &gpb.SubscribeResponse_Update{
Update: &gpb.Notification{
Timestamp: spbValue.GetTimestamp(),
Prefix: cs.prefix,
Update: updates,
},
}
response := &gpb.SubscribeResponse{Response: rs}

time.Sleep(cs.interval)
log.V(6).Infof("cs %s sending \n\t%v \n To %s", cs.name, response, dest)
err = pub.Send(response)
if err != nil {
log.V(1).Infof("Client %v pub Send error:%v, cs.conTryCnt %v", cs.name, err, cs.conTryCnt)
cs.restart = true
cs.Close()
// Retry
//goto restart
}
log.V(6).Infof("cs %s to %s done", cs.name, dest)
cs.sendMsg++
c.sendMsg++
}()
case <-cs.stop:
if cs.restart == true {
goto restart
}
log.V(1).Infof("%v exiting publishRun routine for destination %s", cs, dest)
return
}
Expand Down Expand Up @@ -470,6 +483,7 @@ func processTelemetryClientConfig(ctx context.Context, redisDb *redis.Client, ke
}

log.V(2).Infof("Processing %v %v", tableKey, fv)

configMu.Lock()
defer configMu.Unlock()

Expand All @@ -482,8 +496,6 @@ func processTelemetryClientConfig(ctx context.Context, redisDb *redis.Client, ke
} else {
for field, value := range fv {
switch field {
case "src_ip":
clientCfg.SrcIp = value
case "retry_interval":
//TODO: check validity of the interval
itvl, err := strconv.ParseUint(value, 10, 64)
Expand Down Expand Up @@ -595,17 +607,25 @@ func processTelemetryClientConfig(ctx context.Context, redisDb *redis.Client, ke
cs.prefix = &gpb.Path{
Target: value,
}
deviceName, err := os.Hostname()
if err != nil {
log.V(2).Infof("Get hostname error: %v", err)
} else {
cs.prefix.Origin = deviceName
}
case "paths":
paths := strings.Split(value, ",")
for _, path := range paths {
pp, err := ygot.StringToPath(path, ygot.StructuredPath)
ps := strings.Split(value, ",")
newPaths := []*gpb.Path{}
for _, p := range ps {
pp, err := ygot.StringToPath(p, ygot.StructuredPath)
if err != nil {
log.V(2).Infof("Invalid paths %v", value)
return fmt.Errorf("Invalid paths %v", value)
}
// append *gpb.Path
cs.paths = append(cs.paths, pp)
newPaths = append(newPaths, pp)
}
cs.paths = newPaths
default:
log.V(2).Infof("Invalid field %v value %v", field, value)
return fmt.Errorf("Invalid field %v value %v", field, value)
Expand Down
13 changes: 6 additions & 7 deletions dialout/dialout_client/dialout_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func prepareDb(t *testing.T) {
mpi_name_map := loadConfig(t, "COUNTERS_PORT_NAME_MAP", countersPortNameMapByte)
loadDB(t, rclient, mpi_name_map)

fileName = "../../testdata/COUNTERS:Ethernet68.txt"
fileName = "../../testdata/COUNTERS-Ethernet68.txt"
countersEthernet68Byte, err := ioutil.ReadFile(fileName)
if err != nil {
t.Fatalf("read file %v err: %v", fileName, err)
Expand All @@ -153,7 +153,7 @@ func prepareDb(t *testing.T) {
mpi_counter := loadConfig(t, "COUNTERS:oid:0x1000000000039", countersEthernet68Byte)
loadDB(t, rclient, mpi_counter)

fileName = "../../testdata/COUNTERS:Ethernet1.txt"
fileName = "../../testdata/COUNTERS-Ethernet1.txt"
countersEthernet1Byte, err := ioutil.ReadFile(fileName)
if err != nil {
t.Fatalf("read file %v err: %v", fileName, err)
Expand All @@ -163,7 +163,7 @@ func prepareDb(t *testing.T) {
loadDB(t, rclient, mpi_counter)

// "Ethernet64:0": "oid:0x1500000000092a" : queue counter, to work as data noise
fileName = "../../testdata/COUNTERS:oid:0x1500000000092a.txt"
fileName = "../../testdata/COUNTERS-oid-0x1500000000092a.txt"
counters92aByte, err := ioutil.ReadFile(fileName)
if err != nil {
t.Fatalf("read file %v err: %v", fileName, err)
Expand Down Expand Up @@ -265,22 +265,22 @@ func TestGNMIDialOutPublish(t *testing.T) {
}
_ = countersPortNameMapByte

fileName = "../../testdata/COUNTERS:Ethernet68.txt"
fileName = "../../testdata/COUNTERS-Ethernet68.txt"
countersEthernet68Byte, err := ioutil.ReadFile(fileName)
if err != nil {
t.Fatalf("read file %v err: %v", fileName, err)
}
_ = countersEthernet68Byte

fileName = "../../testdata/COUNTERS:Ethernet_wildcard.txt"
fileName = "../../testdata/COUNTERS-Ethernet_wildcard.txt"
countersEthernetWildcardByte, err := ioutil.ReadFile(fileName)
if err != nil {
t.Fatalf("read file %v err: %v", fileName, err)
}

_ = countersEthernetWildcardByte

fileName = "../../testdata/COUNTERS:Ethernet_wildcard_PFC_7_RX.txt"
fileName = "../../testdata/COUNTERS-Ethernet_wildcard_PFC_7_RX.txt"
countersEthernetWildcardPfcByte, err := ioutil.ReadFile(fileName)
if err != nil {
t.Fatalf("read file %v err: %v", fileName, err)
Expand All @@ -289,7 +289,6 @@ func TestGNMIDialOutPublish(t *testing.T) {
_ = countersEthernetWildcardPfcByte

clientCfg := ClientConfig{
SrcIp: "",
RetryInterval: 5 * time.Second,
Encoding: pb.Encoding_JSON_IETF,
Unidirectional: true,
Expand Down
15 changes: 11 additions & 4 deletions dialout/dialout_client_cli/dialout_client_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ import (

var (
clientCfg = dc.ClientConfig{
SrcIp: "",
RetryInterval: 30 * time.Second,
Encoding: gpb.Encoding_JSON_IETF,
Unidirectional: true,
TLS: &tls.Config{},
}

tlsCfg = tls.Config{}

tlsDisable bool
)

func init() {
flag.StringVar(&clientCfg.TLS.ServerName, "server_name", "", "When set, use this hostname to verify server certificate during TLS handshake.")
flag.BoolVar(&clientCfg.TLS.InsecureSkipVerify, "insecure", false, "When set, client will not verify the server certificate during TLS handshake.")
flag.StringVar(&tlsCfg.ServerName, "server_name", "", "When set, use this hostname to verify server certificate during TLS handshake.")
flag.BoolVar(&tlsCfg.InsecureSkipVerify, "skip_verify", false, "When set, client will not verify the server certificate during TLS handshake.")
flag.BoolVar(&tlsDisable, "insecure", false, "Without TLS, only for testing")
flag.DurationVar(&clientCfg.RetryInterval, "retry_interval", 30*time.Second, "Interval at which client tries to reconnect to destination servers")
flag.BoolVar(&clientCfg.Unidirectional, "unidirectional", true, "No repesponse from server is expected")
}
Expand All @@ -41,6 +44,10 @@ func main() {
cancel()
}()
log.V(1).Infof("Starting telemetry publish client")
if !tlsDisable {
clientCfg.TLS = &tlsCfg
log.V(1).Infof("TLS enable")
}
err := dc.DialOutRun(ctx, &clientCfg)
log.V(1).Infof("Exiting telemetry publish client: %v", err)
log.Flush()
Expand Down
53 changes: 24 additions & 29 deletions dialout/dialout_server_cli/dialout_server_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"google.golang.org/grpc/credentials"

ds "github.com/Azure/sonic-telemetry/dialout/dialout_server"
testcert "github.com/Azure/sonic-telemetry/testdata/tls"
)

var (
Expand All @@ -20,7 +19,7 @@ var (
caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.")
serverCert = flag.String("server_crt", "", "TLS server certificate")
serverKey = flag.String("server_key", "", "TLS server private key")
insecure = flag.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!")
insecure = flag.Bool("insecure", false, "Without TLS, for testing only!")
allowNoClientCert = flag.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate.")
)

Expand All @@ -35,12 +34,8 @@ func main() {
var certificate tls.Certificate
var err error

if *insecure {
certificate, err = testcert.NewCert()
if err != nil {
log.Exitf("could not load server key pair: %s", err)
}
} else {
var opts []grpc.ServerOption
if !*insecure {
switch {
case *serverCert == "":
log.Errorf("serverCert must be set.")
Expand All @@ -53,32 +48,32 @@ func main() {
if err != nil {
log.Exitf("could not load server key pair: %s", err)
}
}

tlsCfg := &tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certificate},
}
if *allowNoClientCert {
// RequestClientCert will ask client for a certificate but won't
// require it to proceed. If certificate is provided, it will be
// verified.
tlsCfg.ClientAuth = tls.RequestClientCert
}

if *caCert != "" {
ca, err := ioutil.ReadFile(*caCert)
if err != nil {
log.Exitf("could not read CA certificate: %s", err)
tlsCfg := &tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certificate},
}
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Exit("failed to append CA certificate")
if *allowNoClientCert {
// RequestClientCert will ask client for a certificate but won't
// require it to proceed. If certificate is provided, it will be
// verified.
tlsCfg.ClientAuth = tls.RequestClientCert
}

if *caCert != "" {
ca, err := ioutil.ReadFile(*caCert)
if err != nil {
log.Exitf("could not read CA certificate: %s", err)
}
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Exit("failed to append CA certificate")
}
tlsCfg.ClientCAs = certPool
}
tlsCfg.ClientCAs = certPool
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}

opts := []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
cfg := &ds.Config{}
cfg.Port = int64(*port)
s, err := ds.NewServer(cfg, opts)
Expand Down
Loading