Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Derive metrics for continuously growing counters #198

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 296 additions & 0 deletions derive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
package metrics

import (
"sync"
"time"
)

// Derive count events to produce exponentially-weighted moving average rates
// at one-, five-, and fifteen-minutes and a mean rate. It expects always
// increasing values. The first call to Mark() initializes the base value. Any
// following calls update the value as difference to the base and update the
// base to the currently passed value. When the new value is smaller than the
// current one, the values are Clear()'d.
type Derive interface {
Count() int64
Base() int64
Mark(int64)
Rate1() float64
Rate5() float64
Rate15() float64
RateMean() float64
Snapshot() Derive
Clear()
}

// UseNilDerive is set to true to use NilDerive type for a standard Derive
var UseNilDerive bool

// GetOrRegisterDerive returns an existing Derive or constructs and registers a
// new StandardDerive.
func GetOrRegisterDerive(name string, r Registry) Derive {
if r == nil {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewDerive).(Derive)
}

// NewDerive constructs a new StandardDerive and launches a goroutine.
func NewDerive() Derive {
if UseNilDerive {
return NilDerive{}
}
m := newStandardDerive()
derives.Lock()
defer derives.Unlock()

derives.meters = append(derives.meters, m)
if !derives.started {
derives.started = true
go derives.tick()
}
return m
}

// NewRegisteredDerive constructs and registers a new StandardDerive
// and launches a goroutine.
func NewRegisteredDerive(name string, r Registry) Derive {
c := NewDerive()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}

// DeriveSnapshot is a read-only copy of another Derive.
type DeriveSnapshot struct {
count, base int64
rate1, rate5, rate15, rateMean float64
initialized bool
}

// Count returns the count of events at the time the snapshot was taken.
func (m *DeriveSnapshot) Count() int64 { return m.count }

// Mark panics.
func (*DeriveSnapshot) Mark(n int64) {
panic("Mark called on a DeriveSnapshot")
}

// Base returns the current base value for Mark()s
func (m *DeriveSnapshot) Base() int64 { return m.base }

// Rate1 returns the one-minute moving average rate of events per second at the
// time the snapshot was taken.
func (m *DeriveSnapshot) Rate1() float64 { return m.rate1 }

// Rate5 returns the five-minute moving average rate of events per second at
// the time the snapshot was taken.
func (m *DeriveSnapshot) Rate5() float64 { return m.rate5 }

// Rate15 returns the fifteen-minute moving average rate of events per second
// at the time the snapshot was taken.
func (m *DeriveSnapshot) Rate15() float64 { return m.rate15 }

// RateMean returns the meter's mean rate of events per second at the time the
// snapshot was taken.
func (m *DeriveSnapshot) RateMean() float64 { return m.rateMean }

// Snapshot returns the snapshot.
func (m *DeriveSnapshot) Snapshot() Derive { return m }

// Clear is a no-op.
func (m *DeriveSnapshot) Clear() {}

// NilDerive is a no-op Derive.
type NilDerive struct{}

// Base is a no-op.
func (NilDerive) Base() int64 { return 0 }

// Count is a no-op.
func (NilDerive) Count() int64 { return 0 }

// Mark is a no-op.
func (NilDerive) Mark(n int64) {}

// Rate1 is a no-op.
func (NilDerive) Rate1() float64 { return 0.0 }

// Rate5 is a no-op.
func (NilDerive) Rate5() float64 { return 0.0 }

// Rate15 is a no-op.
func (NilDerive) Rate15() float64 { return 0.0 }

// RateMean is a no-op.
func (NilDerive) RateMean() float64 { return 0.0 }

// Snapshot is a no-op.
func (NilDerive) Snapshot() Derive { return NilDerive{} }

// Clear is a no-op.
func (NilDerive) Clear() {}

// StandardDerive is the standard implementation of a Meter.
type StandardDerive struct {
lock sync.RWMutex
snapshot *DeriveSnapshot
a1, a5, a15 EWMA
startTime time.Time
}

func newStandardDerive() *StandardDerive {
return &StandardDerive{
snapshot: &DeriveSnapshot{},
a1: NewEWMA1(),
a5: NewEWMA5(),
a15: NewEWMA15(),
startTime: time.Now(),
}
}

// Clear clears the Derive
func (m *StandardDerive) Clear() {
derives.Lock()
defer derives.Unlock()
m.clear()
}

func (m *StandardDerive) clear() {
m.snapshot = &DeriveSnapshot{}
m.a1 = NewEWMA1()
m.a5 = NewEWMA5()
m.a15 = NewEWMA15()
m.startTime = time.Now()
}

// Base returns the current base value used for Mark()
func (m *StandardDerive) Base() int64 {
m.lock.RLock()
base := m.snapshot.base
m.lock.RUnlock()
return base
}

// Count returns the number of events recorded.
func (m *StandardDerive) Count() int64 {
m.lock.RLock()
count := m.snapshot.count
m.lock.RUnlock()
return count
}

// Mark records the occurance of n events.
func (m *StandardDerive) Mark(n int64) {
m.lock.Lock()
defer m.lock.Unlock()
if !m.snapshot.initialized {
m.snapshot.base = n
m.snapshot.initialized = true
return
}

switch {
case m.snapshot.base == n:
// nothing happened
case m.snapshot.base < n: // default case ;-)
diff := n - m.snapshot.base
m.snapshot.count += diff
m.snapshot.base = n
m.a1.Update(diff)
m.a5.Update(diff)
m.a15.Update(diff)
m.updateSnapshot()
default: // base > n: counter reset
m.clear()
m.snapshot.base = n
m.snapshot.initialized = true
}
}

// Rate1 returns the one-minute moving average rate of events per second.
func (m *StandardDerive) Rate1() float64 {
m.lock.RLock()
rate1 := m.snapshot.rate1
m.lock.RUnlock()
return rate1
}

// Rate5 returns the five-minute moving average rate of events per second.
func (m *StandardDerive) Rate5() float64 {
m.lock.RLock()
rate5 := m.snapshot.rate5
m.lock.RUnlock()
return rate5
}

// Rate15 returns the fifteen-minute moving average rate of events per second.
func (m *StandardDerive) Rate15() float64 {
m.lock.RLock()
rate15 := m.snapshot.rate15
m.lock.RUnlock()
return rate15
}

// RateMean returns the meter's mean rate of events per second.
func (m *StandardDerive) RateMean() float64 {
m.lock.RLock()
rateMean := m.snapshot.rateMean
m.lock.RUnlock()
return rateMean
}

// Snapshot returns a read-only copy of the meter.
func (m *StandardDerive) Snapshot() Derive {
m.lock.RLock()
snapshot := *m.snapshot
m.lock.RUnlock()
return &snapshot
}

func (m *StandardDerive) updateSnapshot() {
// should run with write lock held on m.lock
snapshot := m.snapshot
snapshot.rate1 = m.a1.Rate()
snapshot.rate5 = m.a5.Rate()
snapshot.rate15 = m.a15.Rate()
snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
}

func (m *StandardDerive) tick() {
m.lock.Lock()
defer m.lock.Unlock()
m.a1.Tick()
m.a5.Tick()
m.a15.Tick()
m.updateSnapshot()
}

type deriveArbiter struct {
sync.RWMutex
started bool
meters []*StandardDerive
ticker *time.Ticker
}

var derives = deriveArbiter{ticker: time.NewTicker(5e9)}

// Ticks meters on the scheduled interval
func (ma *deriveArbiter) tick() {
for {
select {
case <-ma.ticker.C:
ma.tickMeters()
}
}
}

func (ma *deriveArbiter) tickMeters() {
ma.RLock()
defer ma.RUnlock()
for _, meter := range ma.meters {
meter.tick()
}
}
7 changes: 7 additions & 0 deletions json.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func (r *StandardRegistry) MarshalJSON() ([]byte, error) {
values["5m.rate"] = m.Rate5()
values["15m.rate"] = m.Rate15()
values["mean.rate"] = m.RateMean()
case Derive:
m := metric.Snapshot()
values["count"] = m.Count()
values["1m.rate"] = m.Rate1()
values["5m.rate"] = m.Rate5()
values["15m.rate"] = m.Rate15()
values["mean.rate"] = m.RateMean()
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
Expand Down
9 changes: 9 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
l.Printf(" 5-min rate: %12.2f\n", m.Rate5())
l.Printf(" 15-min rate: %12.2f\n", m.Rate15())
l.Printf(" mean rate: %12.2f\n", m.RateMean())
case Derive:
m := metric.Snapshot()
l.Printf("meter %s\n", name)
l.Printf(" base: %9d\n", m.Base())
l.Printf(" count: %9d\n", m.Count())
l.Printf(" 1-min rate: %12.2f\n", m.Rate1())
l.Printf(" 5-min rate: %12.2f\n", m.Rate5())
l.Printf(" 15-min rate: %12.2f\n", m.Rate15())
l.Printf(" mean rate: %12.2f\n", m.RateMean())
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
Expand Down
2 changes: 1 addition & 1 deletion registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *StandardRegistry) register(name string, i interface{}) error {
return DuplicateMetric(name)
}
switch i.(type) {
case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer:
case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, Derive:
r.metrics[name] = i
}
return nil
Expand Down
12 changes: 12 additions & 0 deletions syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) {
m.Rate15(),
m.RateMean(),
))
case Derive:
m := metric.Snapshot()
w.Info(fmt.Sprintf(
"derive %s: count: %d base: %d 1-min: %.2f 5-min: %.2f 15-min: %.2f mean: %.2f",
name,
m.Count(),
m.Base(),
m.Rate1(),
m.Rate5(),
m.Rate15(),
m.RateMean(),
))
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
Expand Down
9 changes: 9 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func WriteOnce(r Registry, w io.Writer) {
fmt.Fprintf(w, " 5-min rate: %12.2f\n", m.Rate5())
fmt.Fprintf(w, " 15-min rate: %12.2f\n", m.Rate15())
fmt.Fprintf(w, " mean rate: %12.2f\n", m.RateMean())
case Derive:
m := metric.Snapshot()
fmt.Fprintf(w, "meter %s\n", namedMetric.name)
fmt.Fprintf(w, " base: %9d\n", m.Base())
fmt.Fprintf(w, " count: %9d\n", m.Count())
fmt.Fprintf(w, " 1-min rate: %12.2f\n", m.Rate1())
fmt.Fprintf(w, " 5-min rate: %12.2f\n", m.Rate5())
fmt.Fprintf(w, " 15-min rate: %12.2f\n", m.Rate15())
fmt.Fprintf(w, " mean rate: %12.2f\n", m.RateMean())
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
Expand Down