Skip to content

Commit

Permalink
fix(aggregate-v2): Update provider model to match API schema
Browse files Browse the repository at this point in the history
Brings the aggregate-v2 model in the provider inline with the schema
that has evolved from work in the API schema. Includes some refactoring
to break apart the large FromModel conversion function into smaller
parts.

Ref: LOG-19284
  • Loading branch information
dhable committed Feb 28, 2024
1 parent 9295d06 commit 9cd4b9f
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 31 deletions.
113 changes: 82 additions & 31 deletions internal/provider/models/processors/aggregate_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
const AGGREGATE_PROCESSOR_NODE_NAME = "aggregate-v2"
const AGGREGATE_PROCESSOR_TYPE_NAME = "aggregate_v2"

var METHODS = map[string]string{"TUMBLING": "tumbling", "SLIDING": "sliding"}
var STRATEGIES = map[string]string{
"sum": "SUM",
"average": "AVG",
Expand All @@ -30,9 +29,10 @@ type AggregateV2ProcessorModel struct {
Inputs List `tfsdk:"inputs"`
GenerationId Int64 `tfsdk:"generation_id"`
Method String `tfsdk:"method" user_config:"true"`
IntervalMS Int64 `tfsdk:"interval" user_config:"true"`
Interval Int64 `tfsdk:"interval" user_config:"true"`
Strategy String `tfsdk:"strategy" user_config:"true"`
Duration Int64 `tfsdk:"window_duration" user_config:"true"`
Minimum Int64 `tfsdk:"window_min" user_config:"true"`
Conditional Object `tfsdk:"conditional" user_config:"true"`
GroupBy basetypes.ListValue `tfsdk:"group_by" user_config:"true"`
Script String `tfsdk:"script" user_config:"true"`
Expand All @@ -44,6 +44,7 @@ var AggregateV2ProcessorResourceSchema = schema.Schema{
"method": schema.StringAttribute{
Required: true,
Description: "The method in which to aggregate metrics (tumbling or sliding)",
Validators: []validator.String{stringvalidator.OneOf("tumbling", "sliding")},
},
"interval": schema.Int64Attribute{
Optional: true,
Expand All @@ -65,6 +66,11 @@ var AggregateV2ProcessorResourceSchema = schema.Schema{
Computed: true,
Description: "When method is set to sliding, this is the interval over which metrics are aggregated in seconds",
},
"window_min": schema.Int64Attribute{
Optional: true,
Computed: true,
Description: "",
},
"conditional": schema.SingleNestedAttribute{
Optional: true,
Description: "When method is set to sliding: " + ParentConditionalAttribute.Description,
Expand All @@ -79,33 +85,54 @@ var AggregateV2ProcessorResourceSchema = schema.Schema{
}),
}

func AggregateV2ProcessorFromModel(plan *AggregateV2ProcessorModel, previousState *AggregateV2ProcessorModel) (*Processor, diag.Diagnostics) {
dd := diag.Diagnostics{}
component := Processor{
BaseNode: BaseNode{
Type: AGGREGATE_PROCESSOR_NODE_NAME,
Title: plan.Title.ValueString(),
Description: plan.Description.ValueString(),
UserConfig: make(map[string]any),
},
}

if previousState != nil {
component.Id = previousState.Id.ValueString()
component.GenerationId = previousState.GenerationId.ValueInt64()
}
func methodConfigFromModel(plan *AggregateV2ProcessorModel, userConfig map[string]any, dd *diag.Diagnostics) {
isIntervalSet := !plan.Interval.IsNull() && !plan.Interval.IsUnknown()
isDurationSet := !plan.Duration.IsNull() && !plan.Duration.IsUnknown()
isMinimumSet := !plan.Minimum.IsNull() && !plan.Minimum.IsUnknown()
method := plan.Method.ValueString()

userConfig["method"] = method
if method == "tumbling" {
if isDurationSet {
dd.AddError(
"Error in plan",
"The field 'window_duration' can only be set if method == 'sliding'",
)
}

component.Inputs = StringListValueToStringSlice(plan.Inputs)
user_config := component.UserConfig
if isMinimumSet {
dd.AddError(
"Error in plan",
"The field 'window_min' can only be set if method == 'sliding'",
)
}

if !plan.Method.IsNull() {
user_config["method"] = plan.Method.ValueString()
}
if isIntervalSet {
userConfig["interval"] = plan.Interval.ValueInt64()
}
} else if method == "sliding" {
if isIntervalSet {
dd.AddError(
"Error in plan",
"The field 'interval' can only be set if method == 'tumbling'",
)
}

if !plan.IntervalMS.IsNull() {
user_config["interval"] = plan.IntervalMS.ValueInt64()
if isDurationSet {
userConfig["window_duration"] = plan.Duration.ValueInt64()
}
if isMinimumSet {
userConfig["window_min"] = plan.Minimum.ValueInt64()
}
} else {
dd.AddError(
"Error in plan",
"The method '%s' is not handled correctly in the provider. Please open a GitHub issue to report this.",
)
}
}

func strategyConfigFromModel(plan *AggregateV2ProcessorModel, userConfig map[string]any, dd *diag.Diagnostics) {
strategySet := !(plan.Strategy.IsNull() || plan.Strategy.IsUnknown())
scripSet := !(plan.Script.IsNull() || plan.Script.IsNull())
if !strategySet && !scripSet {
Expand All @@ -119,17 +146,37 @@ func AggregateV2ProcessorFromModel(plan *AggregateV2ProcessorModel, previousStat
"Cannot define both 'strategy' and 'script' fields.",
)
} else if scripSet {
user_config["strategy"] = "CUSTOM"
user_config["script"] = plan.Script.ValueString()
userConfig["strategy"] = "CUSTOM"
userConfig["script"] = plan.Script.ValueString()
} else {
delete(user_config, "script")
user_config["strategy"] = STRATEGIES[plan.Strategy.ValueString()]
delete(userConfig, "script")
userConfig["strategy"] = STRATEGIES[plan.Strategy.ValueString()]
}

if !plan.Duration.IsNull() {
user_config["window_duration"] = plan.Duration.ValueInt64()
}

func AggregateV2ProcessorFromModel(plan *AggregateV2ProcessorModel, previousState *AggregateV2ProcessorModel) (*Processor, diag.Diagnostics) {
dd := diag.Diagnostics{}
component := Processor{
BaseNode: BaseNode{
Type: AGGREGATE_PROCESSOR_NODE_NAME,
Title: plan.Title.ValueString(),
Description: plan.Description.ValueString(),
UserConfig: make(map[string]any),
},
}

if previousState != nil {
component.Id = previousState.Id.ValueString()
component.GenerationId = previousState.GenerationId.ValueInt64()
}

component.Inputs = StringListValueToStringSlice(plan.Inputs)
user_config := component.UserConfig

methodConfigFromModel(plan, user_config, &dd)
strategyConfigFromModel(plan, user_config, &dd)

if !plan.Conditional.IsNull() {
user_config["conditional"] = unwindConditionalFromModel(plan.Conditional)
}
Expand All @@ -154,7 +201,7 @@ func AggregateV2ProcessorToModel(plan *AggregateV2ProcessorModel, component *Pro
plan.Method = basetypes.NewStringValue(component.UserConfig["method"].(string))

if component.UserConfig["interval"] != nil {
plan.IntervalMS = Int64Value(int64(component.UserConfig["interval"].(float64)))
plan.Interval = Int64Value(int64(component.UserConfig["interval"].(float64)))
}

if component.UserConfig["strategy"] != nil {
Expand All @@ -172,6 +219,10 @@ func AggregateV2ProcessorToModel(plan *AggregateV2ProcessorModel, component *Pro
plan.Duration = Int64Value(int64(component.UserConfig["window_duration"].(float64)))
}

if component.UserConfig["window_min"] != nil {
plan.Minimum = Int64Value(int64(component.UserConfig["window_min"].(float64)))
}

if component.UserConfig["conditional"] != nil {
plan.Conditional = UnwindConditionalToModel(component.UserConfig["conditional"].(map[string]any))
}
Expand Down
43 changes: 43 additions & 0 deletions internal/provider/models/processors/test/aggregate_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,49 @@ func TestAggregateV2Processor(t *testing.T) {
}`,
ExpectError: regexp.MustCompile("/window_duration/maximum"),
},

// Error: window duration
{
Config: GetCachedConfig(cacheKey) + `
resource "mezmo_aggregate_v2_processor" "my_processor" {
pipeline_id = mezmo_pipeline.test_parent.id
inputs = [mezmo_http_source.my_source.id]
strategy = "sum"
method = "tumbling"
interval = 10
window_duration = 5
}`,
ExpectError: regexp.MustCompile("The field 'window_duration' can only be set if method == 'sliding'"),
},

// Error: window min
{
Config: GetCachedConfig(cacheKey) + `
resource "mezmo_aggregate_v2_processor" "my_processor" {
pipeline_id = mezmo_pipeline.test_parent.id
inputs = [mezmo_http_source.my_source.id]
strategy = "sum"
method = "tumbling"
interval = 10
window_min = 5
}`,
ExpectError: regexp.MustCompile("The field 'window_min' can only be set if method == 'sliding'"),
},

// Error: interval
{
Config: GetCachedConfig(cacheKey) + `
resource "mezmo_aggregate_v2_processor" "my_processor" {
pipeline_id = mezmo_pipeline.test_parent.id
inputs = [mezmo_http_source.my_source.id]
strategy = "sum"
method = "sliding"
interval = 10
window_duration = 5
}`,
ExpectError: regexp.MustCompile("The field 'interval' can only be set if method == 'tumbling'"),
},

// confirm manually deleted resources are recreated
{
Config: GetProviderConfig() + `
Expand Down

0 comments on commit 9cd4b9f

Please sign in to comment.