Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding TagRegistry support for OpenTSDB #189

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 114 additions & 49 deletions opentsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func OpenTSDB(r Registry, d time.Duration, prefix string, addr *net.TCPAddr) {
// OpenTSDBWithConfig is a blocking exporter function just like OpenTSDB,
// but it takes a OpenTSDBConfig instead.
func OpenTSDBWithConfig(c OpenTSDBConfig) {
for _ = range time.Tick(c.FlushInterval) {
for range time.Tick(c.FlushInterval) {
if err := openTSDB(&c); nil != err {
log.Println(err)
}
Expand Down Expand Up @@ -67,53 +67,118 @@ func openTSDB(c *OpenTSDBConfig) error {
}
defer conn.Close()
w := bufio.NewWriter(conn)
c.Registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname)
case Gauge:
fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case GaugeFloat64:
fmt.Fprintf(w, "put %s.%s.value %d %f host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, h.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, h.Min(), shortHostname)
fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, h.Max(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, h.Mean(), shortHostname)
fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, h.StdDev(), shortHostname)
fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0], shortHostname)
fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1], shortHostname)
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2], shortHostname)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3], shortHostname)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4], shortHostname)
case Meter:
m := metric.Snapshot()
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, m.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate1(), shortHostname)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate5(), shortHostname)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate15(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, m.RateMean(), shortHostname)
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, t.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, t.Min()/int64(du), shortHostname)
fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, t.Max()/int64(du), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, t.Mean()/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, t.StdDev()/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate1(), shortHostname)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate5(), shortHostname)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate15(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f host=%s\n", c.Prefix, name, now, t.RateMean(), shortHostname)
}
w.Flush()
})
tagRegistry, ok := c.Registry.(*TagRegistry)
if ok {
tagRegistry.EachWithTag(func(name string, tags map[string]string, i interface{}) {
if tags == nil {
tags = map[string]string{
"host": shortHostname,
}
} else if tags["host"] == "" {
tags["host"] = shortHostname
}
tagString := ""
for tagName, tagValue := range tags {
tagString = fmt.Sprintf("%s %s=%s", tagString, tagName, tagValue)
}
tagString = strings.TrimSpace(tagString)

switch metric := i.(type) {
case Counter:
fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, metric.Count(), tagString)
case Gauge:
fmt.Fprintf(w, "put %s.%s.value %d %d %s\n", c.Prefix, name, now, metric.Value(), tagString)
case GaugeFloat64:
fmt.Fprintf(w, "put %s.%s.value %d %f %s\n", c.Prefix, name, now, metric.Value(), tagString)
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, h.Count(), tagString)
fmt.Fprintf(w, "put %s.%s.min %d %d %s\n", c.Prefix, name, now, h.Min(), tagString)
fmt.Fprintf(w, "put %s.%s.max %d %d %s\n", c.Prefix, name, now, h.Max(), tagString)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f %s\n", c.Prefix, name, now, h.Mean(), tagString)
fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f %s\n", c.Prefix, name, now, h.StdDev(), tagString)
fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f %s\n", c.Prefix, name, now, ps[0], tagString)
fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f %s\n", c.Prefix, name, now, ps[1], tagString)
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f %s\n", c.Prefix, name, now, ps[2], tagString)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f %s\n", c.Prefix, name, now, ps[3], tagString)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f %s\n", c.Prefix, name, now, ps[4], tagString)
case Meter:
m := metric.Snapshot()
fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, m.Count(), tagString)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f %s\n", c.Prefix, name, now, m.Rate1(), tagString)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f %s\n", c.Prefix, name, now, m.Rate5(), tagString)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f %s\n", c.Prefix, name, now, m.Rate15(), tagString)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f %s\n", c.Prefix, name, now, m.RateMean(), tagString)
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, t.Count(), tagString)
fmt.Fprintf(w, "put %s.%s.min %d %d %s\n", c.Prefix, name, now, t.Min()/int64(du), tagString)
fmt.Fprintf(w, "put %s.%s.max %d %d %s\n", c.Prefix, name, now, t.Max()/int64(du), tagString)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f %s\n", c.Prefix, name, now, t.Mean()/du, tagString)
fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f %s\n", c.Prefix, name, now, t.StdDev()/du, tagString)
fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f %s\n", c.Prefix, name, now, ps[0]/du, tagString)
fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f %s\n", c.Prefix, name, now, ps[1]/du, tagString)
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f %s\n", c.Prefix, name, now, ps[2]/du, tagString)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f %s\n", c.Prefix, name, now, ps[3]/du, tagString)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f %s\n", c.Prefix, name, now, ps[4]/du, tagString)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f %s\n", c.Prefix, name, now, t.Rate1(), tagString)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f %s\n", c.Prefix, name, now, t.Rate5(), tagString)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f %s\n", c.Prefix, name, now, t.Rate15(), tagString)
fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f %s\n", c.Prefix, name, now, t.RateMean(), tagString)
}
w.Flush()
})
} else {
c.Registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname)
case Gauge:
fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case GaugeFloat64:
fmt.Fprintf(w, "put %s.%s.value %d %f host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, h.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, h.Min(), shortHostname)
fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, h.Max(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, h.Mean(), shortHostname)
fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, h.StdDev(), shortHostname)
fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0], shortHostname)
fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1], shortHostname)
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2], shortHostname)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3], shortHostname)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4], shortHostname)
case Meter:
m := metric.Snapshot()
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, m.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate1(), shortHostname)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate5(), shortHostname)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate15(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, m.RateMean(), shortHostname)
case Timer:
t := metric.Snapshot()
ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, t.Count(), shortHostname)
fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, t.Min()/int64(du), shortHostname)
fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, t.Max()/int64(du), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, t.Mean()/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, t.StdDev()/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4]/du, shortHostname)
fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate1(), shortHostname)
fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate5(), shortHostname)
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate15(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f host=%s\n", c.Prefix, name, now, t.RateMean(), shortHostname)
}
w.Flush()
})
}
return nil
}
144 changes: 140 additions & 4 deletions registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"encoding/json"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -120,7 +121,7 @@ func (r *StandardRegistry) Unregister(name string) {
func (r *StandardRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name, _ := range r.metrics {
for name := range r.metrics {
delete(r.metrics, name)
}
}
Expand Down Expand Up @@ -167,9 +168,9 @@ func NewPrefixedChildRegistry(parent Registry, prefix string) Registry {

// Call the given function for each registered metric.
func (r *PrefixedRegistry) Each(fn func(string, interface{})) {
wrappedFn := func (prefix string) func(string, interface{}) {
wrappedFn := func(prefix string) func(string, interface{}) {
return func(name string, iface interface{}) {
if strings.HasPrefix(name,prefix) {
if strings.HasPrefix(name, prefix) {
fn(name, iface)
} else {
return
Expand All @@ -184,7 +185,7 @@ func (r *PrefixedRegistry) Each(fn func(string, interface{})) {
func findPrefix(registry Registry, prefix string) (Registry, string) {
switch r := registry.(type) {
case *PrefixedRegistry:
return findPrefix(r.underlying, r.prefix + prefix)
return findPrefix(r.underlying, r.prefix+prefix)
case *StandardRegistry:
return r, prefix
}
Expand Down Expand Up @@ -268,3 +269,138 @@ func RunHealthchecks() {
func Unregister(name string) {
DefaultRegistry.Unregister(name)
}

// Stores metric along with tags
type TagMetric struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
}

// The standard implementation of a Registry with tag support. It is a
// mutex-protected map of names to metrics.
// Note: It behaves exactlt same as StandardRegistry with an additional
// support for tags
type TagRegistry struct {
metrics map[string]interface{}
mutex sync.Mutex
}

// Create a new registry.
func NewTagRegistry() *TagRegistry {
return &TagRegistry{metrics: make(map[string]interface{})}
}

// Get metric key name
// If name is already a marshalled data, return name
func (r *TagRegistry) metricKeyName(name string) string {
tagMetric := TagMetric{}
err := json.Unmarshal([]byte(name), &tagMetric)
if err == nil {
return name
} else {
byteJson, _ := json.Marshal(TagMetric{Name: name})
return string(byteJson)
}
}

// Call the given function for each registered metric.
func (r *TagRegistry) Each(f func(string, interface{})) {
for tagMetricJson, i := range r.registered() {
tagMetric := TagMetric{}
json.Unmarshal([]byte(tagMetricJson), &tagMetric)
f(tagMetric.Name, i)
}
}

// Call the given function for each registered metric along with tags.
func (r *TagRegistry) EachWithTag(f func(string, map[string]string, interface{})) {
for tagMetricJson, i := range r.registered() {
tagMetric := TagMetric{}
json.Unmarshal([]byte(tagMetricJson), &tagMetric)
f(tagMetric.Name, tagMetric.Tags, i)
}
}

// Get the metric by the given name or nil if none is registered.
func (r *TagRegistry) Get(name string) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
metricKey := r.metricKeyName(name)
return r.metrics[metricKey]
}

// Gets an existing metric or creates and registers a new one. Threadsafe
// alternative to calling Get and Register on failure.
// The interface can be the metric to register if not found in registry,
// or a function returning the metric for lazy instantiation.
func (r *TagRegistry) GetOrRegister(name string, i interface{}) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
metricKey := r.metricKeyName(name)
if metric, ok := r.metrics[metricKey]; ok {
return metric
}
if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
}
r.register(metricKey, i)
return i
}

// Register the given metric under the given name. Returns a DuplicateMetric
// if a metric by the given name is already registered.
func (r *TagRegistry) Register(name string, i interface{}) error {
r.mutex.Lock()
defer r.mutex.Unlock()
metricKey := r.metricKeyName(name)
return r.register(metricKey, i)
}

// Run all registered healthchecks.
func (r *TagRegistry) RunHealthchecks() {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, i := range r.metrics {
if h, ok := i.(Healthcheck); ok {
h.Check()
}
}
}

// Unregister the metric with the given name.
func (r *TagRegistry) Unregister(name string) {
r.mutex.Lock()
defer r.mutex.Unlock()
metricKey := r.metricKeyName(name)
delete(r.metrics, metricKey)
}

// Unregister all metrics. (Mostly for testing.)
func (r *TagRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name := range r.metrics {
delete(r.metrics, name)
}
}

func (r *TagRegistry) register(name string, i interface{}) error {
if _, ok := r.metrics[name]; ok {
return DuplicateMetric(name)
}
switch i.(type) {
case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer:
r.metrics[name] = i
}
return nil
}

func (r *TagRegistry) registered() map[string]interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
metrics := make(map[string]interface{}, len(r.metrics))
for name, i := range r.metrics {
metrics[name] = i
}
return metrics
}
3 changes: 1 addition & 2 deletions registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,8 @@ func TestWalkRegistries(t *testing.T) {
Register("bars", c)

_, prefix := findPrefix(r2, "")
if "prefix.prefix2." != prefix {
if "prefix.prefix2." != prefix {
t.Fatal(prefix)
}


}