Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#487 request size metric added #662

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
SecondsBucketsDetailedNano = prometheus.ExponentialBuckets(0.000005, 2, 19) // covers range from 5ns to 1.3ms
SecondsBucketsDetailed = prometheus.ExponentialBuckets(0.0005, 2, 16) // covers range from 500us to 16.384s
SecondsBucketsLong = prometheus.ExponentialBuckets(0.005, 2, 16) // covers range from 5ms to 163.84s
RequestsSize = prometheus.ExponentialBuckets(100, 10, 5) // covers range from 100B to 1MB
)

type Ctl struct {
Expand Down
31 changes: 30 additions & 1 deletion plugin/input/http/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"bytes"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -110,6 +111,7 @@ type Plugin struct {
bulkRequestsDoneTotal prometheus.Counter
requestsInProgress prometheus.Gauge
processBulkSeconds prometheus.Observer
requestsSize prometheus.Observer

metaTemplater *metadata.MetaTemplater
}
Expand Down Expand Up @@ -304,10 +306,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
}

func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "")
p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "Total http bulk requests done")
p.requestsInProgress = ctl.RegisterGauge("requests_in_progress", "")
p.processBulkSeconds = ctl.RegisterHistogram("process_bulk_seconds", "", metric.SecondsBucketsDetailed)
p.errorsTotal = ctl.RegisterCounter("input_http_errors", "Total http errors")
p.requestsSize = ctl.RegisterHistogram("requests_size", "", metric.RequestsSize)

if p.config.Auth.Strategy_ != StrategyDisabled {
httpAuthTotal := ctl.RegisterCounterVec("http_auth_success_total", "", "secret_name")
Expand Down Expand Up @@ -475,6 +478,8 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta metadata.MetaData) {
var err error

if r.Method != http.MethodPost {
http.Error(w, "", http.StatusMethodNotAllowed)
return
Expand All @@ -483,6 +488,14 @@ func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta metadata
start := time.Now()
p.requestsInProgress.Inc()

r.Body, err = p.calculateRequestSize(r.Body)
if err != nil {
p.errorsTotal.Inc()
p.logger.Error("can't read body", zap.Error(err))
http.Error(w, "can't read body", http.StatusBadRequest)
return
}

reader := io.Reader(r.Body)
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := p.acquireGzipReader(reader)
Expand Down Expand Up @@ -642,6 +655,22 @@ func (p *Plugin) putGzipReader(reader *gzip.Reader) {
p.gzipReaderPool.Put(reader)
}

func (p *Plugin) calculateRequestSize(reqBody io.Reader) (io.ReadCloser, error) {
bodyBytes, err := io.ReadAll(reqBody)
if err != nil {
return nil, err
}

bodySize := len(bodyBytes)
p.requestsSize.Observe(float64(bodySize))

return io.NopCloser(bytes.NewBuffer(bodyBytes)), nil
}

func resetRequestBody() {

}

func getUserIP(r *http.Request) net.IP {
var userIP string
switch {
Expand Down