diff --git a/metric/controller.go b/metric/controller.go index 36964bb39..de99d8eec 100644 --- a/metric/controller.go +++ b/metric/controller.go @@ -14,6 +14,7 @@ var ( SecondsBucketsDetailedNano = prometheus.ExponentialBuckets(0.000005, 2, 19) // covers range from 5ns to 1.3ms SecondsBucketsDetailed = prometheus.ExponentialBuckets(0.0005, 2, 16) // covers range from 500us to 16.384s SecondsBucketsLong = prometheus.ExponentialBuckets(0.005, 2, 16) // covers range from 5ms to 163.84s + RequestsSize = prometheus.ExponentialBuckets(100, 10, 5) // covers range from 100B to 1MB ) type Ctl struct { diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index bb65825ef..ac7a22e42 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -1,6 +1,7 @@ package http import ( + "bytes" "fmt" "io" "net" @@ -110,6 +111,7 @@ type Plugin struct { bulkRequestsDoneTotal prometheus.Counter requestsInProgress prometheus.Gauge processBulkSeconds prometheus.Observer + requestsSize prometheus.Observer metaTemplater *metadata.MetaTemplater } @@ -304,10 +306,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { - p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "") + p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "Total http bulk requests done") p.requestsInProgress = ctl.RegisterGauge("requests_in_progress", "") p.processBulkSeconds = ctl.RegisterHistogram("process_bulk_seconds", "", metric.SecondsBucketsDetailed) p.errorsTotal = ctl.RegisterCounter("input_http_errors", "Total http errors") + p.requestsSize = ctl.RegisterHistogram("requests_size", "", metric.RequestsSize) if p.config.Auth.Strategy_ != StrategyDisabled { httpAuthTotal := ctl.RegisterCounterVec("http_auth_success_total", "", "secret_name") @@ -475,6 +478,8 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta metadata.MetaData) { + var err error + if r.Method != http.MethodPost { http.Error(w, "", http.StatusMethodNotAllowed) return @@ -483,6 +488,14 @@ func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta metadata start := time.Now() p.requestsInProgress.Inc() + r.Body, err = p.calculateRequestSize(r.Body) + if err != nil { + p.errorsTotal.Inc() + p.logger.Error("can't read body", zap.Error(err)) + http.Error(w, "can't read body", http.StatusBadRequest) + return + } + reader := io.Reader(r.Body) if r.Header.Get("Content-Encoding") == "gzip" { zr, err := p.acquireGzipReader(reader) @@ -642,6 +655,22 @@ func (p *Plugin) putGzipReader(reader *gzip.Reader) { p.gzipReaderPool.Put(reader) } +func (p *Plugin) calculateRequestSize(reqBody io.Reader) (io.ReadCloser, error) { + bodyBytes, err := io.ReadAll(reqBody) + if err != nil { + return nil, err + } + + bodySize := len(bodyBytes) + p.requestsSize.Observe(float64(bodySize)) + + return io.NopCloser(bytes.NewBuffer(bodyBytes)), nil +} + +func resetRequestBody() { + +} + func getUserIP(r *http.Request) net.IP { var userIP string switch {