Skip to content

Commit

Permalink
Terminate gracefully on SIGTERM or SIGINT
Browse files Browse the repository at this point in the history
We need to make sure we write the latest resume token when we exit so
that we can resume next time we run. To make this more likely we will
respond gracefully to SIGTERM and SIGINT so that we'll deplete the
buffers, flush everything we got to the Cloudwatch Logs API and then
write the final state to the state file before we exit.

Previously we might not have done this correctly if we happened to get
a signal during a write to cloudwatch, since cloudwatch would return to us
a new token but we'd never get a chance to write it to the state file
before we exit.
  • Loading branch information
apparentlymart committed Dec 28, 2015
1 parent 5821423 commit eda7fc7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 3 deletions.
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,12 @@ func run(configFilename string) error {

}

// We fall out here when interrupted by a signal.
// Last chance to write the state.
err = state.SetState(bootId, nextSeq)
if err != nil {
return fmt.Errorf("Failed to write state on exit: %s", err)
}

return nil
}
32 changes: 29 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,21 @@ import (
func ReadRecords(instanceId string, journal *sdjournal.Journal, c chan<- *Record, skip uint64) {
record := &Record{}

termC := MakeTerminateChannel()
checkTerminate := func() bool {
select {
case <-termC:
close(c)
return true
default:
return false
}
}

for {
if checkTerminate() {
return
}
err := UnmarshalRecord(journal, record)
if err != nil {
c <- synthRecord(
Expand All @@ -27,6 +41,9 @@ func ReadRecords(instanceId string, journal *sdjournal.Journal, c chan<- *Record
}

for {
if checkTerminate() {
return
}
seeked, err := journal.Next()
if err != nil {
c <- synthRecord(
Expand All @@ -35,13 +52,17 @@ func ReadRecords(instanceId string, journal *sdjournal.Journal, c chan<- *Record
// It's likely that we didn't actually advance here, so
// we should wait a bit so we don't spin the CPU at 100%
// when we run into errors.
time.Sleep(10 * time.Second)
time.Sleep(2 * time.Second)
continue
}
if seeked == 0 {
// If there's nothing new in the stream then we'll
// wait for something new to show up.
journal.Wait(10 * time.Second)
// FIXME: We can actually end up waiting up to 2 seconds
// to gracefully terminate because of this. It'd be nicer
// to stop waiting if we get a termination signal, but
// this will do for now.
journal.Wait(2 * time.Second)
continue
}
break
Expand All @@ -63,14 +84,19 @@ func BatchRecords(records <-chan *Record, batches chan<- []Record, batchSize int
bufs[0] = make([]Record, batchSize)
bufs[1] = make([]Record, batchSize)
var record *Record
var more bool
currentBuf := 0
next := 0
timer := time.NewTimer(time.Second)
timer.Stop()

for {
select {
case record = <-records:
case record, more = <-records:
if !more {
close(batches)
return
}
bufs[currentBuf][next] = *record
next++
if next < batchSize {
Expand Down
19 changes: 19 additions & 0 deletions terminate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"os"
"os/signal"
"syscall"
)

// MakeTerminateChannel returns a channel that will become readable if
// the process is interrupted or terminated via a signal.
//
// This is used to gracefully exit the reader loop, which in turn causes
// the rest of the program to gracefully terminate, flushing any remaining
// buffers and writing its persistent state to disk.
func MakeTerminateChannel() <-chan os.Signal {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
return ch
}

0 comments on commit eda7fc7

Please sign in to comment.