Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interval processor #1119

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ Be sure perform actual integration testing in a live environment in the main [k8
|-----|------|---------|-------------|
| processors.grafanaCloudMetrics.enabled | bool | `true` | Generate host info metrics from telemetry data, used in Application Observability in Grafana Cloud. |

### Processors: Interval

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| processors.interval.enabled | bool | `false` | Utilize an interval processor to aggregate metrics and periodically forward the latest values to the next component in the pipeline. |
| processors.interval.interval | string | `"60s"` | The interval at which to emit aggregated metrics. |
| processors.interval.passthrough.gauge | bool | `false` | Determines whether gauge metrics should be passed through as they are or aggregated. |
| processors.interval.passthrough.summary | bool | `false` | Determines whether summary metrics should be passed through as they are or aggregated. |

### Processors: K8s Attributes

| Key | Type | Default | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{{- $transform := include "feature.applicationObservability.processor.transform.alloy.target" dict }}
{{- $filter := include "feature.applicationObservability.processor.filter.alloy.target" dict }}
{{- $batch := include "feature.applicationObservability.processor.batch.alloy.target" dict }}
{{- $interval := include "feature.applicationObservability.processor.interval.alloy.target" dict }}
{{- $memoryLimiter := include "feature.applicationObservability.processor.memory_limiter.alloy.target" dict }}
declare "application_observability" {
argument "metrics_destinations" {
Expand Down Expand Up @@ -66,6 +67,11 @@ declare "application_observability" {
{{- $metricsNext = printf "[%s]" $memoryLimiter }}
{{- $logsNext = printf "[%s]" $memoryLimiter }}
{{- $tracesNext = printf "[%s]" $memoryLimiter }}
{{- else if .Values.processors.interval.enabled }}
// Batch Processor --> Interval
{{- $metricsNext = printf "[%s]" $interval }}
{{- $logsNext = printf "[%s]" $interval }}
{{- $tracesNext = printf "[%s]" $interval }}
{{- else }}
// Batch Processor --> Destinations
{{- $metricsNext = "argument.metrics_destinations.value" }}
Expand All @@ -75,12 +81,27 @@ declare "application_observability" {
{{- include "feature.applicationObservability.processor.batch.alloy" (dict "Values" $.Values "metricsOutput" $metricsNext "logsOutput" $logsNext "tracesOutput" $tracesNext ) | indent 2 }}

{{- if .Values.processors.memoryLimiter.enabled }}
{{- if .Values.processors.interval.enabled }}
// Memory Limiter --> Interval
{{- $metricsNext = printf "[%s]" $interval }}
{{- $logsNext = printf "[%s]" $interval }}
{{- $tracesNext = printf "[%s]" $interval }}
{{- else }}
// Memory Limiter --> Destinations
{{- $metricsNext = "argument.metrics_destinations.value" }}
{{- $logsNext = "argument.logs_destinations.value" }}
{{- $tracesNext = "argument.traces_destinations.value" }}
{{- end }}
{{- include "feature.applicationObservability.processor.memory_limiter.alloy" (dict "Values" $.Values "metricsOutput" $metricsNext "logsOutput" $logsNext "tracesOutput" $tracesNext ) | indent 2 }}
{{- end }}

{{- if .Values.processors.interval.enabled }}
// Interval --> Destinations
{{- $metricsNext = "argument.metrics_destinations.value" }}
{{- $logsNext = "argument.logs_destinations.value" }}
{{- $tracesNext = "argument.traces_destinations.value" }}
{{- include "feature.applicationObservability.processor.interval.alloy" (dict "Values" $.Values "metricsOutput" $metricsNext "logsOutput" $logsNext "tracesOutput" $tracesNext ) | indent 2 }}
{{- end }}
}
{{- end }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
{{- define "feature.applicationObservability.processor.batch.alloy.target" }}otelcol.processor.batch.{{ .name | default "default" }}.input{{ end }}
{{- define "feature.applicationObservability.processor.batch.alloy" }}
otelcol.processor.batch {{ .name | default "default" | quote }} {
send_batch_size = {{ .Values.processors.batch.size }}
send_batch_max_size = {{ .Values.processors.batch.maxSize }}
timeout = {{ .Values.processors.batch.timeout | quote}}

output {
{{- if and .metricsOutput .Values.metrics.enabled }}
metrics = {{ .metricsOutput }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{/* Inputs: Values (values) metricsOutput, logsOutput, tracesOutput, name */}}
{{/* Renders the Alloy component reference for this processor's input, used to wire upstream components to it. */}}
{{- define "feature.applicationObservability.processor.interval.alloy.target" }}otelcol.processor.interval.{{ .name | default "default" }}.input{{ end }}
{{/* Renders an otelcol.processor.interval block. interval/passthrough come from .Values.processors.interval; */}}
{{/* each output signal (metrics/logs/traces) is emitted only when both its *Output input and its */}}
{{/* .Values.<signal>.enabled flag are set. */}}
{{- define "feature.applicationObservability.processor.interval.alloy" }}
otelcol.processor.interval {{ .name | default "default" | quote }} {
  interval = {{ .Values.processors.interval.interval | quote }}
  passthrough {
    gauge = {{ .Values.processors.interval.passthrough.gauge }}
    summary = {{ .Values.processors.interval.passthrough.summary }}
  }

  output {
    {{- if and .metricsOutput .Values.metrics.enabled }}
    metrics = {{ .metricsOutput }}
    {{- end }}
    {{- if and .logsOutput .Values.logs.enabled }}
    logs = {{ .logsOutput }}
    {{- end }}
    {{- if and .tracesOutput .Values.traces.enabled }}
    traces = {{ .tracesOutput }}
    {{- end }}
  }
}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ tests:

// Batch Processor --> Destinations
otelcol.processor.batch "default" {
send_batch_size = 16384
send_batch_max_size = 0
timeout = "2s"

output {
metrics = argument.metrics_destinations.value
logs = argument.logs_destinations.value
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# yamllint disable rule:document-start rule:line-length rule:trailing-spaces
suite: Test with interval processor
templates:
- configmap.yaml
tests:
- it: creates the pipeline with the interval processor
set:
deployAsConfigMap: true
processors:
interval:
enabled: true
receivers:
otlp:
grpc:
enabled: true
http:
enabled: true
jaeger:
grpc:
enabled: true
thriftBinary:
enabled: true
thriftCompact:
enabled: true
thriftHttp:
enabled: true
zipkin:
enabled: true
asserts:
- isKind:
of: ConfigMap
- equal:
path: data["module.alloy"]
value: |-
declare "application_observability" {
argument "metrics_destinations" {
comment = "Must be a list of metrics destinations where collected metrics should be forwarded to"
}

argument "logs_destinations" {
comment = "Must be a list of log destinations where collected logs should be forwarded to"
}

argument "traces_destinations" {
comment = "Must be a list of trace destinations where collected trace should be forwarded to"
}

// Receivers --> Resource Detection Processor
otelcol.receiver.otlp "receiver" {
grpc {
endpoint = "0.0.0.0:4317"
}
http {
endpoint = "0.0.0.0:4318"
}
debug_metrics {
disable_high_cardinality_metrics = true
}
output {
metrics = [otelcol.processor.resourcedetection.default.input]
logs = [otelcol.processor.resourcedetection.default.input]
traces = [otelcol.processor.resourcedetection.default.input]
}
}
otelcol.receiver.jaeger "receiver" {
protocols {grpc {
endpoint = "0.0.0.0:0"
}
thrift_binary {
endpoint = "0.0.0.0:0"
}
thrift_compact {
endpoint = "0.0.0.0:0"
}
thrift_http {
endpoint = "0.0.0.0:0"
}
}

debug_metrics {
disable_high_cardinality_metrics = true
}
output {
traces = [otelcol.processor.resourcedetection.default.input]
}
}
otelcol.receiver.zipkin "receiver" {
endpoint = "0.0.0.0:9411"
debug_metrics {
disable_high_cardinality_metrics = true
}
output {
traces = [otelcol.processor.resourcedetection.default.input]
}
}

// Resource Detection Processor --> K8s Attribute Processor
otelcol.processor.resourcedetection "default" {
detectors = ["env", "system"]
system {
hostname_sources = ["os"]
}

output {
metrics = [otelcol.processor.k8sattributes.default.input]
logs = [otelcol.processor.k8sattributes.default.input]
traces = [otelcol.processor.k8sattributes.default.input]
}
}

// K8s Attribute Processor --> Transform Processor
// Resource Detection Processor Traces --> Host Info Connector
otelcol.processor.k8sattributes "default" {
extract {
metadata = ["k8s.namespace.name","k8s.pod.name","k8s.deployment.name","k8s.statefulset.name","k8s.daemonset.name","k8s.cronjob.name","k8s.job.name","k8s.node.name","k8s.pod.uid","k8s.pod.start_time"]
}
pod_association {
source {
from = "connection"
}
}

output {
metrics = [otelcol.processor.transform.default.input]
logs = [otelcol.processor.transform.default.input]
traces = [otelcol.processor.transform.default.input, otelcol.connector.host_info.default.input]
}
}
// Host Info Connector --> Batch Processor
otelcol.connector.host_info "default" {
host_identifiers = [ "k8s.node.name" ]

output {
metrics = [otelcol.processor.batch.default.input]
}
}


// Transform Processor --> Batch Processor
otelcol.processor.transform "default" {
error_mode = "ignore"
log_statements {
context = "resource"
statements = [
"set(attributes[\"pod\"], attributes[\"k8s.pod.name\"])",
"set(attributes[\"namespace\"], attributes[\"k8s.namespace.name\"])",
"set(attributes[\"loki.resource.labels\"], \"cluster, namespace, job, pod\")",
]
}

output {
metrics = [otelcol.processor.batch.default.input]
logs = [otelcol.processor.batch.default.input]
traces = [otelcol.processor.batch.default.input]
}
}

// Batch Processor --> Interval
otelcol.processor.batch "default" {
send_batch_size = 16384
send_batch_max_size = 0
timeout = "2s"

output {
metrics = [otelcol.processor.interval.default.input]
logs = [otelcol.processor.interval.default.input]
traces = [otelcol.processor.interval.default.input]
}
}
// Interval --> Destinations
otelcol.processor.interval "default" {
interval = "60s"
passthrough {
gauge = false
summary = false
}

output {
metrics = argument.metrics_destinations.value
logs = argument.logs_destinations.value
traces = argument.traces_destinations.value
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,28 @@
}
}
},
"interval": {
"type": "object",
"properties": {
"enabled": {
"type": "boolean"
},
"interval": {
"type": "string"
},
"passthrough": {
"type": "object",
"properties": {
"gauge": {
"type": "boolean"
},
"summary": {
"type": "boolean"
}
}
}
}
},
"k8sattributes": {
"type": "object",
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,25 @@ processors:
# @section -- Processors: Batch
timeout: 2s

interval:
  # -- Utilize an interval processor to aggregate metrics and periodically forward the latest values to the next
  # component in the pipeline.
# @section -- Processors: Interval
enabled: false

# -- The interval at which to emit aggregated metrics.
# @section -- Processors: Interval
interval: 60s

passthrough:
# -- Determines whether gauge metrics should be passed through as they are or aggregated.
# @section -- Processors: Interval
gauge: false

# -- Determines whether summary metrics should be passed through as they are or aggregated.
# @section -- Processors: Interval
summary: false

k8sattributes:
# -- Kubernetes metadata to extract and add to the attributes of the received telemetry data.
# @section -- Processors: K8s Attributes
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading