diff --git a/internal/channel/channel.go b/internal/channel/channel.go
index 5fa5658f..f4f9f18f 100644
--- a/internal/channel/channel.go
+++ b/internal/channel/channel.go
@@ -3,15 +3,11 @@ package channel
 import (
 	"context"
 	"errors"
-	"fmt"
 	"github.com/icinga/icinga-notifications/internal/config/baseconf"
-	"github.com/icinga/icinga-notifications/internal/contracts"
 	"github.com/icinga/icinga-notifications/internal/event"
-	"github.com/icinga/icinga-notifications/internal/recipient"
 	"github.com/icinga/icinga-notifications/pkg/plugin"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
-	"net/url"
 )
 
 type Channel struct {
@@ -158,42 +154,33 @@ func (c *Channel) Restart() {
 	c.restartCh <- newConfig{c.Type, c.Config}
 }
 
-// Notify prepares and sends the notification request, returns a non-error on fails, nil on success
-func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error {
-	p := c.getPlugin()
-	if p == nil {
-		return errors.New("plugin could not be started")
+// Notify sends the provided notification request via this channel's plugin.
+// The request must be fully populated by the caller; in particular, its Contact,
+// Object and Event fields are mandatory.
+//
+// Returns an error in any of the following cases:
+//   - the *plugin.Event of the provided notification request is not set,
+//   - the *plugin.Object of the provided notification request is not set,
+//   - the *plugin.Contact of the provided notification request is not set,
+//   - a state change event is sent without an associated *plugin.Incident,
+//   - the channel's plugin cannot be started or fails to deliver the request.
+func (c *Channel) Notify(req *plugin.NotificationRequest) error {
+	if req.Event == nil {
+		return errors.New("invalid notification request: Event is nil")
 	}
-
-	contactStruct := &plugin.Contact{FullName: contact.FullName}
-	for _, addr := range contact.Addresses {
-		contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address})
+	if req.Object == nil {
+		return errors.New("invalid notification request: Object is nil")
+	}
+	if req.Contact == nil {
+		return errors.New("invalid notification request: Contact is nil")
+	}
+	if req.Incident == nil && req.Event.Type == event.TypeState {
+		return errors.New("invalid notification request: cannot send state notification without an incident")
 	}
 
-	baseUrl, _ := url.Parse(icingaweb2Url)
-	incidentUrl := baseUrl.JoinPath("/notifications/incident")
-	incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID())
-	object := i.IncidentObject()
-
-	req := &plugin.NotificationRequest{
-		Contact: contactStruct,
-		Object: &plugin.Object{
-			Name: object.DisplayName(),
-			Url: ev.URL,
-			Tags: object.Tags,
-			ExtraTags: object.ExtraTags,
-		},
-		Incident: &plugin.Incident{
-			Id: i.ID(),
-			Url: incidentUrl.String(),
-			Severity: i.SeverityString(),
-		},
-		Event: &plugin.Event{
-			Time: ev.Time,
-			Type: ev.Type,
-			Username: ev.Username,
-			Message: ev.Message,
-		},
+	p := c.getPlugin()
+	if p == nil {
+		return errors.New("plugin could not be started")
 	}
 
 	return p.SendNotification(req)
diff --git a/internal/config/evaluable_config.go b/internal/config/evaluable_config.go
new file mode 100644
index 00000000..a75378e8
--- /dev/null
+++ b/internal/config/evaluable_config.go
@@ -0,0 +1,137 @@
+package config
+
+import (
+	"github.com/icinga/icinga-notifications/internal/filter"
+	"github.com/icinga/icinga-notifications/internal/rule"
+)
+
+// EvalOptions specifies optional callbacks that are executed upon certain filter evaluation events.
+type EvalOptions[T, U any] struct {
+	// OnPreEvaluate can be used to perform arbitrary actions before evaluating the current entry of type "T".
+	// An entry of type "T" for which this hook returns "false" will be excluded from evaluation.
+	OnPreEvaluate func(T) bool
+
+	// OnError can be used to perform arbitrary actions on filter evaluation errors.
+	// It receives the original filter evaluation error as well as the current entry of
+	// type "T" whose filter evaluation triggered that error.
+	//
+	// By default, the evaluation is not interrupted when a single entry fails; instead, it
+	// continues with all the remaining entries. You can override this behaviour by returning
+	// "false" from your handler, in which case the evaluation is aborted immediately.
+	OnError func(T, error) bool
+
+	// OnFilterMatch can be used to perform arbitrary actions after a successful filter evaluation of type "T".
+	// It receives the current entry of type "T" whose filter matched the provided filter.Filterable.
+	//
+	// Note that any error returned by the OnFilterMatch hook aborts the filter evaluation
+	// immediately, before the remaining entries are even reached.
+	OnFilterMatch func(T) error
+
+	// OnAllConfigEvaluated can be used to perform post filter evaluation actions.
+	// This handler receives an arbitrary value of type "U", e.g. an aggregated result of the filter evaluations.
+	//
+	// OnAllConfigEvaluated is only called once all the entries of type "T" have been evaluated, and it does not
+	// depend on the result of the individual entry filter evaluations. Unless one of the Eval* receivers returns
+	// prematurely with an error, this hook is guaranteed to be called. Be aware, however, that not every Eval*
+	// method supports this hook.
+	OnAllConfigEvaluated func(U)
+}
+
+// Evaluable manages evaluable config types in a centralised and structured way.
+// An evaluable config is a config type against which filter expressions can be evaluated.
+type Evaluable struct {
+	Rules map[int64]bool `db:"-"`
+	RuleEntries map[int64]*rule.Escalation `db:"-" json:"-"`
+}
+
+// NewEvaluable returns a fully initialised and ready-to-use Evaluable type.
+func NewEvaluable() *Evaluable {
+	return &Evaluable{
+		Rules: make(map[int64]bool),
+		RuleEntries: make(map[int64]*rule.Escalation),
+	}
+}
+
+// EvaluateRules evaluates all the configured event rule.Rule(s) for the given filter.Filterable object.
+//
+// Please note that this function may not always evaluate *all* configured rules from the specified RuntimeConfig,
+// as it internally caches all previously matched rules based on their ID.
+//
+// EvaluateRules allows you to specify EvalOptions and hook into certain filter evaluation steps.
+// This function does not support the EvalOptions.OnAllConfigEvaluated callback and will never trigger
+// it (if provided). Please refer to the description of the individual EvalOptions to find out more about
+// when the hooks get triggered and possible special cases.
+//
+// Returns an error if any of the provided callbacks returns one; otherwise, it always returns nil.
+func (e *Evaluable) EvaluateRules(r *RuntimeConfig, filterable filter.Filterable, options EvalOptions[*rule.Rule, any]) error { + for _, ru := range r.Rules { + if !e.Rules[ru.ID] && (options.OnPreEvaluate == nil || options.OnPreEvaluate(ru)) { + matched, err := ru.Eval(filterable) + if err != nil && options.OnError != nil && !options.OnError(ru, err) { + return err + } + if err != nil || !matched { + continue + } + + if options.OnFilterMatch != nil { + if err := options.OnFilterMatch(ru); err != nil { + return err + } + } + + e.Rules[ru.ID] = true + } + } + + return nil +} + +// EvaluateRuleEntries evaluates all the configured rule.Entry for the provided filter.Filterable object. +// +// This function allows you to specify EvalOptions and hook up certain filter evaluation steps. +// Currently, EvaluateRuleEntries fully support all the available EvalOptions. Please refer to the +// description of the individual EvalOptions to find out more about when the hooks get triggered and +// possible special cases. +// +// Returns an error if any of the provided callbacks return an error, otherwise always nil. +func (e *Evaluable) EvaluateRuleEntries(r *RuntimeConfig, filterable filter.Filterable, options EvalOptions[*rule.Escalation, any]) error { + retryAfter := rule.RetryNever + + for ruleID := range e.Rules { + ru := r.Rules[ruleID] + if ru == nil { + // It would be appropriate to have a debug log here, but unfortunately we don't have access to a logger. + continue + } + + for _, entry := range ru.Escalations { + if options.OnPreEvaluate != nil && !options.OnPreEvaluate(entry) { + continue + } + + if matched, err := entry.Eval(filterable); err != nil { + if options.OnError != nil && !options.OnError(entry, err) { + return err + } + } else if cond, ok := filterable.(*rule.EscalationFilter); !matched && ok { + incidentAgeFilter := cond.ReevaluateAfter(entry.Condition) + retryAfter = min(retryAfter, incidentAgeFilter) + } else if matched { + if options.OnFilterMatch != nil { + if err := options.OnFilterMatch(entry); err != nil { + return err + } + } + + e.RuleEntries[entry.ID] = entry + } + } + } + + if options.OnAllConfigEvaluated != nil { + options.OnAllConfigEvaluated(retryAfter) + } + + return nil +} diff --git a/internal/config/evaluable_config_test.go b/internal/config/evaluable_config_test.go new file mode 100644 index 00000000..ed8ee6e4 --- /dev/null +++ b/internal/config/evaluable_config_test.go @@ -0,0 +1,243 @@ +package config + +import ( + "fmt" + "github.com/icinga/icinga-notifications/internal/filter" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/stretchr/testify/require" + "maps" + "testing" + "time" +) + +const defaultDivisor = 3 + +func TestEvaluableConfig(t *testing.T) { + t.Parallel() + + runtimeConfigTest := new(RuntimeConfig) + runtimeConfigTest.Rules = make(map[int64]*rule.Rule) + for i := 1; i <= 50; i++ { + runtimeConfigTest.Rules[int64(i)] = makeRule(t, i) + } + + t.Run("NewEvaluable", func(t *testing.T) { + t.Parallel() + + e := NewEvaluable() + require.NotNil(t, e, "it should create a fully initialised evaluable config") + require.NotNil(t, e.Rules) + require.NotNil(t, e.RuleEntries) + }) + + t.Run("EvaluateRules", func(t *testing.T) { + t.Parallel() + + runtime := new(RuntimeConfig) + runtime.Rules = maps.Clone(runtimeConfigTest.Rules) + + expectedLen := len(runtime.Rules) / defaultDivisor + options := EvalOptions[*rule.Rule, any]{} + e := NewEvaluable() + assertRules := func(expectedLen *int, expectError bool) { + if expectError { + 
require.Error(t, e.EvaluateRules(runtime, new(filterableTest), options)) + } else { + require.NoError(t, e.EvaluateRules(runtime, new(filterableTest), options)) + } + require.Len(t, e.Rules, *expectedLen) + } + + assertRules(&expectedLen, false) + maps.DeleteFunc(e.Rules, func(ruleID int64, _ bool) bool { return int(ruleID) > expectedLen/2 }) + + options.OnPreEvaluate = func(r *rule.Rule) bool { + require.Falsef(t, e.Rules[r.ID], "EvaluateRules() shouldn't evaluate %q twice", r.Name) + return true + } + options.OnError = func(r *rule.Rule, err error) bool { + require.EqualError(t, err, `"nonexistent" is not a valid filter key`) + require.Truef(t, r.ID%defaultDivisor != 0, "evaluating rule %q should not fail", r.Name) + return true + } + options.OnFilterMatch = func(r *rule.Rule) error { + require.Falsef(t, e.Rules[r.ID], "EvaluateRules() shouldn't evaluate %q twice", r.Name) + return nil + } + + assertRules(&expectedLen, false) + maps.DeleteFunc(e.Rules, func(ruleID int64, _ bool) bool { return int(ruleID) > expectedLen/2 }) + + lenBeforeError := new(int) + options.OnError = func(r *rule.Rule, err error) bool { + if *lenBeforeError != 0 { + require.Fail(t, "OnError() shouldn't have been called again") + } + + require.EqualError(t, err, `"nonexistent" is not a valid filter key`) + require.Truef(t, r.ID%defaultDivisor != 0, "evaluating rule %q should not fail", r.Name) + + *lenBeforeError = len(e.Rules) + return false // This should let the evaluation fail completely! + } + assertRules(lenBeforeError, true) + maps.DeleteFunc(e.Rules, func(ruleID int64, _ bool) bool { return int(ruleID) > expectedLen/2 }) + + *lenBeforeError = 0 + options.OnError = nil + options.OnFilterMatch = func(r *rule.Rule) error { + if *lenBeforeError != 0 { + require.Fail(t, "OnFilterMatch() shouldn't have been called again") + } + + *lenBeforeError = len(e.Rules) + return fmt.Errorf("OnFilterMatch() failed badly") // This should let the evaluation fail completely! + } + assertRules(lenBeforeError, true) + }) + + t.Run("EvaluateRuleEntries", func(t *testing.T) { + t.Parallel() + + runtime := new(RuntimeConfig) + runtime.Rules = maps.Clone(runtimeConfigTest.Rules) + + e := NewEvaluable() + options := EvalOptions[*rule.Escalation, any]{} + + expectedLen := 0 + filterContext := &rule.EscalationFilter{IncidentSeverity: 9} // Event severity "emergency" + assertEntries := func(expectedLen *int, expectError bool) { + if expectError { + require.Error(t, e.EvaluateRuleEntries(runtime, filterContext, options)) + } else { + require.NoError(t, e.EvaluateRuleEntries(runtime, filterContext, options)) + } + require.Len(t, e.RuleEntries, *expectedLen) + e.RuleEntries = make(map[int64]*rule.Escalation) + } + + assertEntries(&expectedLen, false) + require.NoError(t, e.EvaluateRules(runtime, new(filterableTest), EvalOptions[*rule.Rule, any]{})) + require.Len(t, e.Rules, len(runtime.Rules)/defaultDivisor) + expectedLen = len(runtime.Rules)/defaultDivisor - 5 // 15/3 => (5) valid entries are going to be deleted below. + + // Drop some random rules from the runtime config to simulate a runtime config deletion! + maps.DeleteFunc(runtime.Rules, func(ruleID int64, _ *rule.Rule) bool { return ruleID > 35 && ruleID%defaultDivisor == 0 }) + + options.OnPreEvaluate = func(re *rule.Escalation) bool { + if re.RuleID > 35 && re.RuleID%defaultDivisor == 0 { // Those rules are deleted from our runtime config. 
+ require.Failf(t, "OnPreEvaluate() shouldn't have been called", "rule %d was deleted from runtime config", re.RuleID) + } + + require.Nilf(t, e.RuleEntries[re.ID], "EvaluateRuleEntries() shouldn't evaluate entry %d twice", re.ID) + return true + } + options.OnError = func(re *rule.Escalation, err error) bool { + require.EqualError(t, err, `unknown severity "evaluable"`) + require.Truef(t, re.RuleID%defaultDivisor == 0, "evaluating rule entry %d should not fail", re.ID) + return true + } + options.OnFilterMatch = func(re *rule.Escalation) error { + require.Nilf(t, e.RuleEntries[re.ID], "OnPreEvaluate() shouldn't evaluate %d twice", re.ID) + return nil + } + assertEntries(&expectedLen, false) + + lenBeforeError := new(int) + options.OnError = func(re *rule.Escalation, err error) bool { + if *lenBeforeError != 0 { + require.Fail(t, "OnError() shouldn't have been called again") + } + + require.EqualError(t, err, `unknown severity "evaluable"`) + require.Truef(t, re.RuleID%defaultDivisor == 0, "evaluating rule entry %d should not fail", re.ID) + + *lenBeforeError = len(e.RuleEntries) + return false // This should let the evaluation fail completely! + } + assertEntries(lenBeforeError, true) + + *lenBeforeError = 0 + options.OnError = nil + options.OnFilterMatch = func(re *rule.Escalation) error { + if *lenBeforeError != 0 { + require.Fail(t, "OnFilterMatch() shouldn't have been called again") + } + + *lenBeforeError = len(e.RuleEntries) + return fmt.Errorf("OnFilterMatch() failed badly") // This should let the evaluation fail completely! + } + assertEntries(lenBeforeError, true) + + expectedLen = 0 + filterContext.IncidentSeverity = 1 // OK + filterContext.IncidentAge = 5 * time.Minute + + options.OnFilterMatch = nil + options.OnPreEvaluate = func(re *rule.Escalation) bool { return re.RuleID < 5 } + options.OnAllConfigEvaluated = func(result any) { + retryAfter := result.(time.Duration) + // The filter string of the escalation condition is incident_age>=10m and the actual incident age is 5m. + require.Equal(t, 5*time.Minute, retryAfter) + } + assertEntries(&expectedLen, false) + }) +} + +func makeRule(t *testing.T, i int) *rule.Rule { + r := new(rule.Rule) + r.ID = int64(i) + r.Name = fmt.Sprintf("rule-%d", i) + r.Escalations = make(map[int64]*rule.Escalation) + + invalidSeverity, err := filter.Parse("incident_severity=evaluable") + require.NoError(t, err, "parsing incident_severity=evaluable shouldn't fail") + + redundant := new(rule.Escalation) + redundant.ID = r.ID * 150 // It must be large enough to avoid colliding with others! + redundant.RuleID = r.ID + redundant.Condition = invalidSeverity + + nonexistent, err := filter.Parse("nonexistent=evaluable") + require.NoError(t, err, "parsing nonexistent=evaluable shouldn't fail") + + r.Escalations[redundant.ID] = redundant + r.ObjectFilter = nonexistent + if i%defaultDivisor == 0 { + objCond, err := filter.Parse("host=evaluable") + require.NoError(t, err, "parsing host=evaluable shouldn't fail") + + escalationCond, err := filter.Parse("incident_severity>warning||incident_age>=10m") + require.NoError(t, err, "parsing incident_severity>=ok shouldn't fail") + + entry := new(rule.Escalation) + entry.ID = r.ID * 2 + entry.RuleID = r.ID + entry.Condition = escalationCond + + r.ObjectFilter = objCond + r.Escalations[entry.ID] = entry + } + + return r +} + +// filterableTest is a test type that simulates a filter evaluation and eliminates +// the need of having to import e.g. the object package. 
+type filterableTest struct{} + +func (f *filterableTest) EvalEqual(k string, v string) (bool, error) { + if k != "host" { + return false, fmt.Errorf("%q is not a valid filter key", k) + } + + return v == "evaluable", nil +} + +func (f *filterableTest) EvalExists(_ string) bool { return true } +func (f *filterableTest) EvalLess(_ string, _ string) (bool, error) { + panic("Oh dear - you shouldn't have called me") +} +func (f *filterableTest) EvalLike(_, _ string) (bool, error) { return f.EvalLess("", "") } +func (f *filterableTest) EvalLessOrEqual(_, _ string) (bool, error) { return f.EvalLess("", "") } diff --git a/internal/contracts/contracts.go b/internal/contracts/contracts.go deleted file mode 100644 index 7214a894..00000000 --- a/internal/contracts/contracts.go +++ /dev/null @@ -1,14 +0,0 @@ -package contracts - -import ( - "fmt" - "github.com/icinga/icinga-notifications/internal/object" -) - -type Incident interface { - fmt.Stringer - - ID() int64 - IncidentObject() *object.Object - SeverityString() string -} diff --git a/internal/incident/incident.go b/internal/incident/incident.go index e8692b00..002cad06 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -7,24 +7,24 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" - "github.com/icinga/icinga-notifications/internal/contracts" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" + "github.com/icinga/icinga-notifications/pkg/plugin" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "net/url" "sync" "time" ) -type ruleID = int64 type escalationID = int64 type Incident struct { - Id int64 `db:"id"` + ID int64 `db:"id"` ObjectID types.Binary `db:"object_id"` StartedAt types.UnixMilli `db:"started_at"` RecoveredAt types.UnixMilli `db:"recovered_at"` @@ -33,7 +33,6 @@ type Incident struct { Object *object.Object `db:"-"` EscalationState map[escalationID]*EscalationState `db:"-"` - Rules map[ruleID]struct{} `db:"-"` Recipients map[recipient.Key]*RecipientState `db:"-"` // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. @@ -52,6 +51,10 @@ type Incident struct { logger *zap.SugaredLogger runtimeConfig *config.RuntimeConfig + // config.Evaluable encapsulates all evaluable configuration types, such as rule.Rule, rule.Entry etc. + // It is embedded to enable direct access to its members. 
+ *config.Evaluable + sync.Mutex } @@ -63,8 +66,8 @@ func NewIncident( Object: obj, logger: logger, runtimeConfig: runtimeConfig, + Evaluable: config.NewEvaluable(), EscalationState: map[escalationID]*EscalationState{}, - Rules: map[ruleID]struct{}{}, Recipients: map[recipient.Key]*RecipientState{}, } @@ -75,20 +78,8 @@ func NewIncident( return i } -func (i *Incident) IncidentObject() *object.Object { - return i.Object -} - -func (i *Incident) SeverityString() string { - return i.Severity.String() -} - func (i *Incident) String() string { - return fmt.Sprintf("#%d", i.Id) -} - -func (i *Incident) ID() int64 { - return i.Id + return fmt.Sprintf("#%d", i.ID) } func (i *Incident) HasManager() bool { @@ -176,20 +167,30 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } } - // Check if any (additional) rules match this object. Filters of rules that already have a state don't have - // to be checked again, these rules already matched and stay effective for the ongoing incident. - err = i.evaluateRules(ctx, tx, ev.ID) + // Check if any (additional) rules match this object. Incident filter rules are stateful, which means that + // once they have been matched, they remain effective for the ongoing incident and never need to be rechecked. + err := i.EvaluateRules(i.runtimeConfig, i.Object, config.EvalOptions[*rule.Rule, any]{ + OnPreEvaluate: func(r *rule.Rule) bool { return true }, // This might change in the future! + OnFilterMatch: func(r *rule.Rule) error { return i.onFilterRuleMatch(ctx, r, tx, ev) }, + OnError: func(r *rule.Rule, err error) bool { + i.logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) + + // We don't want to stop evaluating the remaining rules just because one of them couldn't be evaluated. + return true + }, + }) if err != nil { return err } + // Reset the evaluated escalations when leaving this function while holding the incident lock, + // otherwise the pointers could be invalidated in the meantime and lead to unexpected behaviour. + defer func() { i.RuleEntries = make(map[int64]*rule.Escalation) }() + // Re-evaluate escalations based on the newly evaluated rules. - escalations, err := i.evaluateEscalations(ev.Time) - if err != nil { - return err - } + i.evaluateEscalations(ev.Time) - if err := i.triggerEscalations(ctx, tx, ev, escalations); err != nil { + if err := i.triggerEscalations(ctx, tx, ev); err != nil { return err } case event.TypeAcknowledgementSet: @@ -239,20 +240,19 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return } - escalations, err := i.evaluateEscalations(ev.Time) - if err != nil { - i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) - return - } + // Reset the evaluated escalations when leaving this function while holding the incident lock, + // otherwise the pointers could be invalidated in the meantime and lead to unexpected behaviour. 
+ defer func() { i.RuleEntries = make(map[int64]*rule.Escalation) }() - if len(escalations) == 0 { + i.evaluateEscalations(ev.Time) + if len(i.RuleEntries) == 0 { i.logger.Debug("Reevaluated escalations, no new escalations triggered") return } var notifications []*NotificationEntry ctx := context.Background() - err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { + err := utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { err := ev.Sync(ctx, tx, i.db, i.Object.ID) if err != nil { return err @@ -262,12 +262,12 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return fmt.Errorf("cannot insert incident event to the database: %w", err) } - if err = i.triggerEscalations(ctx, tx, ev, escalations); err != nil { + if err = i.triggerEscalations(ctx, tx, ev); err != nil { return err } channels := make(rule.ContactChannels) - for _, escalation := range escalations { + for _, escalation := range i.RuleEntries { channels.LoadFromEscalationRecipients(escalation, ev.Time, i.isRecipientNotifiable) } @@ -290,14 +290,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, oldSeverity := i.Severity newSeverity := ev.Severity if oldSeverity == newSeverity { - err := fmt.Errorf("%w: %s state event from source %d", event.ErrSuperfluousStateChange, ev.Severity.String(), ev.SourceId) - return err + i.logger.Debugw("Ignoring superfluous severity change event", zap.Int64("source_id", ev.SourceId), zap.Stringer("event", ev)) + return event.ErrSuperfluousStateChange } i.logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) hr := &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now()), Type: IncidentSeverityChanged, @@ -318,7 +318,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, RemoveCurrent(i.Object) hr = &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, EventID: utils.ToDBInt(ev.ID), Time: i.RecoveredAt, Type: Closed, @@ -354,7 +354,7 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, i.logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message)) hr := &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, Type: Opened, Time: types.UnixMilli(ev.Time), EventID: utils.ToDBInt(ev.ID), @@ -377,7 +377,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. return nil } - hr := &HistoryRow{IncidentID: i.Id, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now())} + hr := &HistoryRow{IncidentID: i.ID, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now())} logger := i.logger.With(zap.String("event", ev.String())) if i.Object.IsMuted() { hr.Type = Muted @@ -395,54 +395,38 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. return hr.Sync(ctx, i.db, tx) } -// evaluateRules evaluates all the configured rules for this *incident.Object and -// generates history entries for each matched rule. -// Returns error on database failure. 
-func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64) error { - if i.Rules == nil { - i.Rules = make(map[int64]struct{}) - } - - for _, r := range i.runtimeConfig.Rules { - if _, ok := i.Rules[r.ID]; !ok { - matched, err := r.Eval(i.Object) - if err != nil { - i.logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) - } - - if err != nil || !matched { - continue - } - - i.Rules[r.ID] = struct{}{} - i.logger.Infow("Rule matches", zap.Object("rule", r)) +// onFilterRuleMatch records a database entry in the `incident_rule` table that refers to the specified rule.Rule. +// In addition, it generates a RuleMatched Incident History and synchronises it with the database. +// +// This function should only be used as an OnFilterMatch handler that is passed to the Evaluable#EvaluateRules +// function, which only fires when the event rule filter matches on the current Incident Object. +// +// Returns an error if it fails to persist the database entries. +func (i *Incident) onFilterRuleMatch(ctx context.Context, r *rule.Rule, tx *sqlx.Tx, ev *event.Event) error { + i.logger.Infow("Rule matches", zap.Object("rule", r)) - err = i.AddRuleMatched(ctx, tx, r) - if err != nil { - i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) - return err - } + if err := i.AddRuleMatched(ctx, tx, r); err != nil { + i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) + return err + } - hr := &HistoryRow{ - IncidentID: i.Id, - Time: types.UnixMilli(time.Now()), - EventID: utils.ToDBInt(eventID), - RuleID: utils.ToDBInt(r.ID), - Type: RuleMatched, - } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) - return err - } - } + hr := &HistoryRow{ + IncidentID: i.ID, + Time: types.UnixMilli(time.Now()), + EventID: utils.ToDBInt(ev.ID), + RuleID: utils.ToDBInt(r.ID), + Type: RuleMatched, + } + if err := hr.Sync(ctx, i.db, tx); err != nil { + i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) + return err } return nil } // evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already. -// Returns the newly evaluated escalations to be triggered or an error on database failure. 
-func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, error) { +func (i *Incident) evaluateEscalations(eventTime time.Time) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } @@ -457,61 +441,46 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity} - var escalations []*rule.Escalation - retryAfter := rule.RetryNever - - for rID := range i.Rules { - r := i.runtimeConfig.Rules[rID] - if r == nil { - i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) - continue - } - - // Check if new escalation stages are reached - for _, escalation := range r.Escalations { - if _, ok := i.EscalationState[escalation.ID]; !ok { - matched, err := escalation.Eval(filterContext) - if err != nil { - i.logger.Warnw( - "Failed to evaluate escalation condition", zap.Object("rule", r), - zap.Object("escalation", escalation), zap.Error(err), - ) - } else if !matched { - incidentAgeFilter := filterContext.ReevaluateAfter(escalation.Condition) - retryAfter = min(retryAfter, incidentAgeFilter) - } else { - escalations = append(escalations, escalation) - } + // EvaluateRuleEntries only returns an error if one of the provided callback hooks returns + // an error or the OnError handler returns false, and since none of our callbacks return an + // error nor false, we can safely discard the return value here. + _ = i.EvaluateRuleEntries(i.runtimeConfig, filterContext, config.EvalOptions[*rule.Escalation, any]{ + // Prevent reevaluation of an already triggered escalation via the pre run hook. + OnPreEvaluate: func(escalation *rule.Escalation) bool { return i.EscalationState[escalation.ID] == nil }, + OnError: func(escalation *rule.Escalation, err error) bool { + r := i.runtimeConfig.Rules[escalation.RuleID] + i.logger.Warnw("Failed to evaluate escalation condition", zap.Object("rule", r), + zap.Object("escalation", escalation), zap.Error(err)) + return true + }, + OnAllConfigEvaluated: func(result any) { + retryAfter := result.(time.Duration) + if retryAfter != rule.RetryNever { + // The retryAfter duration is relative to the incident duration represented by the escalation filter, + // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter + // would contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of + // the incident start time here. + nextEvalAt := eventTime.Add(retryAfter) + + i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.timer = time.AfterFunc(retryAfter, func() { + i.logger.Info("Reevaluating escalations") + + i.RetriggerEscalations(&event.Event{ + Time: nextEvalAt, + Type: event.TypeIncidentAge, + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())), + }) + }) } - } - } - - if retryAfter != rule.RetryNever { - // The retryAfter duration is relative to the incident duration represented by the escalation filter, - // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter would - // contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of the incident - // start time here. 
- nextEvalAt := eventTime.Add(retryAfter) - - i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) - i.timer = time.AfterFunc(retryAfter, func() { - i.logger.Info("Reevaluating escalations") - - i.RetriggerEscalations(&event.Event{ - Time: nextEvalAt, - Type: event.TypeIncidentAge, - Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())), - }) - }) - } - - return escalations, nil + }, + }) } // triggerEscalations triggers the given escalations and generates incident history items for each of them. // Returns an error on database failure. -func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Escalation) error { - for _, escalation := range escalations { +func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { + for _, escalation := range i.RuleEntries { r := i.runtimeConfig.Rules[escalation.RuleID] if r == nil { i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) @@ -532,7 +501,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even } hr := &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, Time: state.TriggeredAt, EventID: utils.ToDBInt(ev.ID), RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), @@ -559,6 +528,35 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even // notifyContacts executes all the given pending notifications of the current incident. // Returns error on database failure or if the provided context is cancelled. func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error { + baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) + if err != nil { + i.logger.Errorw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) + return err + } + + incidentUrl := baseUrl.JoinPath("/notifications/incident") + incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID) + + req := &plugin.NotificationRequest{ + Object: &plugin.Object{ + Name: i.Object.DisplayName(), + Url: ev.URL, + Tags: i.Object.Tags, + ExtraTags: i.Object.ExtraTags, + }, + Incident: &plugin.Incident{ + Id: i.ID, + Url: incidentUrl.String(), + Severity: i.Severity.String(), + }, + Event: &plugin.Event{ + Time: ev.Time, + Type: ev.Type, + Username: ev.Username, + Message: ev.Message, + }, + } + for _, notification := range notifications { contact := i.runtimeConfig.Contacts[notification.ContactID] if contact == nil { @@ -566,7 +564,7 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica continue } - if i.notifyContact(contact, ev, notification.ChannelID) != nil { + if i.notifyContact(contact, req, notification.ChannelID) != nil { notification.State = NotificationStateFailed } else { notification.State = NotificationStateSent @@ -590,7 +588,7 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica } // notifyContact notifies the given recipient via a channel matching the given ID. 
-func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, chID int64) error { +func (i *Incident) notifyContact(contact *recipient.Contact, req *plugin.NotificationRequest, chID int64) error { ch := i.runtimeConfig.Channels[chID] if ch == nil { i.logger.Errorw("Could not find config for channel", zap.Int64("channel_id", chID)) @@ -599,16 +597,21 @@ func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, ch } i.logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", contact.FullName, ch.Name, ch.Type), - zap.Int64("channel_id", chID), zap.String("event_type", ev.Type)) + zap.Int64("channel_id", chID), zap.String("event_type", req.Event.Type)) - err := ch.Notify(contact, i, ev, daemon.Config().Icingaweb2URL) - if err != nil { + contactStruct := &plugin.Contact{FullName: contact.FullName} + for _, addr := range contact.Addresses { + contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) + } + req.Contact = contactStruct + + if err := ch.Notify(req); err != nil { i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) return err } i.logger.Infow("Successfully sent a notification via channel plugin", zap.String("type", ch.Type), - zap.String("contact", contact.FullName), zap.String("event_type", ev.Type)) + zap.String("contact", contact.FullName), zap.String("event_type", req.Event.Type)) return nil } @@ -647,7 +650,7 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, i.logger.Infof("Contact %q role changed from %s to %s", contact.String(), oldRole.String(), newRole.String()) hr := &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, Key: recipientKey, EventID: utils.ToDBInt(ev.ID), Type: RecipientRoleChanged, @@ -725,7 +728,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { func (i *Incident) restoreRecipients(ctx context.Context) error { contact := &ContactRow{} var contacts []*ContactRow - err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id) + err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID) if err != nil { i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) return err @@ -767,7 +770,3 @@ func (e *EscalationState) TableName() string { type RecipientState struct { Role ContactRole } - -var ( - _ contracts.Incident = (*Incident)(nil) -) diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index e35e4f34..56edfdc8 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -80,11 +80,11 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log incidentsByObjId := make(map[string]*Incident, chunkLen) for _, i := range bulk { - incidentsById[i.Id] = i + incidentsById[i.ID] = i incidentsByObjId[i.ObjectID.String()] = i objectIds = append(objectIds, i.ObjectID) - incidentIds = append(incidentIds, i.Id) + incidentIds = append(incidentIds, i.ID) } // Restore all incident objects matching the given object ids @@ -103,7 +103,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID) if escalation != nil { - i.Rules[escalation.RuleID] = struct{}{} + i.Rules[escalation.RuleID] = true } }) if err != nil { @@ -192,7 +192,7 
@@ func GetCurrentIncidents() map[int64]*Incident { m := make(map[int64]*Incident) for _, incident := range currentIncidents { - m[incident.Id] = incident + m[incident.ID] = incident } return m } diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index 2cf4c0e0..674034e4 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -68,7 +68,7 @@ func TestLoadOpenIncidents(t *testing.T) { RemoveCurrent(i.Object) // Mark some of the existing incidents as recovered. - if i.Id%20 == 0 { // 1000 / 20 => 50 existing incidents will be marked as recovered! + if i.ID%20 == 0 { // 1000 / 20 => 50 existing incidents will be marked as recovered! i.RecoveredAt = types.UnixMilli(time.Now()) require.NoError(t, i.Sync(ctx, tx), "failed to update/insert incident") @@ -125,7 +125,7 @@ func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testDat assert.NotNil(t, current.Object, "failed to restore incident object") if i != nil { - assert.Equal(t, i.Id, current.Id, "incidents linked to the same object don't have the same ID") + assert.Equal(t, i.ID, current.ID, "incidents linked to the same object don't have the same ID") assert.Equal(t, i.Severity, current.Severity, "failed to restore incident severity") assert.Equal(t, i.StartedAt, current.StartedAt, "failed to restore incident started at") assert.Equal(t, i.RecoveredAt, current.RecoveredAt, "failed to restore incident recovered at") diff --git a/internal/incident/sync.go b/internal/incident/sync.go index c54fe61d..f712a029 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -25,7 +25,7 @@ func (i *Incident) Upsert() interface{} { // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { - if i.Id != 0 { + if i.ID != 0 { stmt, _ := i.db.BuildUpsertStmt(i) _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { @@ -38,14 +38,14 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { return err } - i.Id = incidentId + i.ID = incidentId } return nil } func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { - state.IncidentID = i.Id + state.IncidentID = i.ID stmt, _ := i.db.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) @@ -55,7 +55,7 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat // AddEvent Inserts incident history record to the database and returns an error on db failure. 
func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - ie := &EventRow{IncidentID: i.Id, EventID: ev.ID} + ie := &EventRow{IncidentID: i.ID, EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) _, err := tx.NamedExecContext(ctx, stmt, ie) @@ -72,7 +72,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru for _, escalationRecipient := range escalation.Recipients { r := escalationRecipient.Recipient - cr := &ContactRow{IncidentID: i.Id, Role: newRole} + cr := &ContactRow{IncidentID: i.ID, Role: newRole} recipientKey := recipient.ToKey(r) cr.Key = recipientKey @@ -88,7 +88,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru i.logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) hr := &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, EventID: utils.ToDBInt(eventId), Key: cr.Key, Time: types.UnixMilli(time.Now()), @@ -125,7 +125,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // AddRuleMatched syncs the given *rule.Rule to the database. // Returns an error on database failure. func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { - rr := &RuleRow{IncidentID: i.Id, RuleID: r.ID} + rr := &RuleRow{IncidentID: i.ID, RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) _, err := tx.NamedExecContext(ctx, stmt, rr) @@ -145,7 +145,7 @@ func (i *Incident) generateNotifications( for contact, channels := range contactChannels { for chID := range channels { hr := &HistoryRow{ - IncidentID: i.Id, + IncidentID: i.ID, Key: recipient.ToKey(contact), EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now()), diff --git a/internal/rule/escalation.go b/internal/rule/escalation.go index 40e14a17..66ce4e76 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/escalation.go @@ -65,7 +65,7 @@ func (e *Escalation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { // Eval evaluates the configured escalation filter for the provided filter. // Returns always true if there are no configured escalation conditions. -func (e *Escalation) Eval(filterable *EscalationFilter) (bool, error) { +func (e *Escalation) Eval(filterable filter.Filterable) (bool, error) { if e.Condition == nil { return true, nil } diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 9e28237d..9b5c1770 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -312,7 +312,11 @@ func FormatMessage(writer io.Writer, req *NotificationRequest) { }) } - _, _ = fmt.Fprintf(writer, "\nIncident: %s", req.Incident.Url) + if req.Incident != nil { + _, _ = fmt.Fprintf(writer, "\nIncident: %s", req.Incident.Url) + } else { + _, _ = fmt.Fprint(writer, "\nIncident: No active incident found for this object") + } } // FormatSubject returns the formatted subject string based on the event type. 
@@ -321,8 +325,16 @@ func FormatSubject(req *NotificationRequest) string { case event.TypeState: return fmt.Sprintf("[#%d] %s %s is %s", req.Incident.Id, req.Event.Type, req.Object.Name, req.Incident.Severity) case event.TypeAcknowledgementCleared, event.TypeDowntimeRemoved: - return fmt.Sprintf("[#%d] %s from %s", req.Incident.Id, req.Event.Type, req.Object.Name) + if req.Incident != nil { + return fmt.Sprintf("[#%d] %s from %s", req.Incident.Id, req.Event.Type, req.Object.Name) + } + + return fmt.Sprintf("%s from %s", req.Event.Type, req.Object.Name) default: - return fmt.Sprintf("[#%d] %s on %s", req.Incident.Id, req.Event.Type, req.Object.Name) + if req.Incident != nil { + return fmt.Sprintf("[#%d] %s on %s", req.Incident.Id, req.Event.Type, req.Object.Name) + } + + return fmt.Sprintf("%s on %s", req.Event.Type, req.Object.Name) } }
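
The following sketch is illustrative only and not part of the patch above: it shows how a caller might drive the new config.Evaluable/EvalOptions API end to end, mirroring the wiring that internal/incident now uses in ProcessEvent and RetriggerEscalations. The example package name, the evaluateFor helper and its logger parameter are assumptions made up for this illustration; Evaluable, EvalOptions, RuntimeConfig, EscalationFilter and the hook signatures are the ones introduced or touched by this diff.

package example

import (
	"time"

	"github.com/icinga/icinga-notifications/internal/config"
	"github.com/icinga/icinga-notifications/internal/filter"
	"github.com/icinga/icinga-notifications/internal/rule"
	"go.uber.org/zap"
)

// evaluateFor matches rule object filters against obj and then evaluates the escalation
// entries of the matched rules against the given incident filter context.
func evaluateFor(
	rc *config.RuntimeConfig,
	obj filter.Filterable,
	filterCtx *rule.EscalationFilter,
	logger *zap.SugaredLogger,
) (*config.Evaluable, error) {
	e := config.NewEvaluable()

	// Step 1: record every rule whose object filter matches obj in e.Rules.
	// Evaluation errors are logged and skipped instead of aborting the whole run.
	err := e.EvaluateRules(rc, obj, config.EvalOptions[*rule.Rule, any]{
		OnError: func(r *rule.Rule, err error) bool {
			logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err))
			return true // continue with the remaining rules
		},
		OnFilterMatch: func(r *rule.Rule) error {
			logger.Infow("Rule matches", zap.Object("rule", r))
			return nil // a non-nil error here would abort the evaluation
		},
	})
	if err != nil {
		return nil, err
	}

	// Step 2: collect the escalation entries of the matched rules that fire for the current
	// incident age/severity in e.RuleEntries. OnAllConfigEvaluated receives the minimum
	// duration after which a currently non-matching incident_age condition could match.
	err = e.EvaluateRuleEntries(rc, filterCtx, config.EvalOptions[*rule.Escalation, any]{
		OnPreEvaluate: func(entry *rule.Escalation) bool {
			return true // e.g. skip entries that were already triggered earlier
		},
		OnAllConfigEvaluated: func(result any) {
			if retryAfter := result.(time.Duration); retryAfter != rule.RetryNever {
				logger.Infow("Escalations should be reevaluated", zap.Duration("after", retryAfter))
			}
		},
	})
	if err != nil {
		return nil, err
	}

	return e, nil
}

After both calls, e.Rules holds the IDs of all matched rules and e.RuleEntries the escalation entries to trigger, which is what Incident.ProcessEvent consumes via triggerEscalations.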