Skip to content

Commit

Permalink
Improved logging on close timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoutanov committed May 5, 2020
1 parent eded2a5 commit 3e0f876
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 25 deletions.
1 change: 0 additions & 1 deletion kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Interfaces.
// KafkaConsumer declares the consumer operations listed below: subscribing to a topic,
// reading a message, obtaining an event channel and closing.
type KafkaConsumer interface {
// Subscribe takes a topic name and a rebalance callback, and returns an error.
Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
// ReadMessage takes a timeout and returns a message pointer and an error.
ReadMessage(timeout time.Duration) (*kafka.Message, error)
// Events returns a channel of kafka.Event values.
// NOTE(review): the summary for kafka.go reports 0 additions and 1 deletion; this is presumably
// the line removed by the commit — confirm against the raw diff.
Events() chan kafka.Event
// Close returns an error.
Close() error
}

Expand Down
17 changes: 0 additions & 17 deletions kafka_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@ import (
// consMockFuncs holds the function values that consMock's methods delegate to. Each function
// takes the mock itself as its first argument, followed by the arguments of the corresponding
// method.
type consMockFuncs struct {
Subscribe func(m *consMock, topic string, rebalanceCb kafka.RebalanceCb) error
ReadMessage func(m *consMock, timeout time.Duration) (*kafka.Message, error)
// NOTE(review): Events appears to be among the lines deleted by this commit — confirm against the raw diff.
Events func(m *consMock) chan kafka.Event
Close func(m *consMock) error
}

// consMockCounts groups one atomic counter per consMock method. The visible Events and Close
// methods increment their counter through a deferred Inc call.
// NOTE(review): the Events counter appears to be among the lines deleted by this commit — confirm
// against the raw diff.
type consMockCounts struct {
Subscribe,
ReadMessage,
Events,
Close concurrent.AtomicCounter
}

type consMock struct {
events chan kafka.Event
rebalanceCallback kafka.RebalanceCb
rebalanceEvents chan kafka.Event
messages chan *kafka.Message
Expand Down Expand Up @@ -50,20 +47,12 @@ func (m *consMock) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
return m.f.ReadMessage(m, timeout)
}

// Events delegates to the configured m.f.Events function, passing the mock itself, and returns
// its result. The Events counter is incremented by a deferred call, i.e. after the delegate has
// run.
// NOTE(review): this method appears to be among the lines deleted by this commit — confirm
// against the raw diff.
func (m *consMock) Events() chan kafka.Event {
defer m.c.Events.Inc()
return m.f.Events(m)
}

// Close delegates to the configured m.f.Close function, passing the mock itself, and returns its
// error. The Close counter is incremented by a deferred call, i.e. after the delegate has run.
func (m *consMock) Close() error {
defer m.c.Close.Inc()
return m.f.Close(m)
}

func (m *consMock) fillDefaults() {
if m.events == nil {
m.events = make(chan kafka.Event)
}
if m.rebalanceEvents == nil {
m.rebalanceEvents = make(chan kafka.Event)
}
Expand All @@ -85,17 +74,11 @@ func (m *consMock) fillDefaults() {
}
}
}
if m.f.Events == nil {
m.f.Events = func(m *consMock) chan kafka.Event {
return m.events
}
}
if m.f.Close == nil {
m.f.Close = func(m *consMock) error {
return nil
}
}
m.c.Events = concurrent.NewAtomicCounter()
m.c.Subscribe = concurrent.NewAtomicCounter()
m.c.ReadMessage = concurrent.NewAtomicCounter()
m.c.Close = concurrent.NewAtomicCounter()
Expand Down
4 changes: 2 additions & 2 deletions neli.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,10 @@ func (n *neli) Close() error {
// the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a
// frozen harvester. (The rest of the battery will still unwind normally.)
const closeTimeout = 10 * time.Second
_, _ = performTimed(void(n.producer.Close), closeTimeout)
_, _ = performTimed(n.logger().W(), "producer close", void(n.producer.Close), closeTimeout)

// Similarly to the above, Consumer.Close() may also hang, and we need to cope with this until #463 is resolved.
_, err := performTimed(n.consumer.Close, closeTimeout)
_, err := performTimed(n.logger().W(), "consumer close", n.consumer.Close, closeTimeout)
return err
}

Expand Down
6 changes: 4 additions & 2 deletions timed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/obsidiandynamics/libstdgo/concurrent"
"github.com/obsidiandynamics/libstdgo/scribe"
)

var errPerformWithNoError = errors.New("")
Expand All @@ -16,10 +17,10 @@ func void(f func()) func() error {
}
}

func performTimed(f func() error, timeout time.Duration) (bool, error) {
func performTimed(logger scribe.Logger, opName string, op func() error, timeout time.Duration) (bool, error) {
errorRef := concurrent.NewAtomicReference()
go func() {
err := f()
err := op()
if err != nil {
errorRef.Set(err)
} else {
Expand All @@ -29,6 +30,7 @@ func performTimed(f func() error, timeout time.Duration) (bool, error) {

res := errorRef.Await(concurrent.RefNot(concurrent.RefNil()), timeout)
if res == nil {
logger("Operation '%s' failed to complete within %v", opName, timeout)
return false, nil
}
if err := res.(error); err != errPerformWithNoError {
Expand Down
19 changes: 16 additions & 3 deletions timed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,46 @@ import (
"time"

"github.com/obsidiandynamics/libstdgo/check"
"github.com/obsidiandynamics/libstdgo/scribe"
"github.com/stretchr/testify/require"
)

// TestPerformTimed_noError runs performTimed with a no-op operation and a 10 second timeout,
// requiring that it reports completion (done == true) with a nil error and that the mock logger
// holds zero entries.
// NOTE(review): this rendering interleaves two performTimed calls (one without and one with the
// logger/op-name arguments); presumably only the latter exists after the commit — confirm against
// the raw diff.
func TestPerformTimed_noError(t *testing.T) {
done, err := performTimed(void(func() {}), 10*time.Second)
m := scribe.NewMock()
scr := scribe.New(m.Factories())
done, err := performTimed(scr.W(), "some-op", void(func() {}), 10*time.Second)
require.True(t, done)
require.Nil(t, err)
m.Entries().Assert(t, scribe.Count(0))
}

// TestPerformTimed_withError runs performTimed with an operation that returns
// check.ErrSimulated, requiring that it reports completion (done == true), hands back that same
// error, and that the mock logger holds zero entries.
// NOTE(review): this rendering interleaves two performTimed call openings (one without and one
// with the logger/op-name arguments); presumably only the latter exists after the commit —
// confirm against the raw diff.
func TestPerformTimed_withError(t *testing.T) {
done, err := performTimed(func() error {
m := scribe.NewMock()
scr := scribe.New(m.Factories())
done, err := performTimed(scr.W(), "some-op", func() error {
return check.ErrSimulated
}, 10*time.Second)
require.True(t, done)
require.Equal(t, check.ErrSimulated, err)
m.Entries().Assert(t, scribe.Count(0))
}

// TestPerformTimed_withTimeout runs performTimed with an operation that blocks on a WaitGroup
// and a 1 millisecond timeout. It requires done == false with a nil error, and exactly one
// Warn-level log entry whose message names the operation and the timeout.
// NOTE(review): this rendering interleaves two performTimed call openings (one without and one
// with the logger/op-name arguments); presumably only the latter exists after the commit —
// confirm against the raw diff.
func TestPerformTimed_withTimeout(t *testing.T) {
m := scribe.NewMock()
scr := scribe.New(m.Factories())
wg := sync.WaitGroup{}
// The operation waits on this group, so it cannot return until wg.Done() is called below.
wg.Add(1)

done, err := performTimed(func() error {
done, err := performTimed(scr.W(), "some-op", func() error {
wg.Wait()
return nil
}, 1*time.Millisecond)
require.False(t, done)
require.Nil(t, err)
m.Entries().
Having(scribe.LogLevel(scribe.Warn)).
Having(scribe.MessageEqual("Operation 'some-op' failed to complete within 1ms")).
Assert(t, scribe.Count(1))

// Unblock the operation started above so that it can return.
wg.Done()
}

0 comments on commit 3e0f876

Please sign in to comment.