diff --git a/opentsdb.go b/opentsdb.go index 266b6c9..0aa7312 100644 --- a/opentsdb.go +++ b/opentsdb.go @@ -38,7 +38,7 @@ func OpenTSDB(r Registry, d time.Duration, prefix string, addr *net.TCPAddr) { // OpenTSDBWithConfig is a blocking exporter function just like OpenTSDB, // but it takes a OpenTSDBConfig instead. func OpenTSDBWithConfig(c OpenTSDBConfig) { - for _ = range time.Tick(c.FlushInterval) { + for range time.Tick(c.FlushInterval) { if err := openTSDB(&c); nil != err { log.Println(err) } @@ -67,53 +67,118 @@ func openTSDB(c *OpenTSDBConfig) error { } defer conn.Close() w := bufio.NewWriter(conn) - c.Registry.Each(func(name string, i interface{}) { - switch metric := i.(type) { - case Counter: - fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) - case Gauge: - fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) - case GaugeFloat64: - fmt.Fprintf(w, "put %s.%s.value %d %f host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) - case Histogram: - h := metric.Snapshot() - ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) - fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, h.Count(), shortHostname) - fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, h.Min(), shortHostname) - fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, h.Max(), shortHostname) - fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, h.Mean(), shortHostname) - fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, h.StdDev(), shortHostname) - fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0], shortHostname) - fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1], shortHostname) - fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2], shortHostname) - fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3], shortHostname) - fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4], shortHostname) - case Meter: - m := metric.Snapshot() - fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, m.Count(), shortHostname) - fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate1(), shortHostname) - fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate5(), shortHostname) - fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate15(), shortHostname) - fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, m.RateMean(), shortHostname) - case Timer: - t := metric.Snapshot() - ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) - fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, t.Count(), shortHostname) - fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, t.Min()/int64(du), shortHostname) - fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, t.Max()/int64(du), shortHostname) - fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, t.Mean()/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, t.StdDev()/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0]/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1]/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2]/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3]/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4]/du, shortHostname) - fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate1(), shortHostname) - fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate5(), shortHostname) - fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate15(), shortHostname) - fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f host=%s\n", c.Prefix, name, now, t.RateMean(), shortHostname) - } - w.Flush() - }) + tagRegistry, ok := c.Registry.(*TagRegistry) + if ok { + tagRegistry.EachWithTag(func(name string, tags map[string]string, i interface{}) { + if tags == nil { + tags = map[string]string{ + "host": shortHostname, + } + } else if tags["host"] == "" { + tags["host"] = shortHostname + } + tagString := "" + for tagName, tagValue := range tags { + tagString = fmt.Sprintf("%s %s=%s", tagString, tagName, tagValue) + } + tagString = strings.TrimSpace(tagString) + + switch metric := i.(type) { + case Counter: + fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, metric.Count(), tagString) + case Gauge: + fmt.Fprintf(w, "put %s.%s.value %d %d %s\n", c.Prefix, name, now, metric.Value(), tagString) + case GaugeFloat64: + fmt.Fprintf(w, "put %s.%s.value %d %f %s\n", c.Prefix, name, now, metric.Value(), tagString) + case Histogram: + h := metric.Snapshot() + ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) + fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, h.Count(), tagString) + fmt.Fprintf(w, "put %s.%s.min %d %d %s\n", c.Prefix, name, now, h.Min(), tagString) + fmt.Fprintf(w, "put %s.%s.max %d %d %s\n", c.Prefix, name, now, h.Max(), tagString) + fmt.Fprintf(w, "put %s.%s.mean %d %.2f %s\n", c.Prefix, name, now, h.Mean(), tagString) + fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f %s\n", c.Prefix, name, now, h.StdDev(), tagString) + fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f %s\n", c.Prefix, name, now, ps[0], tagString) + fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f %s\n", c.Prefix, name, now, ps[1], tagString) + fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f %s\n", c.Prefix, name, now, ps[2], tagString) + fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f %s\n", c.Prefix, name, now, ps[3], tagString) + fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f %s\n", c.Prefix, name, now, ps[4], tagString) + case Meter: + m := metric.Snapshot() + fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, m.Count(), tagString) + fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f %s\n", c.Prefix, name, now, m.Rate1(), tagString) + fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f %s\n", c.Prefix, name, now, m.Rate5(), tagString) + fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f %s\n", c.Prefix, name, now, m.Rate15(), tagString) + fmt.Fprintf(w, "put %s.%s.mean %d %.2f %s\n", c.Prefix, name, now, m.RateMean(), tagString) + case Timer: + t := metric.Snapshot() + ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) + fmt.Fprintf(w, "put %s.%s.count %d %d %s\n", c.Prefix, name, now, t.Count(), tagString) + fmt.Fprintf(w, "put %s.%s.min %d %d %s\n", c.Prefix, name, now, t.Min()/int64(du), tagString) + fmt.Fprintf(w, "put %s.%s.max %d %d %s\n", c.Prefix, name, now, t.Max()/int64(du), tagString) + fmt.Fprintf(w, "put %s.%s.mean %d %.2f %s\n", c.Prefix, name, now, t.Mean()/du, tagString) + fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f %s\n", c.Prefix, name, now, t.StdDev()/du, tagString) + fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f %s\n", c.Prefix, name, now, ps[0]/du, tagString) + fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f %s\n", c.Prefix, name, now, ps[1]/du, tagString) + fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f %s\n", c.Prefix, name, now, ps[2]/du, tagString) + fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f %s\n", c.Prefix, name, now, ps[3]/du, tagString) + fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f %s\n", c.Prefix, name, now, ps[4]/du, tagString) + fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f %s\n", c.Prefix, name, now, t.Rate1(), tagString) + fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f %s\n", c.Prefix, name, now, t.Rate5(), tagString) + fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f %s\n", c.Prefix, name, now, t.Rate15(), tagString) + fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f %s\n", c.Prefix, name, now, t.RateMean(), tagString) + } + w.Flush() + }) + } else { + c.Registry.Each(func(name string, i interface{}) { + switch metric := i.(type) { + case Counter: + fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) + case Gauge: + fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) + case GaugeFloat64: + fmt.Fprintf(w, "put %s.%s.value %d %f host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) + case Histogram: + h := metric.Snapshot() + ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) + fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, h.Count(), shortHostname) + fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, h.Min(), shortHostname) + fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, h.Max(), shortHostname) + fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, h.Mean(), shortHostname) + fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, h.StdDev(), shortHostname) + fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0], shortHostname) + fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1], shortHostname) + fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2], shortHostname) + fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3], shortHostname) + fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4], shortHostname) + case Meter: + m := metric.Snapshot() + fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, m.Count(), shortHostname) + fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate1(), shortHostname) + fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate5(), shortHostname) + fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, m.Rate15(), shortHostname) + fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, m.RateMean(), shortHostname) + case Timer: + t := metric.Snapshot() + ps := t.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) + fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, t.Count(), shortHostname) + fmt.Fprintf(w, "put %s.%s.min %d %d host=%s\n", c.Prefix, name, now, t.Min()/int64(du), shortHostname) + fmt.Fprintf(w, "put %s.%s.max %d %d host=%s\n", c.Prefix, name, now, t.Max()/int64(du), shortHostname) + fmt.Fprintf(w, "put %s.%s.mean %d %.2f host=%s\n", c.Prefix, name, now, t.Mean()/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.std-dev %d %.2f host=%s\n", c.Prefix, name, now, t.StdDev()/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.50-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[0]/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.75-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[1]/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.95-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[2]/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.99-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[3]/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.999-percentile %d %.2f host=%s\n", c.Prefix, name, now, ps[4]/du, shortHostname) + fmt.Fprintf(w, "put %s.%s.one-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate1(), shortHostname) + fmt.Fprintf(w, "put %s.%s.five-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate5(), shortHostname) + fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate15(), shortHostname) + fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f host=%s\n", c.Prefix, name, now, t.RateMean(), shortHostname) + } + w.Flush() + }) + } return nil } diff --git a/registry.go b/registry.go index 9086dcb..20d0f48 100644 --- a/registry.go +++ b/registry.go @@ -1,6 +1,7 @@ package metrics import ( + "encoding/json" "fmt" "reflect" "strings" @@ -120,7 +121,7 @@ func (r *StandardRegistry) Unregister(name string) { func (r *StandardRegistry) UnregisterAll() { r.mutex.Lock() defer r.mutex.Unlock() - for name, _ := range r.metrics { + for name := range r.metrics { delete(r.metrics, name) } } @@ -167,9 +168,9 @@ func NewPrefixedChildRegistry(parent Registry, prefix string) Registry { // Call the given function for each registered metric. func (r *PrefixedRegistry) Each(fn func(string, interface{})) { - wrappedFn := func (prefix string) func(string, interface{}) { + wrappedFn := func(prefix string) func(string, interface{}) { return func(name string, iface interface{}) { - if strings.HasPrefix(name,prefix) { + if strings.HasPrefix(name, prefix) { fn(name, iface) } else { return @@ -184,7 +185,7 @@ func (r *PrefixedRegistry) Each(fn func(string, interface{})) { func findPrefix(registry Registry, prefix string) (Registry, string) { switch r := registry.(type) { case *PrefixedRegistry: - return findPrefix(r.underlying, r.prefix + prefix) + return findPrefix(r.underlying, r.prefix+prefix) case *StandardRegistry: return r, prefix } @@ -268,3 +269,138 @@ func RunHealthchecks() { func Unregister(name string) { DefaultRegistry.Unregister(name) } + +// Stores metric along with tags +type TagMetric struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` +} + +// The standard implementation of a Registry with tag support. It is a +// mutex-protected map of names to metrics. +// Note: It behaves exactlt same as StandardRegistry with an additional +// support for tags +type TagRegistry struct { + metrics map[string]interface{} + mutex sync.Mutex +} + +// Create a new registry. +func NewTagRegistry() *TagRegistry { + return &TagRegistry{metrics: make(map[string]interface{})} +} + +// Get metric key name +// If name is already a marshalled data, return name +func (r *TagRegistry) metricKeyName(name string) string { + tagMetric := TagMetric{} + err := json.Unmarshal([]byte(name), &tagMetric) + if err == nil { + return name + } else { + byteJson, _ := json.Marshal(TagMetric{Name: name}) + return string(byteJson) + } +} + +// Call the given function for each registered metric. +func (r *TagRegistry) Each(f func(string, interface{})) { + for tagMetricJson, i := range r.registered() { + tagMetric := TagMetric{} + json.Unmarshal([]byte(tagMetricJson), &tagMetric) + f(tagMetric.Name, i) + } +} + +// Call the given function for each registered metric along with tags. +func (r *TagRegistry) EachWithTag(f func(string, map[string]string, interface{})) { + for tagMetricJson, i := range r.registered() { + tagMetric := TagMetric{} + json.Unmarshal([]byte(tagMetricJson), &tagMetric) + f(tagMetric.Name, tagMetric.Tags, i) + } +} + +// Get the metric by the given name or nil if none is registered. +func (r *TagRegistry) Get(name string) interface{} { + r.mutex.Lock() + defer r.mutex.Unlock() + metricKey := r.metricKeyName(name) + return r.metrics[metricKey] +} + +// Gets an existing metric or creates and registers a new one. Threadsafe +// alternative to calling Get and Register on failure. +// The interface can be the metric to register if not found in registry, +// or a function returning the metric for lazy instantiation. +func (r *TagRegistry) GetOrRegister(name string, i interface{}) interface{} { + r.mutex.Lock() + defer r.mutex.Unlock() + metricKey := r.metricKeyName(name) + if metric, ok := r.metrics[metricKey]; ok { + return metric + } + if v := reflect.ValueOf(i); v.Kind() == reflect.Func { + i = v.Call(nil)[0].Interface() + } + r.register(metricKey, i) + return i +} + +// Register the given metric under the given name. Returns a DuplicateMetric +// if a metric by the given name is already registered. +func (r *TagRegistry) Register(name string, i interface{}) error { + r.mutex.Lock() + defer r.mutex.Unlock() + metricKey := r.metricKeyName(name) + return r.register(metricKey, i) +} + +// Run all registered healthchecks. +func (r *TagRegistry) RunHealthchecks() { + r.mutex.Lock() + defer r.mutex.Unlock() + for _, i := range r.metrics { + if h, ok := i.(Healthcheck); ok { + h.Check() + } + } +} + +// Unregister the metric with the given name. +func (r *TagRegistry) Unregister(name string) { + r.mutex.Lock() + defer r.mutex.Unlock() + metricKey := r.metricKeyName(name) + delete(r.metrics, metricKey) +} + +// Unregister all metrics. (Mostly for testing.) +func (r *TagRegistry) UnregisterAll() { + r.mutex.Lock() + defer r.mutex.Unlock() + for name := range r.metrics { + delete(r.metrics, name) + } +} + +func (r *TagRegistry) register(name string, i interface{}) error { + if _, ok := r.metrics[name]; ok { + return DuplicateMetric(name) + } + switch i.(type) { + case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer: + r.metrics[name] = i + } + return nil +} + +func (r *TagRegistry) registered() map[string]interface{} { + r.mutex.Lock() + defer r.mutex.Unlock() + metrics := make(map[string]interface{}, len(r.metrics)) + for name, i := range r.metrics { + metrics[name] = i + } + return metrics +} diff --git a/registry_test.go b/registry_test.go index afffe89..b42d4ee 100644 --- a/registry_test.go +++ b/registry_test.go @@ -281,9 +281,8 @@ func TestWalkRegistries(t *testing.T) { Register("bars", c) _, prefix := findPrefix(r2, "") - if "prefix.prefix2." != prefix { + if "prefix.prefix2." != prefix { t.Fatal(prefix) } - }