Skip to content

Commit

Permalink
update telemetry server to use EventBus for graceful reload (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross authored May 17, 2017
1 parent 1319f70 commit d6da3da
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 42 deletions.
2 changes: 1 addition & 1 deletion core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (a *App) handlePolling() {
for _, sensor := range a.Telemetry.Sensors {
sensor.Run(a.Bus)
}
a.Telemetry.Serve()
a.Telemetry.Run(a.Bus)
}
// kick everything off
a.Bus.Publish(events.GlobalStartup)
Expand Down
98 changes: 66 additions & 32 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package telemetry

import (
"context"
"net"
"net/http"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/joyent/containerpilot/events"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -16,10 +17,11 @@ type Telemetry struct {
Sensors []*Sensor
Path string
heartbeat time.Duration
mux *http.ServeMux
lock sync.RWMutex
router *http.ServeMux
addr net.TCPAddr
listening bool

http.Server
events.EventHandler // Event handling
}

// NewTelemetry configures a new prometheus Telemetry server
Expand All @@ -29,48 +31,80 @@ func NewTelemetry(cfg *Config) *Telemetry {
}
t := &Telemetry{
Path: "/metrics", // TODO hard-coded?
lock: sync.RWMutex{},
Sensors: []*Sensor{},
}
t.addr = cfg.addr
t.mux = http.NewServeMux()
t.mux.Handle(t.Path, prometheus.Handler())
router := http.NewServeMux()
router.Handle(t.Path, prometheus.Handler())
t.Handler = router

for _, sensorCfg := range cfg.SensorConfigs {
sensor := NewSensor(sensorCfg)
t.Sensors = append(t.Sensors, sensor)
}
t.Rx = make(chan events.Event, 10)
return t
}

var listener net.Listener
// Run executes the event loop for the telemetry server
func (t *Telemetry) Run(bus *events.EventBus) {
t.Subscribe(bus, true)
t.Bus = bus
t.Start()

// Serve starts serving the telemetry service
func (t *Telemetry) Serve() {
t.lock.Lock()
defer t.lock.Unlock()
go func() {
defer t.Stop()
for {
event := <-t.Rx
switch event {
case
events.QuitByClose,
events.GlobalShutdown:
return
}
}
}()
}

// No-op if we've created the server previously.
// TODO: golang's native implementation of http.Server.Server() cannot
// support graceful reload. We need to select an alternate implementation
// but in the meantime we need to back-out the change to reloading
// ref https://github.com/joyent/containerpilot/pull/165
if listener != nil {
return
}
ln, err := net.Listen(t.addr.Network(), t.addr.String())
if err != nil {
log.Fatalf("Error serving telemetry on %s: %v", t.addr.String(), err)
}
listener = ln
t.listening = true
// Start starts serving the telemetry service
func (t *Telemetry) Start() {
ln := t.listenWithRetry()
go func() {
log.Infof("telemetry: Listening on %s", t.addr.String())
log.Fatal(http.Serve(listener, t.mux))
log.Debugf("telemetry: Stopped listening on %s", t.addr.String())
log.Infof("telemetry: serving at %s", t.Addr)
t.Serve(ln)
log.Debugf("telemetry: stopped serving at %s", t.Addr)
}()
}

// Shutdown shuts down the telemetry service
func (t *Telemetry) Shutdown() {
log.Debug("telemetry: shutdown received but currently a no-op")
// listenWithRetry opens the listener for the telemetry server on t.addr.
// On a reload we can't guarantee that the previous listener on this address
// has been released before we're ready to start again, so we retry the
// listener a few times (10 attempts, one second apart) before bailing out.
// If every attempt fails the process exits via log.Fatalf.
func (t *Telemetry) listenWithRetry() net.Listener {
	var (
		err error
		ln  net.Listener
	)
	for i := 0; i < 10; i++ {
		ln, err = net.Listen(t.addr.Network(), t.addr.String())
		if err == nil {
			return ln
		}
		time.Sleep(time.Second)
	}
	// Report t.addr, the address we actually tried to bind. The embedded
	// http.Server's Addr field is not what net.Listen was called with, so
	// formatting it here would not describe the failed listen attempt.
	log.Fatalf("error listening to socket at %s: %v", t.addr.String(), err)
	return nil // not reached if log.Fatalf exits; keeps the compiler satisfied
}

// Stop shuts down the telemetry service. It first attempts a graceful
// shutdown, giving in-flight requests up to 500ms to complete. If the
// graceful shutdown fails or times out, the server is force-closed so that
// remaining connections are not left open. In every case the handler is
// unsubscribed from the event bus and its receive channel is closed, so the
// bus is never left publishing to a handler that nobody is reading from.
func (t *Telemetry) Stop() {
	log.Debug("telemetry: stopping server")
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	err := t.Shutdown(ctx)
	if err != nil {
		log.Warnf("telemetry: failed to gracefully shutdown server: %v", err)
		// Shutdown gave up waiting on active connections; close them
		// immediately rather than leaving them open.
		if closeErr := t.Close(); closeErr != nil {
			log.Warnf("telemetry: failed to force-close server: %v", closeErr)
		}
	}

	// Bus cleanup must happen regardless of how the HTTP server stopped.
	t.Unsubscribe(t.Bus, true)
	close(t.Rx)

	if err == nil {
		log.Debug("telemetry: completed graceful shutdown of server")
	}
}
15 changes: 6 additions & 9 deletions telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"testing"

"github.com/joyent/containerpilot/events"
"github.com/joyent/containerpilot/tests/mocks"
)

Expand All @@ -14,24 +15,21 @@ func TestTelemetryServerRestart(t *testing.T) {
cfg.Validate(&mocks.NoopDiscoveryBackend{})

telem := NewTelemetry(cfg)

// initial server
telem.Serve()
bus := events.NewEventBus()
telem.Run(bus)
checkServerIsListening(t, telem)
telem.Shutdown()
telem.Stop()

// reloaded server
telem = NewTelemetry(cfg)
telem.Serve()
telem.Run(bus)
checkServerIsListening(t, telem)
}

func checkServerIsListening(t *testing.T, telem *Telemetry) {
telem.lock.RLock()
defer telem.lock.RUnlock()
verifyMetricsEndpointOk(t, telem)
}

func verifyMetricsEndpointOk(t *testing.T, telem *Telemetry) {
url := fmt.Sprintf("http://%v:%v/metrics", telem.addr.IP, telem.addr.Port)
resp, err := http.Get(url)
if err != nil {
Expand All @@ -41,5 +39,4 @@ func verifyMetricsEndpointOk(t *testing.T, telem *Telemetry) {
if resp.StatusCode != 200 {
t.Fatalf("got %v status from telemetry server", resp.StatusCode)
}

}

0 comments on commit d6da3da

Please sign in to comment.