diff --git a/pkg/api/target/subscribe.go b/pkg/api/target/subscribe.go index 0de10d3a..0bb37b7d 100644 --- a/pkg/api/target/subscribe.go +++ b/pkg/api/target/subscribe.go @@ -303,6 +303,25 @@ func (t *Target) DecodeProtoBytes(resp *gnmi.SubscribeResponse) error { return nil } +func (t *Target) DecodeExtension(resp *gnmi.SubscribeResponse) error { + if t.ExtensionProtoMap == nil { + return nil + } + for _, extension := range resp.Extension { + m := dynamic.NewMessage(t.ExtensionProtoMap[int(extension.GetRegisteredExt().GetId().Number())]) + err := m.Unmarshal(extension.GetRegisteredExt().GetMsg()) + if err != nil { + return err + } + jsondata, err := m.MarshalJSON() + if err != nil { + return err + } + extension.GetRegisteredExt().Msg = jsondata + } + return nil +} + func (t *Target) DeleteSubscription(name string) { t.m.Lock() defer t.m.Unlock() diff --git a/pkg/api/target/target.go b/pkg/api/target/target.go index 146a2e42..967c2cb5 100644 --- a/pkg/api/target/target.go +++ b/pkg/api/target/target.go @@ -53,9 +53,10 @@ type Target struct { subscribeResponses chan *SubscribeResponse errors chan *TargetError stopped bool - StopChan chan struct{} `json:"-"` - Cfn context.CancelFunc `json:"-"` - RootDesc desc.Descriptor `json:"-"` + StopChan chan struct{} `json:"-"` + Cfn context.CancelFunc `json:"-"` + RootDesc desc.Descriptor `json:"-"` + ExtensionProtoMap map[int]*desc.MessageDescriptor `json:"-"` } // NewTarget // diff --git a/pkg/api/types/target.go b/pkg/api/types/target.go index 042fab81..92b38ec9 100644 --- a/pkg/api/types/target.go +++ b/pkg/api/types/target.go @@ -154,6 +154,14 @@ type TargetConfig struct { GRPCKeepalive *clientKeepalive `mapstructure:"grpc-keepalive,omitempty" yaml:"grpc-keepalive,omitempty" json:"grpc-keepalive,omitempty"` tlsConfig *tls.Config + + RegisteredExtensions []*RegisteredExtension `mapstructure:"registered-extensions,omitempty" yaml:"registered-extensions,omitempty" json:"registered-extensions,omitempty"` +} + +type RegisteredExtension struct { + Id int `mapstructure:"id,omitempty" yaml:"id,omitempty" json:"id,omitempty"` + MessageName string `mapstructure:"message-name,omitempty" yaml:"message-name,omitempty" json:"message-name,omitempty"` + ProtoFile string `mapstructure:"proto-file,omitempty" yaml:"proto-file,omitempty" json:"proto-file,omitempty"` } type clientKeepalive struct { diff --git a/pkg/app/collector.go b/pkg/app/collector.go index 1333d5a9..e25197d7 100644 --- a/pkg/app/collector.go +++ b/pkg/app/collector.go @@ -67,14 +67,22 @@ func (a *App) StartCollector(ctx context.Context) { select { case rsp := <-rspChan: subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1) - if a.Config.Debug { - a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp) + // decode gNMI extensions + if extensions := rsp.Response.Extension; len(extensions) > 0 { + err := t.DecodeExtension(rsp.Response) + if err != nil { + a.Logger.Printf("target %q: failed to decode extension field: %v", t.Config.Name, err) + continue + } } err := t.DecodeProtoBytes(rsp.Response) if err != nil { a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err) continue } + if a.Config.Debug { + a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp) + } m := outputs.Meta{ "source": t.Config.Name, "format": a.Config.Format, diff --git a/pkg/app/target.go b/pkg/app/target.go index 69b02a55..f08598d2 100644 --- a/pkg/app/target.go +++ b/pkg/app/target.go @@ -14,6 +14,9 @@ import ( "github.com/fullstorydev/grpcurl" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/desc/protoparse" + "github.com/openconfig/gnmic/pkg/api/target" "github.com/openconfig/gnmic/pkg/api/types" ) @@ -39,6 +42,10 @@ func (a *App) initTarget(tc *types.TargetConfig) (*target.Target, error) { if err != nil { return nil, err } + err = a.parseExtensionProtos(t) + if err != nil { + return nil, err + } a.Targets[t.Config.Name] = t return t, nil } @@ -155,6 +162,34 @@ func (a *App) parseProtoFiles(t *target.Target) error { return nil } +// Dynamically parse (and load) protobuf files defined in config for specific extension IDs +func (a *App) parseExtensionProtos(t *target.Target) error { + parser := protoparse.Parser{} + extensionProtoMap := make(map[int]*desc.MessageDescriptor) + a.Logger.Printf("Target %q loading protofiles for gNMI extensions", t.Config.Name) + if len(t.Config.RegisteredExtensions) == 0 { + return nil + } + for _, extension := range t.Config.RegisteredExtensions { + descSources, err := parser.ParseFiles(extension.ProtoFile) + if err != nil { + a.Logger.Printf("target %q could not load protofile: %s: %v", t.Config.Name, extension.ProtoFile, err) + return err + } + // Only a single file is ever provided to ParseFiles, so we can just grab offset 0 from the returned slice + // Verify if the provided message exists in the proto and assign + if desc := descSources[0].FindMessage(extension.MessageName); desc != nil { + extensionProtoMap[extension.Id] = desc + } else { + a.Logger.Printf("target %q could not find message %q", t.Config.Name, extension.MessageName) + return fmt.Errorf("target %q could not find message %q", t.Config.Name, extension.MessageName) + } + } + t.ExtensionProtoMap = extensionProtoMap + a.Logger.Printf("target %q loaded proto files for gNMI extensions", t.Config.Name) + return nil +} + func (a *App) targetConfigExists(name string) bool { a.configLock.RLock() _, ok := a.Config.Targets[name] diff --git a/pkg/formatters/event.go b/pkg/formatters/event.go index 7f600cbf..b853d8bd 100644 --- a/pkg/formatters/event.go +++ b/pkg/formatters/event.go @@ -12,10 +12,12 @@ import ( "encoding/json" "fmt" "math" + "strconv" "strings" flattener "github.com/karimra/go-map-flattener" "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/gnmi/proto/gnmi_ext" ) // EventMsg represents a gNMI update message, @@ -40,9 +42,28 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri return nil, nil } evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete())) + response := rsp switch rsp := rsp.Response.(type) { case *gnmi.SubscribeResponse_Update: namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix()) + // Extension message to tags + if prefixTags == nil { + prefixTags = make(map[string]string) + } + for _, ext := range response.Extension { + extensionValues, err := extensionToMap(ext) + if err != nil { + return nil, err + } + for k, v := range extensionValues { + switch v.(type) { + case string: + prefixTags[k] = string(v.(string)) + case float64: + prefixTags[k] = strconv.FormatFloat(v.(float64), 'G', -1, 64) + } + } + } // notification updates uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta) if err != nil { @@ -200,6 +221,21 @@ func tagsFromGNMIPath(p *gnmi.Path) (string, map[string]string) { return sb.String(), tags } +func extensionToMap(ext *gnmi_ext.Extension) (map[string]interface{}, error) { + var jsondata []byte + jsondata = ext.GetRegisteredExt().GetMsg() + + var anyJson map[string]interface{} + if len(jsondata) != 0 { + err := json.Unmarshal(jsondata, &anyJson) + if err != nil { + return nil, err + } + return anyJson, nil + } + return nil, fmt.Errorf("0 length JSON decoded") +} + func getValueFlat(prefix string, updValue *gnmi.TypedValue) (map[string]interface{}, error) { if updValue == nil { return nil, nil