Skip to content

Commit

Permalink
fix: close breakingError channel when no consumers left
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Riobo Lorenzo <[email protected]>
  • Loading branch information
adrianriobo committed Dec 5, 2023
1 parent b5a3660 commit ab2df60
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/services/messaging/umb/umb.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func (umb *umb) subscribeTopic(subscriptionID, topic string, handlers []api.Mess
active: true}
umb.consumers.Add(1)
go consume(umb.subscriptions[subscriptionID], umb.breakingError)
// Close the error channel when no consumers left
go func() {
umb.consumers.Wait()
close(umb.breakingError)
}()
return nil
}

Expand Down Expand Up @@ -172,7 +177,6 @@ func consume(subscription *subscription, breakingError chan string) {
if contains {
// Send cause for reconnect
breakingError <- fmt.Sprintf("%v on topic %s", err.Error(), subscription.topic)
defer close(_umb.breakingError)
}
logging.Errorf("Error reading from topic: %s. %s", subscription.topic, err)
break
Expand Down

0 comments on commit ab2df60

Please sign in to comment.