From bbe282d79ef3f99aed346fe2c65780ee79622950 Mon Sep 17 00:00:00 2001 From: Tom Strickx Date: Wed, 14 Aug 2024 17:07:04 +0100 Subject: [PATCH] Add gNMI Extension field parsing support gNMI allows for the use of an `extension` field in each top-level message of the gNMI RPCs: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-extensions.md Given this is an arbitrary Protobyte payload, the default gNMI protobufs can't decode the payload contained within the field. This PR adds the necessary configuration options to load in an arbitrary Protobuf file per `extension` identifier, with a message-name to lookup the message type. In this change: 1. A new function `DecodeExtension` is added. This function uses `protoreflect` to dynamically marshal arbitrary protoBytes into JSON. The loaded JSON is then put back in the Extension message as bytes (mainting type) 2. The `Target` type has an `ExtensionProtoMap` added, allowing for the lookup of Extension IDs to loaded-in protobufs 3. The required changes to the `TargetConfig` type to support loading in the new configuration 4. Modified `collector.go` to output the gNMI message _after_ inlining the decoded protoBytes 5. Loading in the protobufs was added to `app/target.go`: `parseExtensionProtos`. This uses the `Parser` provided by `protoreflect/desc/protoparse` 6. Added functionality to `event.go` to insert the K/Vs provided by the Extension as Tags. Given we come from JSON, all numbers are float64, so the only 2 types supported currently are `string` and `float64` 7. Minor helper function to turn the arbitrary JSON into an arbitrary map. This has been tested with a device emiting an `extension` field: ``` [gnmic] target "edge01_test01": gNMI Subscribe Response: &{ SubscriptionName:port_stats SubscriptionConfig:{"name":"port_stats","paths":["/interfaces/interface/"],"mode":"STREAM","stream-mode":"SAMPLE","encoding":"JSON","sample-interval":15000000000,"heartbeat-interval":15000000000,"outputs":["prom-scrape-output"]} Response: update:{timestamp:1723653363502452302 prefix:{elem:{name:"interfaces"} elem:{name:"interface" key:{key:"name" value:"et-1/0/3"}}} update:{path:{elem:{name:"state"} elem:{name:"hardware-port"}} val:{json_val:"\"FPC1:PIC0:PORT3\""}} update:{path:{elem:{name:"state"} elem:{name:"transceiver"}} val:{json_val:"\"FPC1:PIC0:PORT3:Xcvr0\""}}} extension:{registered_ext:{id:1 msg:"{\"systemId\":\"edge01_test01\",\"componentId\":65535,\"sensorName\":\"sensor_1005_2_1\",\"subscribedPath\":\"/interfaces/interface/\",\"streamedPath\":\"/interfaces/interface/\",\"component\":\"chassisd\",\"sequenceNumber\":\"770002\",\"payloadGetTimestamp\":\"1723653363502\",\"streamCreationTimestamp\":\"1723653361858\",\"exportTimestamp\":\"1723653363504\",\"streamId\":\"PERIODIC\"}"}}} ``` Which is then properly rendered to a Prometheus metric: ``` gnmi_interfaces_interface_state_hardware_port{component="chassisd",componentId="65535",hardware_port="FPC1:PIC0:PORT3",interface_name="et-1/0/3",metric_source="edge01_test01",subscription_name="port_stats",systemId="edge01_test01"} 1 ``` Note that some label-drop rules have been added to remove the spurious labels to avoid a cardinality explosion. --- pkg/api/target/subscribe.go | 19 +++++++++++++++++++ pkg/api/target/target.go | 7 ++++--- pkg/api/types/target.go | 8 ++++++++ pkg/app/collector.go | 12 ++++++++++-- pkg/app/target.go | 35 +++++++++++++++++++++++++++++++++++ pkg/formatters/event.go | 36 ++++++++++++++++++++++++++++++++++++ 6 files changed, 112 insertions(+), 5 deletions(-) diff --git a/pkg/api/target/subscribe.go b/pkg/api/target/subscribe.go index 0de10d3a..0bb37b7d 100644 --- a/pkg/api/target/subscribe.go +++ b/pkg/api/target/subscribe.go @@ -303,6 +303,25 @@ func (t *Target) DecodeProtoBytes(resp *gnmi.SubscribeResponse) error { return nil } +func (t *Target) DecodeExtension(resp *gnmi.SubscribeResponse) error { + if t.ExtensionProtoMap == nil { + return nil + } + for _, extension := range resp.Extension { + m := dynamic.NewMessage(t.ExtensionProtoMap[int(extension.GetRegisteredExt().GetId().Number())]) + err := m.Unmarshal(extension.GetRegisteredExt().GetMsg()) + if err != nil { + return err + } + jsondata, err := m.MarshalJSON() + if err != nil { + return err + } + extension.GetRegisteredExt().Msg = jsondata + } + return nil +} + func (t *Target) DeleteSubscription(name string) { t.m.Lock() defer t.m.Unlock() diff --git a/pkg/api/target/target.go b/pkg/api/target/target.go index 146a2e42..967c2cb5 100644 --- a/pkg/api/target/target.go +++ b/pkg/api/target/target.go @@ -53,9 +53,10 @@ type Target struct { subscribeResponses chan *SubscribeResponse errors chan *TargetError stopped bool - StopChan chan struct{} `json:"-"` - Cfn context.CancelFunc `json:"-"` - RootDesc desc.Descriptor `json:"-"` + StopChan chan struct{} `json:"-"` + Cfn context.CancelFunc `json:"-"` + RootDesc desc.Descriptor `json:"-"` + ExtensionProtoMap map[int]*desc.MessageDescriptor `json:"-"` } // NewTarget // diff --git a/pkg/api/types/target.go b/pkg/api/types/target.go index 042fab81..92b38ec9 100644 --- a/pkg/api/types/target.go +++ b/pkg/api/types/target.go @@ -154,6 +154,14 @@ type TargetConfig struct { GRPCKeepalive *clientKeepalive `mapstructure:"grpc-keepalive,omitempty" yaml:"grpc-keepalive,omitempty" json:"grpc-keepalive,omitempty"` tlsConfig *tls.Config + + RegisteredExtensions []*RegisteredExtension `mapstructure:"registered-extensions,omitempty" yaml:"registered-extensions,omitempty" json:"registered-extensions,omitempty"` +} + +type RegisteredExtension struct { + Id int `mapstructure:"id,omitempty" yaml:"id,omitempty" json:"id,omitempty"` + MessageName string `mapstructure:"message-name,omitempty" yaml:"message-name,omitempty" json:"message-name,omitempty"` + ProtoFile string `mapstructure:"proto-file,omitempty" yaml:"proto-file,omitempty" json:"proto-file,omitempty"` } type clientKeepalive struct { diff --git a/pkg/app/collector.go b/pkg/app/collector.go index 1333d5a9..e25197d7 100644 --- a/pkg/app/collector.go +++ b/pkg/app/collector.go @@ -67,14 +67,22 @@ func (a *App) StartCollector(ctx context.Context) { select { case rsp := <-rspChan: subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1) - if a.Config.Debug { - a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp) + // decode gNMI extensions + if extensions := rsp.Response.Extension; len(extensions) > 0 { + err := t.DecodeExtension(rsp.Response) + if err != nil { + a.Logger.Printf("target %q: failed to decode extension field: %v", t.Config.Name, err) + continue + } } err := t.DecodeProtoBytes(rsp.Response) if err != nil { a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err) continue } + if a.Config.Debug { + a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp) + } m := outputs.Meta{ "source": t.Config.Name, "format": a.Config.Format, diff --git a/pkg/app/target.go b/pkg/app/target.go index 69b02a55..f08598d2 100644 --- a/pkg/app/target.go +++ b/pkg/app/target.go @@ -14,6 +14,9 @@ import ( "github.com/fullstorydev/grpcurl" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/desc/protoparse" + "github.com/openconfig/gnmic/pkg/api/target" "github.com/openconfig/gnmic/pkg/api/types" ) @@ -39,6 +42,10 @@ func (a *App) initTarget(tc *types.TargetConfig) (*target.Target, error) { if err != nil { return nil, err } + err = a.parseExtensionProtos(t) + if err != nil { + return nil, err + } a.Targets[t.Config.Name] = t return t, nil } @@ -155,6 +162,34 @@ func (a *App) parseProtoFiles(t *target.Target) error { return nil } +// Dynamically parse (and load) protobuf files defined in config for specific extension IDs +func (a *App) parseExtensionProtos(t *target.Target) error { + parser := protoparse.Parser{} + extensionProtoMap := make(map[int]*desc.MessageDescriptor) + a.Logger.Printf("Target %q loading protofiles for gNMI extensions", t.Config.Name) + if len(t.Config.RegisteredExtensions) == 0 { + return nil + } + for _, extension := range t.Config.RegisteredExtensions { + descSources, err := parser.ParseFiles(extension.ProtoFile) + if err != nil { + a.Logger.Printf("target %q could not load protofile: %s: %v", t.Config.Name, extension.ProtoFile, err) + return err + } + // Only a single file is ever provided to ParseFiles, so we can just grab offset 0 from the returned slice + // Verify if the provided message exists in the proto and assign + if desc := descSources[0].FindMessage(extension.MessageName); desc != nil { + extensionProtoMap[extension.Id] = desc + } else { + a.Logger.Printf("target %q could not find message %q", t.Config.Name, extension.MessageName) + return fmt.Errorf("target %q could not find message %q", t.Config.Name, extension.MessageName) + } + } + t.ExtensionProtoMap = extensionProtoMap + a.Logger.Printf("target %q loaded proto files for gNMI extensions", t.Config.Name) + return nil +} + func (a *App) targetConfigExists(name string) bool { a.configLock.RLock() _, ok := a.Config.Targets[name] diff --git a/pkg/formatters/event.go b/pkg/formatters/event.go index 7f600cbf..b853d8bd 100644 --- a/pkg/formatters/event.go +++ b/pkg/formatters/event.go @@ -12,10 +12,12 @@ import ( "encoding/json" "fmt" "math" + "strconv" "strings" flattener "github.com/karimra/go-map-flattener" "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/gnmi/proto/gnmi_ext" ) // EventMsg represents a gNMI update message, @@ -40,9 +42,28 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri return nil, nil } evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete())) + response := rsp switch rsp := rsp.Response.(type) { case *gnmi.SubscribeResponse_Update: namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix()) + // Extension message to tags + if prefixTags == nil { + prefixTags = make(map[string]string) + } + for _, ext := range response.Extension { + extensionValues, err := extensionToMap(ext) + if err != nil { + return nil, err + } + for k, v := range extensionValues { + switch v.(type) { + case string: + prefixTags[k] = string(v.(string)) + case float64: + prefixTags[k] = strconv.FormatFloat(v.(float64), 'G', -1, 64) + } + } + } // notification updates uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta) if err != nil { @@ -200,6 +221,21 @@ func tagsFromGNMIPath(p *gnmi.Path) (string, map[string]string) { return sb.String(), tags } +func extensionToMap(ext *gnmi_ext.Extension) (map[string]interface{}, error) { + var jsondata []byte + jsondata = ext.GetRegisteredExt().GetMsg() + + var anyJson map[string]interface{} + if len(jsondata) != 0 { + err := json.Unmarshal(jsondata, &anyJson) + if err != nil { + return nil, err + } + return anyJson, nil + } + return nil, fmt.Errorf("0 length JSON decoded") +} + func getValueFlat(prefix string, updValue *gnmi.TypedValue) (map[string]interface{}, error) { if updValue == nil { return nil, nil