Skip to content

Commit

Permalink
Merge pull request hyperledger#102 from hyperledger/log-enhance
Browse files Browse the repository at this point in the history
Enhance logging on event dispatch, and resolve edge case panic
  • Loading branch information
nguyer authored Sep 28, 2023
2 parents dcc8484 + 792cccf commit c982c2b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
5 changes: 4 additions & 1 deletion internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (es *eventStream) processNewEvent(ctx context.Context, fev *ffcapi.Listener
if notification.Confirmed {
// Push it to the batch when confirmed
// - Note this will block the confirmation manager when the event stream is blocked
log.L(ctx).Debugf("Queuing confirmed event for batch assembly: '%s'", event)
es.batchChannel <- fev
}
},
Expand Down Expand Up @@ -741,6 +742,8 @@ func (es *eventStream) batchLoop(startedState *startedStreamState) {
},
Event: *fev.Event,
})
} else {
log.L(es.bgCtx).Warnf("Confirmed event not associated with any active listener: %s", fev.Event)
}
}
case <-timeoutChannel:
Expand All @@ -757,7 +760,7 @@ func (es *eventStream) batchLoop(startedState *startedStreamState) {
return
}

if timedOut || len(batch.events) >= maxSize {
if timedOut || (batch != nil && len(batch.events) >= maxSize) {
var err error
if batch != nil {
batch.timeout.Stop()
Expand Down
6 changes: 5 additions & 1 deletion internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,7 @@ func TestEventLoopIgnoreBadEvent(t *testing.T) {
es.processNewEvent(context.Background(), &ffcapi.ListenerEvent{})
}

func TestSkipEventsBehindCheckpoint(t *testing.T) {
func TestSkipEventsBehindCheckpointAndUnknownListener(t *testing.T) {

es := newTestEventStream(t, `{
"name": "ut_stream"
Expand Down Expand Up @@ -1680,6 +1680,10 @@ func TestSkipEventsBehindCheckpoint(t *testing.T) {
Checkpoint: &utCheckpointType{SomeSequenceNumber: 2000}, // on checkpoint - redelivery
Event: &ffcapi.Event{ID: ffcapi.EventID{ListenerID: listenerID, BlockNumber: 2000}},
}
es.batchChannel <- &ffcapi.ListenerEvent{
Checkpoint: &utCheckpointType{SomeSequenceNumber: 2001}, // this is for a listener that no longer exists on the ES
Event: &ffcapi.Event{ID: ffcapi.EventID{ListenerID: fftypes.NewUUID(), BlockNumber: 2001}},
}
es.batchChannel <- &ffcapi.ListenerEvent{
Checkpoint: &utCheckpointType{SomeSequenceNumber: 2001}, // this is a new event
Event: &ffcapi.Event{ID: ffcapi.EventID{ListenerID: listenerID, BlockNumber: 2001}},
Expand Down

0 comments on commit c982c2b

Please sign in to comment.