diff --git a/CHANGELOG.md b/CHANGELOG.md index 44b7a33..0ba9024 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.6.X] +- Update type names and docs for consistent naming convention (Note: there is no logic or method name change) + ## [1.5.X] - Fix Elixir `v1.7.x` warnings for string to atom conversions - Remove deprecated `EventBus.Util.String` module @@ -31,7 +34,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Delegate optional variables to optional library configuration when building/notifying events with Event builder - Add random id generator for Event builder - Introduce `fetch_event_data` function to fetch only event data -- Log empty topic listeners +- Log empty topic subscribers - Add missing tests for existence check - Update time spent calculation for EventSource block - Remove support for system event tracing (Updated the wiki to create wrapper for system event tracing) @@ -71,7 +74,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Add `source` attribute to increase traceability -- Add optional configuration to Subscriber to use the same module/function with different configurations to process the event. The aim of this change is increasing re-useability of the listener with several configurations. For example, this will allow writing an HTTP consumer or an AWS lambda caller function with different configurations. +- Add optional configuration to Subscriber to use the same module/function with different configurations to process the event. The aim of this change is increasing re-useability of the subscriber with several configurations. For example, this will allow writing an HTTP consumer or an AWS lambda caller function with different configurations. ### TODO diff --git a/README.md b/README.md index c881192..27ef470 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Traceable, extendable and minimalist event bus implementation for Elixir with bu - [Register/unregister event topics on demand](#registerunregister-event-topics-on-demand) -- [Subscribe to the 'event bus' with a listener and list of given topics](#subscribe-to-the-event-bus-with-a-listener-and-list-of-given-topics-notification-manager-will-match-with-regex) +- [Subscribe to the 'event bus' with a subscriber and list of given topics](#subscribe-to-the-event-bus-with-a-subscriber-and-list-of-given-topics-notification-manager-will-match-with-regex) - [Unsubscribe from the 'event bus'](#unsubscribe-from-the-event-bus) @@ -46,7 +46,7 @@ Traceable, extendable and minimalist event bus implementation for Elixir with bu - [Use block notifier to notify event data to given topic](#use-block-notifier-to-notify-event-data-to-given-topic) -[Sample Listener Implementation](#sample-listener-implementation) +[Sample Subscriber Implementation](#sample-subscriber-implementation) [Event Storage Details](#event-storage-details) @@ -139,45 +139,45 @@ EventBus.unregister_topic(:webhook_received) > :ok ``` -##### Subscribe to the 'event bus' with a listener and list of given topics, `Notification Manager` will match with Regex +##### Subscribe to the 'event bus' with a subscriber and list of given topics, `Notification Manager` will match with Regex ```elixir # to catch every event topic -EventBus.subscribe({MyEventListener, [".*"]}) +EventBus.subscribe({MyEventSubscriber, [".*"]}) > :ok # to catch specific topics -EventBus.subscribe({MyEventListener, ["purchase_", "booking_confirmed$", "flight_passed$"]}) +EventBus.subscribe({MyEventSubscriber, ["purchase_", "booking_confirmed$", "flight_passed$"]}) > :ok -# if your listener has a config +# if your subscriber has a config config = %{} -listener = {MyEventListener, config} -EventBus.subscribe({listener, [".*"]}) +subscriber = {MyEventSubscriber, config} +EventBus.subscribe({subscriber, [".*"]}) > :ok ``` ##### Unsubscribe from the 'event bus' ```elixir -EventBus.unsubscribe(MyEventListener) +EventBus.unsubscribe(MyEventSubscriber) > :ok -# if your listener has a config +# if your subscriber has a config config = %{} -EventBus.unsubscribe({MyEventListener, config}) +EventBus.unsubscribe({MyEventSubscriber, config}) > :ok ``` ##### List subscribers ```elixir EventBus.subscribers() -> [{MyEventListener, [".*"]}, {{AnotherListener, %{}}, [".*"]}] +> [{MyEventSubscriber, [".*"]}, {{AnotherSubscriber, %{}}, [".*"]}] ``` ##### List subscribers of a specific event ```elixir EventBus.subscribers(:hello_received) -> [MyEventListener, {{AnotherListener, %{}}}] +> [MyEventSubscriber, {{AnotherSubscriber, %{}}}] ``` ##### Event data structure @@ -248,19 +248,19 @@ EventBus.fetch_event_data({topic, id}) ##### Mark as completed on Event Observation Manager ```elixir -listener = MyEventListener -# If your listener has config then pass tuple -listener = {MyEventListener, config} -EventBus.mark_as_completed({listener, {:bye_received, id}}) +subscriber = MyEventSubscriber +# If your subscriber has config then pass tuple +subscriber = {MyEventSubscriber, config} +EventBus.mark_as_completed({subscriber, {:bye_received, id}}) > :ok ``` ##### Mark as skipped on Event Observation Manager ```elixir -listener = MyEventListener -# If your listener has config then pass tuple -listener = {MyEventListener, config} -EventBus.mark_as_skipped({listener, {:bye_received, id}}) +subscriber = MyEventSubscriber +# If your subscriber has config then pass tuple +subscriber = {MyEventSubscriber, config} +EventBus.mark_as_skipped({subscriber, {:bye_received, id}}) > :ok ``` @@ -352,13 +352,13 @@ end > %{email: "mrsjd@example.com", name: "Mrs Jane Doe"} ``` -### Sample Listener Implementation +### Sample Subscriber Implementation ```elixir -defmodule MyEventListener do +defmodule MyEventSubscriber do ... - # if your listener does not have a config + # if your subscriber does not have a config def process({topic, id} = event_shadow) do GenServer.cast(__MODULE__, event_shadow) :ok @@ -366,7 +366,7 @@ defmodule MyEventListener do ... - # if your listener has a config + # if your subscriber has a config def process({config, topic, id} = event_shadow_with_conf) do GenServer.cast(__MODULE__, event_shadow_with_conf) :ok @@ -375,7 +375,7 @@ defmodule MyEventListener do ... - # if your listener does not have a config + # if your subscriber does not have a config def handle_cast({:bye_received, id} = event_shadow, state) do event = EventBus.fetch_event(event_shadow) # do sth with event @@ -413,14 +413,14 @@ defmodule MyEventListener do ... - # if your listener has a config + # if your subscriber has a config def handle_cast({config, :bye_received, id}, state) do event = EventBus.fetch_event({:bye_received, id}) # do sth with event # update the watcher! - listener = {__MODULE__, config} - EventBus.mark_as_completed({listener, :bye_received, id}) + subscriber = {__MODULE__, config} + EventBus.mark_as_completed({subscriber, :bye_received, id}) ... {:noreply, state} end @@ -430,15 +430,15 @@ defmodule MyEventListener do # do sth with EventBus.Model.Event # update the watcher! - listener = {__MODULE__, config} - EventBus.mark_as_completed({listener, :hello_received, id}) + subscriber = {__MODULE__, config} + EventBus.mark_as_completed({subscriber, :hello_received, id}) ... {:noreply, state} end def handle_cast({config, topic, id}, state) do - listener = {__MODULE__, config} - EventBus.mark_as_skipped({listener, topic, id}) + subscriber = {__MODULE__, config} + EventBus.mark_as_skipped({subscriber, topic, id}) {:noreply, state} end @@ -452,7 +452,7 @@ When an event configured in `config` file, 2 ETS tables will be created for the All event data is temporarily saved to the ETS tables with the name `:eb_es_<>` until all subscribers processed the data. This table is a read heavy table. When a subscriber needs to process the event data, it queries this table to fetch event data. -To watch event status, a separate watcher table is created for each event type with the name `:eb_ew_<>`. This table is used for keeping the status of the event. `Observation Manager` updates this table frequently with the notification of the event listeners/subscribers. +To watch event status, a separate watcher table is created for each event type with the name `:eb_ew_<>`. This table is used for keeping the status of the event. `Observation Manager` updates this table frequently with the notification of the event subscribers. When all subscribers process the event data, data in the event store and watcher, automatically deleted by the `Observation Manager`. If you need to see the status of unprocessed events, event watcher table is one of the good places to query. @@ -523,7 +523,7 @@ A few sample addons listed below. Please do not hesitate to add your own addon t | Addon Name | Description | Link | Docs | | -------------------- | ------------- | ------------- | ------------- | | `event_bus_postgres` | Fast event consumer to persist `event_bus` events to Postgres using GenStage | [Github](https://github.com/otobus/event_bus_postgres) | [HexDocs](https://hexdocs.pm/event_bus_postgres) | -| `event_bus_logger` | Deadly simple log listener implementation | [Github](https://github.com/otobus/event_bus_logger) | [HexDocs](https://hexdocs.pm/event_bus_logger) | +| `event_bus_logger` | Deadly simple log subscriber implementation | [Github](https://github.com/otobus/event_bus_logger) | [HexDocs](https://hexdocs.pm/event_bus_logger) | | `event_bus_metrics` | Metrics UI and metrics API endpoints for EventBus events for debugging and monitoring | [Hex](https://hex.pm/packages/event_bus_metrics) | [HexDocs](https://hexdocs.pm/event_bus_metrics) | Note: The addons under [https://github.com/otobus](https://github.com/otobus) organization implemented as a sample, but feel free to use them in your project with respecting their licenses. diff --git a/lib/event_bus.ex b/lib/event_bus.ex index 96354bd..fded8b8 100644 --- a/lib/event_bus.ex +++ b/lib/event_bus.ex @@ -23,48 +23,49 @@ defmodule EventBus do @typedoc "Tuple of topic name and event id" @type event_shadow :: {topic(), event_id()} - @typedoc "Event listener" - @type listener :: listener_without_config() | listener_with_config() + @typedoc "Event subscriber" + @type subscriber :: subscriber_without_config() | subscriber_with_config() - @typedoc "Listener configuration" - @type listener_config :: any() + @typedoc "Subscriber configuration" + @type subscriber_config :: any() - @typedoc "List of event listeners" - @type listener_list :: list(listener()) + @typedoc "List of event subscribers" + @type subscribers :: list(subscriber()) - @typedoc "Event listener with config" - @type listener_with_config :: {module(), listener_config()} + @typedoc "Event subscriber with config" + @type subscriber_with_config :: {module(), subscriber_config()} - @typedoc "Tuple of listener and event reference" - @type listener_with_event_ref :: - listener_with_event_shadow() | listener_with_topic_and_event_id() + @typedoc "Tuple of subscriber and event reference" + @type subscriber_with_event_ref :: + subscriber_with_event_shadow() | subscriber_with_topic_and_event_id() - @typedoc "Tuple of listener and event shadow" - @type listener_with_event_shadow :: {listener(), event_shadow()} + @typedoc "Tuple of subscriber and event shadow" + @type subscriber_with_event_shadow :: {subscriber(), event_shadow()} - @typedoc "Tuple of listener, topic and event id" - @type listener_with_topic_and_event_id :: {listener(), topic(), event_id()} + @typedoc "Tuple of subscriber, topic and event id" + @type subscriber_with_topic_and_event_id :: + {subscriber(), topic(), event_id()} - @typedoc "Tuple of listener and list of topic patterns" - @type listener_with_topic_patterns :: {listener(), topic_pattern_list()} + @typedoc "Tuple of subscriber and list of topic patterns" + @type subscriber_with_topic_patterns :: {subscriber(), topic_patterns()} - @typedoc "Event listener without config" - @type listener_without_config :: module() + @typedoc "Event subscriber without config" + @type subscriber_without_config :: module() @typedoc "Topic name" @type topic :: atom() @typedoc "List of topic names" - @type topic_list :: list(topic()) + @type topics :: list(topic()) @typedoc "Regex pattern to match topic name" @type topic_pattern :: String.t() @typedoc "List of topic patterns" - @type topic_pattern_list :: list(topic_pattern()) + @type topic_patterns :: list(topic_pattern()) @doc """ - Send an event to all subscribers(listeners) + Send an event to all subscribers ## Examples @@ -101,7 +102,7 @@ defmodule EventBus do EventBus.topics() [:metrics_summed] """ - @spec topics() :: topic_list() + @spec topics() :: topics() defdelegate topics, to: Topic, as: :all @@ -135,64 +136,64 @@ defmodule EventBus do as: :unregister @doc """ - Subscribe a listener to the event bus + Subscribe a subscriber to the event bus ## Examples - EventBus.subscribe({MyEventListener, [".*"]}) + EventBus.subscribe({MyEventSubscriber, [".*"]}) :ok - # For configurable listeners you can pass tuple of listener and config + # For configurable subscribers you can pass tuple of subscriber and config my_config = %{} - EventBus.subscribe({{OtherListener, my_config}, [".*"]}) + EventBus.subscribe({{OtherSubscriber, my_config}, [".*"]}) :ok """ - @spec subscribe(listener_with_topic_patterns()) :: :ok - defdelegate subscribe(listener_with_topic_patterns), + @spec subscribe(subscriber_with_topic_patterns()) :: :ok + defdelegate subscribe(subscriber_with_topic_patterns), to: Subscription, as: :subscribe @doc """ - Unsubscribe a listener from the event bus + Unsubscribe a subscriber from the event bus ## Examples - EventBus.unsubscribe(MyEventListener) + EventBus.unsubscribe(MyEventSubscriber) :ok - # For configurable listeners you must pass tuple of listener and config + # For configurable subscribers you must pass tuple of subscriber and config my_config = %{} - EventBus.unsubscribe({OtherListener, my_config}) + EventBus.unsubscribe({OtherSubscriber, my_config}) :ok """ - @spec unsubscribe(listener()) :: :ok - defdelegate unsubscribe(listener), + @spec unsubscribe(subscriber()) :: :ok + defdelegate unsubscribe(subscriber), to: Subscription, as: :unsubscribe @doc """ - Check if the given listener subscribed to the event bus for the given topic + Check if the given subscriber subscribed to the event bus for the given topic patterns ## Examples - EventBus.subscribe({MyEventListener, [".*"]}) + EventBus.subscribe({MyEventSubscriber, [".*"]}) :ok - EventBus.subscribed?({MyEventListener, [".*"]}) + EventBus.subscribed?({MyEventSubscriber, [".*"]}) true - EventBus.subscribed?({MyEventListener, ["some_initialized"]}) + EventBus.subscribed?({MyEventSubscriber, ["some_initialized"]}) false - EventBus.subscribed?({AnothEventListener, [".*"]}) + EventBus.subscribed?({AnothEventSubscriber, [".*"]}) false """ - @spec subscribed?(listener_with_topic_patterns()) :: boolean() - defdelegate subscribed?(listener_with_topic_patterns), + @spec subscribed?(subscriber_with_topic_patterns()) :: boolean() + defdelegate subscribed?(subscriber_with_topic_patterns), to: Subscription, as: :subscribed? @@ -202,14 +203,14 @@ defmodule EventBus do ## Examples EventBus.subscribers() - [MyEventListener] + [MyEventSubscriber] - # One usual and one configured listener with its config + # One usual and one configured subscriber with its config EventBus.subscribers() - [MyEventListener, {OtherListener, %{}}] + [MyEventSubscriber, {OtherSubscriber, %{}}] """ - @spec subscribers() :: listener_list() + @spec subscribers() :: subscribers() defdelegate subscribers, to: Subscription, as: :subscribers @@ -220,14 +221,14 @@ defmodule EventBus do ## Examples EventBus.subscribers(:metrics_received) - [MyEventListener] + [MyEventSubscriber] - # One usual and one configured listener with its config + # One usual and one configured subscriber with its config EventBus.subscribers(:metrics_received) - [MyEventListener, {OtherListener, %{}}] + [MyEventSubscriber, {OtherSubscriber, %{}}] """ - @spec subscribers(topic()) :: listener_list() + @spec subscribers(topic()) :: subscribers() defdelegate subscribers(topic), to: Subscription, as: :subscribers @@ -260,45 +261,45 @@ defmodule EventBus do as: :fetch_data @doc """ - Mark the event as completed for the listener + Mark the event as completed for the subscriber ## Examples topic = :hello_received event_id = "124" event_shadow = {topic, event_id} - # For regular listeners - EventBus.mark_as_completed({MyEventListener, event_shadow}) + # For regular subscribers + EventBus.mark_as_completed({MyEventSubscriber, event_shadow}) - # For configurable listeners you must pass tuple of listener and config + # For configurable subscribers you must pass tuple of subscriber and config my_config = %{} - listener = {OtherListener, my_config} + subscriber = {OtherSubscriber, my_config} - EventBus.mark_as_completed({listener, event_shadow}) + EventBus.mark_as_completed({subscriber, event_shadow}) :ok """ - @spec mark_as_completed(listener_with_event_ref()) :: :ok - defdelegate mark_as_completed(listener_with_event_ref), + @spec mark_as_completed(subscriber_with_event_ref()) :: :ok + defdelegate mark_as_completed(subscriber_with_event_ref), to: Observation, as: :mark_as_completed @doc """ - Mark the event as skipped for the listener + Mark the event as skipped for the subscriber ## Examples - EventBus.mark_as_skipped({MyEventListener, {:unmatched_occurred, "124"}}) + EventBus.mark_as_skipped({MyEventSubscriber, {:unmatched_occurred, "124"}}) - # For configurable listeners you must pass tuple of listener and config + # For configurable subscribers you must pass tuple of subscriber and config my_config = %{} - listener = {OtherListener, my_config} - EventBus.mark_as_skipped({listener, {:unmatched_occurred, "124"}}) + subscriber = {OtherSubscriber, my_config} + EventBus.mark_as_skipped({subscriber, {:unmatched_occurred, "124"}}) :ok """ - @spec mark_as_skipped(listener_with_event_ref()) :: :ok - defdelegate mark_as_skipped(listener_with_event_ref), + @spec mark_as_skipped(subscriber_with_event_ref()) :: :ok + defdelegate mark_as_skipped(subscriber_with_event_ref), to: Observation, as: :mark_as_skipped end diff --git a/lib/event_bus/managers/notification.ex b/lib/event_bus/managers/notification.ex index 6c1886e..c078793 100644 --- a/lib/event_bus/managers/notification.ex +++ b/lib/event_bus/managers/notification.ex @@ -3,7 +3,7 @@ defmodule EventBus.Manager.Notification do ########################################################################### # Notification is responsible for saving events, creating event watcher and - # delivering events to listeners. + # delivering events to subscribers. ########################################################################### use GenServer @@ -26,7 +26,7 @@ defmodule EventBus.Manager.Notification do end @doc """ - Notify event to event.topic listeners in the current node + Notify event to event.topic subscribers in the current node """ @spec notify(event()) :: :ok def notify(%Event{} = event) do diff --git a/lib/event_bus/managers/observation.ex b/lib/event_bus/managers/observation.ex index 1ad8685..b700b0a 100644 --- a/lib/event_bus/managers/observation.ex +++ b/lib/event_bus/managers/observation.ex @@ -4,7 +4,7 @@ defmodule EventBus.Manager.Observation do ########################################################################### # Event Observation module is a helper to get info for the events and also an # organizer for the events happened in time. It automatically deletes - # processed events from the ETS table. Event listeners are responsible for + # processed events from the ETS table. Event subscribers are responsible for # notifying the Event Observation on completions and skips. ########################################################################### @@ -13,11 +13,11 @@ defmodule EventBus.Manager.Observation do alias EventBus.Service.Observation, as: ObservationService @typep event_shadow :: EventBus.event_shadow() - @typep listener_list :: EventBus.listener_list() - @typep listener_list_with_event_shadow :: {listener_list(), event_shadow()} - @typep listener_with_event_ref :: EventBus.listener_with_event_ref() + @typep subscribers :: EventBus.subscribers() + @typep subscribers_with_event_shadow :: {subscribers(), event_shadow()} + @typep subscriber_with_event_ref :: EventBus.subscriber_with_event_ref() @typep topic :: EventBus.topic() - @typep watcher :: {listener_list(), listener_list(), listener_list()} + @typep watcher :: {subscribers(), subscribers(), subscribers()} @backend ObservationService @@ -60,33 +60,33 @@ defmodule EventBus.Manager.Observation do @doc """ Mark event as completed on the watcher """ - @spec mark_as_completed(listener_with_event_ref()) :: :ok - def mark_as_completed({listener, topic, id}) do - GenServer.cast(__MODULE__, {:mark_as_completed, {listener, {topic, id}}}) + @spec mark_as_completed(subscriber_with_event_ref()) :: :ok + def mark_as_completed({subscriber, topic, id}) do + GenServer.cast(__MODULE__, {:mark_as_completed, {subscriber, {topic, id}}}) end - def mark_as_completed({listener, {topic, id}}) do - GenServer.cast(__MODULE__, {:mark_as_completed, {listener, {topic, id}}}) + def mark_as_completed({subscriber, {topic, id}}) do + GenServer.cast(__MODULE__, {:mark_as_completed, {subscriber, {topic, id}}}) end @doc """ Mark event as skipped on the watcher """ - @spec mark_as_skipped(listener_with_event_ref()) :: :ok - def mark_as_skipped({listener, topic, id}) do - GenServer.cast(__MODULE__, {:mark_as_skipped, {listener, {topic, id}}}) + @spec mark_as_skipped(subscriber_with_event_ref()) :: :ok + def mark_as_skipped({subscriber, topic, id}) do + GenServer.cast(__MODULE__, {:mark_as_skipped, {subscriber, {topic, id}}}) end - def mark_as_skipped({listener, {topic, id}}) do - GenServer.cast(__MODULE__, {:mark_as_skipped, {listener, {topic, id}}}) + def mark_as_skipped({subscriber, {topic, id}}) do + GenServer.cast(__MODULE__, {:mark_as_skipped, {subscriber, {topic, id}}}) end @doc """ Create an watcher """ - @spec create(listener_list_with_event_shadow()) :: :ok - def create({listeners, {topic, id}}) do - GenServer.call(__MODULE__, {:save, {topic, id}, {listeners, [], []}}) + @spec create(subscribers_with_event_shadow()) :: :ok + def create({subscribers, {topic, id}}) do + GenServer.call(__MODULE__, {:save, {topic, id}, {subscribers, [], []}}) end ########################################################################### @@ -136,18 +136,18 @@ defmodule EventBus.Manager.Observation do end @doc false - @spec handle_cast({:mark_as_completed, listener_with_event_ref()}, term()) + @spec handle_cast({:mark_as_completed, subscriber_with_event_ref()}, term()) :: no_return() - def handle_cast({:mark_as_completed, {listener, {topic, id}}}, state) do - @backend.mark_as_completed({listener, {topic, id}}) + def handle_cast({:mark_as_completed, {subscriber, {topic, id}}}, state) do + @backend.mark_as_completed({subscriber, {topic, id}}) {:noreply, state} end @doc false - @spec handle_cast({:mark_as_skipped, listener_with_event_ref()}, term()) + @spec handle_cast({:mark_as_skipped, subscriber_with_event_ref()}, term()) :: no_return() - def handle_cast({:mark_as_skipped, {listener, {topic, id}}}, state) do - @backend.mark_as_skipped({listener, {topic, id}}) + def handle_cast({:mark_as_skipped, {subscriber, {topic, id}}}, state) do + @backend.mark_as_skipped({subscriber, {topic, id}}) {:noreply, state} end end diff --git a/lib/event_bus/managers/subscription.ex b/lib/event_bus/managers/subscription.ex index 4e57525..6cf4fb0 100644 --- a/lib/event_bus/managers/subscription.ex +++ b/lib/event_bus/managers/subscription.ex @@ -9,9 +9,9 @@ defmodule EventBus.Manager.Subscription do alias EventBus.Service.Subscription, as: SubscriptionService - @typep listener :: EventBus.listener() - @typep listener_list :: EventBus.listener_list() - @typep listener_with_topic_patterns :: EventBus.listener_with_topic_patterns() + @typep subscriber :: EventBus.subscriber() + @typep subscribers :: EventBus.subscribers() + @typep subscriber_with_topic_patterns :: EventBus.subscriber_with_topic_patterns() @typep topic :: EventBus.topic() @backend SubscriptionService @@ -27,31 +27,31 @@ defmodule EventBus.Manager.Subscription do end @doc """ - Does the listener subscribe to topic_patterns? + Does the subscriber subscribe to topic_patterns? """ - @spec subscribed?(listener_with_topic_patterns()) :: boolean() - def subscribed?({_listener, _topic_patterns} = subscriber) do + @spec subscribed?(subscriber_with_topic_patterns()) :: boolean() + def subscribed?({_subscriber, _topic_patterns} = subscriber) do GenServer.call(__MODULE__, {:subscribed?, subscriber}) end @doc """ - Subscribe the listener to topic_patterns + Subscribe the subscriber to topic_patterns """ - @spec subscribe(listener_with_topic_patterns()) :: :ok - def subscribe({listener, topic_patterns}) do - GenServer.cast(__MODULE__, {:subscribe, {listener, topic_patterns}}) + @spec subscribe(subscriber_with_topic_patterns()) :: :ok + def subscribe({subscriber, topic_patterns}) do + GenServer.cast(__MODULE__, {:subscribe, {subscriber, topic_patterns}}) end @doc """ - Unsubscribe the listener + Unsubscribe the subscriber """ - @spec unsubscribe(listener()) :: :ok - def unsubscribe(listener) do - GenServer.cast(__MODULE__, {:unsubscribe, listener}) + @spec unsubscribe(subscriber()) :: :ok + def unsubscribe(subscriber) do + GenServer.cast(__MODULE__, {:unsubscribe, subscriber}) end @doc """ - Set listeners to the topic + Set subscribers to the topic """ @spec register_topic(topic()) :: :ok def register_topic(topic) do @@ -59,7 +59,7 @@ defmodule EventBus.Manager.Subscription do end @doc """ - Unset listeners from the topic + Unset subscribers from the topic """ @spec unregister_topic(topic()) :: :ok def unregister_topic(topic) do @@ -71,17 +71,17 @@ defmodule EventBus.Manager.Subscription do ########################################################################### @doc """ - Fetch listeners + Fetch subscribers """ - @spec subscribers() :: listener_list() + @spec subscribers() :: subscribers() defdelegate subscribers, to: @backend, as: :subscribers @doc """ - Fetch listeners of the topic + Fetch subscribers of the topic """ - @spec subscribers(topic()) :: listener_list() + @spec subscribers(topic()) :: subscribers() defdelegate subscribers(topic), to: @backend, as: :subscribers @@ -91,24 +91,24 @@ defmodule EventBus.Manager.Subscription do ########################################################################### @doc false - @spec handle_call({:subscribed?, listener_with_topic_patterns()}, any(), term()) + @spec handle_call({:subscribed?, subscriber_with_topic_patterns()}, any(), term()) :: {:reply, boolean(), term()} def handle_call({:subscribed?, subscriber}, _from, state) do {:reply, @backend.subscribed?(subscriber), state} end @doc false - @spec handle_cast({:subscribe, listener_with_topic_patterns()}, term()) + @spec handle_cast({:subscribe, subscriber_with_topic_patterns()}, term()) :: no_return() - def handle_cast({:subscribe, {listener, topic_patterns}}, state) do - @backend.subscribe({listener, topic_patterns}) + def handle_cast({:subscribe, {subscriber, topic_patterns}}, state) do + @backend.subscribe({subscriber, topic_patterns}) {:noreply, state} end @doc false - @spec handle_cast({:unsubscribe, listener()}, term()) :: no_return() - def handle_cast({:unsubscribe, listener}, state) do - @backend.unsubscribe(listener) + @spec handle_cast({:unsubscribe, subscriber()}, term()) :: no_return() + def handle_cast({:unsubscribe, subscriber}, state) do + @backend.unsubscribe(subscriber) {:noreply, state} end diff --git a/lib/event_bus/managers/topic.ex b/lib/event_bus/managers/topic.ex index b60c712..8175d56 100644 --- a/lib/event_bus/managers/topic.ex +++ b/lib/event_bus/managers/topic.ex @@ -10,7 +10,7 @@ defmodule EventBus.Manager.Topic do alias EventBus.Service.Topic, as: TopicService @typep topic :: EventBus.topic() - @typep topic_list :: EventBus.topic_list() + @typep topics :: EventBus.topics() @backend TopicService @@ -57,7 +57,7 @@ defmodule EventBus.Manager.Topic do @doc """ List all registered topics """ - @spec all() :: topic_list() + @spec all() :: topics() defdelegate all, to: @backend, as: :all diff --git a/lib/event_bus/services/notification.ex b/lib/event_bus/services/notification.ex index 99f8bd9..2911fc6 100644 --- a/lib/event_bus/services/notification.ex +++ b/lib/event_bus/services/notification.ex @@ -10,50 +10,50 @@ defmodule EventBus.Service.Notification do @typep event :: EventBus.event() @typep event_shadow :: EventBus.event_shadow() - @typep listener :: EventBus.listener() - @typep listener_list :: EventBus.listener_list() + @typep subscriber :: EventBus.subscriber() + @typep subscribers :: EventBus.subscribers() @typep topic :: EventBus.topic() @doc false @spec notify(event()) :: :ok def notify(%Event{id: id, topic: topic} = event) do - listeners = SubscriptionManager.subscribers(topic) + subscribers = SubscriptionManager.subscribers(topic) - if listeners == [] do + if subscribers == [] do warn_missing_topic_subscription(topic) else :ok = StoreManager.create(event) - :ok = ObservationManager.create({listeners, {topic, id}}) + :ok = ObservationManager.create({subscribers, {topic, id}}) - notify_listeners(listeners, {topic, id}) + notify_subscribers(subscribers, {topic, id}) end :ok end - @spec notify_listeners(listener_list(), event_shadow()) :: :ok - defp notify_listeners(listeners, event_shadow) do - Enum.each(listeners, fn listener -> - notify_listener(listener, event_shadow) + @spec notify_subscribers(subscribers(), event_shadow()) :: :ok + defp notify_subscribers(subscribers, event_shadow) do + Enum.each(subscribers, fn subscriber -> + notify_subscriber(subscriber, event_shadow) end) :ok end - @spec notify_listener(listener(), event_shadow()) :: no_return() - defp notify_listener({listener, config}, {topic, id}) do - listener.process({config, topic, id}) + @spec notify_subscriber(subscriber(), event_shadow()) :: no_return() + defp notify_subscriber({subscriber, config}, {topic, id}) do + subscriber.process({config, topic, id}) rescue error -> - log_error(listener, error) - ObservationManager.mark_as_skipped({{listener, config}, {topic, id}}) + log_error(subscriber, error) + ObservationManager.mark_as_skipped({{subscriber, config}, {topic, id}}) end - defp notify_listener(listener, {topic, id}) do - listener.process({topic, id}) + defp notify_subscriber(subscriber, {topic, id}) do + subscriber.process({topic, id}) rescue error -> - log_error(listener, error) - ObservationManager.mark_as_skipped({listener, {topic, id}}) + log_error(subscriber, error) + ObservationManager.mark_as_skipped({subscriber, {topic, id}}) end @spec registration_status(topic()) :: String.t() @@ -70,8 +70,8 @@ defmodule EventBus.Service.Notification do end @spec log_error(module(), any()) :: no_return() - defp log_error(listener, error) do - msg = "#{listener}.process/1 raised an error!\n#{inspect(error)}" + defp log_error(subscriber, error) do + msg = "#{subscriber}.process/1 raised an error!\n#{inspect(error)}" Logger.info(msg) end end diff --git a/lib/event_bus/services/observation.ex b/lib/event_bus/services/observation.ex index b5ca18f..d7258b1 100644 --- a/lib/event_bus/services/observation.ex +++ b/lib/event_bus/services/observation.ex @@ -5,10 +5,10 @@ defmodule EventBus.Service.Observation do alias :ets, as: Ets @typep event_shadow :: EventBus.event_shadow() - @typep listener_list :: EventBus.listener_list() - @typep listener_with_event_ref :: EventBus.listener_with_event_ref() + @typep subscribers :: EventBus.subscribers() + @typep subscriber_with_event_ref :: EventBus.subscriber_with_event_ref() @typep topic :: EventBus.topic() - @typep watcher :: {listener_list(), listener_list(), listener_list()} + @typep watcher :: {subscribers(), subscribers(), subscribers()} @ets_opts [ :set, @@ -42,17 +42,17 @@ defmodule EventBus.Service.Observation do end @doc false - @spec mark_as_completed(listener_with_event_ref()) :: :ok - def mark_as_completed({listener, event_shadow}) do - {listeners, completers, skippers} = fetch(event_shadow) - save_or_delete(event_shadow, {listeners, [listener | completers], skippers}) + @spec mark_as_completed(subscriber_with_event_ref()) :: :ok + def mark_as_completed({subscriber, event_shadow}) do + {subscribers, completers, skippers} = fetch(event_shadow) + save_or_delete(event_shadow, {subscribers, [subscriber | completers], skippers}) end @doc false - @spec mark_as_skipped(listener_with_event_ref()) :: :ok - def mark_as_skipped({listener, event_shadow}) do - {listeners, completers, skippers} = fetch(event_shadow) - save_or_delete(event_shadow, {listeners, completers, [listener | skippers]}) + @spec mark_as_skipped(subscriber_with_event_ref()) :: :ok + def mark_as_skipped({subscriber, event_shadow}) do + {subscribers, completers, skippers} = fetch(event_shadow) + save_or_delete(event_shadow, {subscribers, completers, [subscriber | skippers]}) end @doc false @@ -72,8 +72,8 @@ defmodule EventBus.Service.Observation do end @spec complete?(watcher()) :: boolean() - defp complete?({listeners, completers, skippers}) do - length(listeners) == length(completers) + length(skippers) + defp complete?({subscribers, completers, skippers}) do + length(subscribers) == length(completers) + length(skippers) end @spec save_or_delete(event_shadow(), watcher()) :: :ok diff --git a/lib/event_bus/services/subscription.ex b/lib/event_bus/services/subscription.ex index bdf6003..722fdc0 100644 --- a/lib/event_bus/services/subscription.ex +++ b/lib/event_bus/services/subscription.ex @@ -7,103 +7,103 @@ defmodule EventBus.Service.Subscription do @app :event_bus @namespace :subscriptions - @typep listener :: EventBus.listener() - @typep listener_list :: EventBus.listener_list() - @typep listener_with_topic_patterns :: EventBus.listener_with_topic_patterns() + @typep subscriber :: EventBus.subscriber() + @typep subscribers :: EventBus.subscribers() + @typep subscriber_with_topic_patterns :: EventBus.subscriber_with_topic_patterns() @typep topic :: EventBus.topic() - @spec subscribed?(listener_with_topic_patterns()) :: boolean() + @spec subscribed?(subscriber_with_topic_patterns()) :: boolean() def subscribed?(subscriber) do Enum.member?(subscribers(), subscriber) end @doc false - @spec subscribe(listener_with_topic_patterns()) :: :ok - def subscribe({listener, topics}) do - {listeners, topic_map} = load_state() - listeners = add_or_update_listener(listeners, {listener, topics}) + @spec subscribe(subscriber_with_topic_patterns()) :: :ok + def subscribe({subscriber, topics}) do + {subscribers, topic_map} = load_state() + subscribers = add_or_update_subscriber(subscribers, {subscriber, topics}) topic_map = topic_map - |> add_listener_to_topic_map({listener, topics}) + |> add_subscriber_to_topic_map({subscriber, topics}) |> Enum.into(%{}) - save_state({listeners, topic_map}) + save_state({subscribers, topic_map}) end @doc false - @spec unsubscribe(listener()) :: :ok - def unsubscribe(listener) do - {listeners, topic_map} = load_state() - listeners = List.keydelete(listeners, listener, 0) + @spec unsubscribe(subscriber()) :: :ok + def unsubscribe(subscriber) do + {subscribers, topic_map} = load_state() + subscribers = List.keydelete(subscribers, subscriber, 0) topic_map = topic_map - |> remove_listener_from_topic_map(listener) + |> remove_subscriber_from_topic_map(subscriber) |> Enum.into(%{}) - save_state({listeners, topic_map}) + save_state({subscribers, topic_map}) end @doc false @spec register_topic(topic()) :: :ok def register_topic(topic) do - {listeners, topic_map} = load_state() - topic_listeners = topic_listeners(listeners, topic) + {subscribers, topic_map} = load_state() + topic_subscribers = topic_subscribers(subscribers, topic) - save_state({listeners, Map.put(topic_map, topic, topic_listeners)}) + save_state({subscribers, Map.put(topic_map, topic, topic_subscribers)}) end @doc false @spec unregister_topic(topic()) :: :ok def unregister_topic(topic) do - {listeners, topic_map} = load_state() - save_state({listeners, Map.drop(topic_map, [topic])}) + {subscribers, topic_map} = load_state() + save_state({subscribers, Map.drop(topic_map, [topic])}) end @doc false - @spec subscribers() :: listener_list() + @spec subscribers() :: subscribers() def subscribers do - {listeners, _topic_map} = load_state() - listeners + {subscribers, _topic_map} = load_state() + subscribers end - @spec subscribers(topic()) :: listener_list() + @spec subscribers(topic()) :: subscribers() def subscribers(topic) do - {_listeners, topic_map} = load_state() + {_subscribers, topic_map} = load_state() topic_map[topic] || [] end - defp topic_listeners(listeners, topic) do - Enum.reduce(listeners, [], fn {listener, topics}, acc -> - if RegexUtil.superset?(topics, topic), do: [listener | acc], else: acc + defp topic_subscribers(subscribers, topic) do + Enum.reduce(subscribers, [], fn {subscriber, topics}, acc -> + if RegexUtil.superset?(topics, topic), do: [subscriber | acc], else: acc end) end - defp remove_listener_from_topic_map(topic_map, listener) do - Enum.map(topic_map, fn {topic, topic_listeners} -> - topic_listeners = List.delete(topic_listeners, listener) - {topic, topic_listeners} + defp remove_subscriber_from_topic_map(topic_map, subscriber) do + Enum.map(topic_map, fn {topic, topic_subscribers} -> + topic_subscribers = List.delete(topic_subscribers, subscriber) + {topic, topic_subscribers} end) end - defp add_listener_to_topic_map(topic_map, {listener, topics}) do - Enum.map(topic_map, fn {topic, topic_listeners} -> - topic_listeners = List.delete(topic_listeners, listener) + defp add_subscriber_to_topic_map(topic_map, {subscriber, topics}) do + Enum.map(topic_map, fn {topic, topic_subscribers} -> + topic_subscribers = List.delete(topic_subscribers, subscriber) if RegexUtil.superset?(topics, topic) do - {topic, [listener | topic_listeners]} + {topic, [subscriber | topic_subscribers]} else - {topic, topic_listeners} + {topic, topic_subscribers} end end) end - defp add_or_update_listener(listeners, {listener, topics}) do - if List.keymember?(listeners, listener, 0) do - List.keyreplace(listeners, listener, 0, {listener, topics}) + defp add_or_update_subscriber(subscribers, {subscriber, topics}) do + if List.keymember?(subscribers, subscriber, 0) do + List.keyreplace(subscribers, subscriber, 0, {subscriber, topics}) else - [{listener, topics} | listeners] + [{subscriber, topics} | subscribers] end end diff --git a/lib/event_bus/services/topic.ex b/lib/event_bus/services/topic.ex index f4376f7..51c3f9a 100644 --- a/lib/event_bus/services/topic.ex +++ b/lib/event_bus/services/topic.ex @@ -6,14 +6,14 @@ defmodule EventBus.Service.Topic do alias EventBus.Manager.Subscription, as: SubscriptionManager @typep topic :: EventBus.topic() - @typep topic_list :: EventBus.topic_list() + @typep topics :: EventBus.topics() @app :event_bus @namespace :topics @modules [StoreManager, SubscriptionManager, ObservationManager] @doc false - @spec all() :: topic_list() + @spec all() :: topics() def all do Application.get_env(:event_bus, :topics, []) end diff --git a/test/event_bus/managers/observation_test.exs b/test/event_bus/managers/observation_test.exs index fe1e222..c0ef347 100644 --- a/test/event_bus/managers/observation_test.exs +++ b/test/event_bus/managers/observation_test.exs @@ -37,7 +37,7 @@ defmodule EventBus.Manager.ObservationTest do topic = :some_event_occurred1 id = "E1" - listeners = [ + subscribers = [ {InputLogger, %{}}, {Calculator, %{}}, {MemoryLeakerOne, %{}}, @@ -46,14 +46,14 @@ defmodule EventBus.Manager.ObservationTest do Observation.register_topic(topic) - assert :ok == Observation.create({listeners, {topic, id}}) + assert :ok == Observation.create({subscribers, {topic, id}}) end test "complete" do topic = :some_event_occurred2 id = "E1" - listeners = [ + subscribers = [ {InputLogger, %{}}, {Calculator, %{}}, {MemoryLeakerOne, %{}}, @@ -61,23 +61,23 @@ defmodule EventBus.Manager.ObservationTest do ] Observation.register_topic(topic) - Observation.create({listeners, {topic, id}}) + Observation.create({subscribers, {topic, id}}) - listener = {InputLogger, %{}} - another_listener = {Calculator, %{}} + subscriber = {InputLogger, %{}} + another_subscriber = {Calculator, %{}} # With an event_shadow tuple - assert :ok === Observation.mark_as_completed({listener, {topic, id}}) + assert :ok === Observation.mark_as_completed({subscriber, {topic, id}}) # With an open tuple - assert :ok === Observation.mark_as_completed({another_listener, topic, id}) + assert :ok === Observation.mark_as_completed({another_subscriber, topic, id}) end test "skip" do topic = :some_event_occurred3 id = "E1" - listeners = [ + subscribers = [ {InputLogger, %{}}, {Calculator, %{}}, {MemoryLeakerOne, %{}}, @@ -85,15 +85,15 @@ defmodule EventBus.Manager.ObservationTest do ] Observation.register_topic(topic) - Observation.create({listeners, {topic, id}}) + Observation.create({subscribers, {topic, id}}) - listener = {InputLogger, %{}} - another_listener = {Calculator, %{}} + subscriber = {InputLogger, %{}} + another_subscriber = {Calculator, %{}} # With an event_shadow tuple - assert :ok == Observation.mark_as_skipped({listener, {topic, id}}) + assert :ok == Observation.mark_as_skipped({subscriber, {topic, id}}) # With an open tuple - assert :ok == Observation.mark_as_skipped({another_listener, topic, id}) + assert :ok == Observation.mark_as_skipped({another_subscriber, topic, id}) end end diff --git a/test/event_bus/services/notification_test.exs b/test/event_bus/services/notification_test.exs index 1d787a8..fe8f974 100644 --- a/test/event_bus/services/notification_test.exs +++ b/test/event_bus/services/notification_test.exs @@ -51,7 +51,7 @@ defmodule EventBus.Service.NotificationTest do EventBus.subscribe({{Calculator, %{}}, ["metrics_received$"]}) EventBus.subscribe({{MemoryLeakerOne, %{}}, [".*"]}) - # This listener deos not have a config!!! + # This subscriber deos not have a config!!! EventBus.subscribe({AnotherCalculator, ["metrics_received$"]}) # Sleep until subscriptions complete diff --git a/test/event_bus/services/observation_test.exs b/test/event_bus/services/observation_test.exs index 8ba772e..892e18b 100644 --- a/test/event_bus/services/observation_test.exs +++ b/test/event_bus/services/observation_test.exs @@ -50,7 +50,7 @@ defmodule EventBus.Service.ObservationTest do topic = :some_event_occurred1 id = "E1" - listeners = [ + subscribers = [ {InputLogger, %{}}, {Calculator, %{}}, {MemoryLeakerOne, %{}}, @@ -58,16 +58,16 @@ defmodule EventBus.Service.ObservationTest do ] Observation.register_topic(topic) - Observation.save({topic, id}, {listeners, [], []}) + Observation.save({topic, id}, {subscribers, [], []}) - assert {listeners, [], []} == Observation.fetch({topic, id}) + assert {subscribers, [], []} == Observation.fetch({topic, id}) end test "complete" do topic = :some_event_occurred2 id = "E1" - listeners = [ + subscribers = [ {InputLogger, %{}}, {Calculator, %{}}, {MemoryLeakerOne, %{}}, @@ -75,17 +75,17 @@ defmodule EventBus.Service.ObservationTest do ] Observation.register_topic(topic) - Observation.save({topic, id}, {listeners, [], []}) + Observation.save({topic, id}, {subscribers, [], []}) Observation.mark_as_completed({{InputLogger, %{}}, {topic, id}}) - assert {listeners, [{InputLogger, %{}}], []} == Observation.fetch({topic, id}) + assert {subscribers, [{InputLogger, %{}}], []} == Observation.fetch({topic, id}) end test "skip" do id = "E1" topic = :some_event_occurred3 - listeners = [ + subscribers = [ {InputLogger, %{}}, {Calculator, %{}}, {MemoryLeakerOne, %{}}, @@ -93,9 +93,9 @@ defmodule EventBus.Service.ObservationTest do ] Observation.register_topic(topic) - Observation.save({topic, id}, {listeners, [], []}) + Observation.save({topic, id}, {subscribers, [], []}) Observation.mark_as_skipped({{InputLogger, %{}}, {topic, id}}) - assert {listeners, [], [{InputLogger, %{}}]} == Observation.fetch({topic, id}) + assert {subscribers, [], [{InputLogger, %{}}]} == Observation.fetch({topic, id}) end end diff --git a/test/event_bus/services/subscription_test.exs b/test/event_bus/services/subscription_test.exs index bf9b010..84a998d 100644 --- a/test/event_bus/services/subscription_test.exs +++ b/test/event_bus/services/subscription_test.exs @@ -54,7 +54,7 @@ defmodule EventBus.Service.SubscriptionTest do ] == Subscription.subscribers() end - test "does not subscribe same listener" do + test "does not subscribe same subscriber" do Subscription.subscribe({{InputLogger, %{}}, [".*"]}) Subscription.subscribe({{InputLogger, %{}}, [".*"]}) Subscription.subscribe({{InputLogger, %{}}, [".*"]}) diff --git a/test/event_bus_test.exs b/test/event_bus_test.exs index c607ebe..d058528 100644 --- a/test/event_bus_test.exs +++ b/test/event_bus_test.exs @@ -33,13 +33,13 @@ defmodule EventBusTest do EventBus.subscribe({{BadOne, %{}}, [".*"]}) EventBus.subscribe({{Calculator, %{}}, ["metrics_received"]}) EventBus.subscribe({{MemoryLeakerOne, %{}}, [".*"]}) - # Wait until the listeners subscribe to the topics + # Wait until the subscribers subscribe to the topics Process.sleep(100) logs = capture_log(fn -> EventBus.notify(@event) - # Wait until the listeners process the event + # Wait until the subscribers process the event Process.sleep(300) end) diff --git a/test/support/helper.ex b/test/support/helper.ex index 312ab41..c8b3f5c 100644 --- a/test/support/helper.ex +++ b/test/support/helper.ex @@ -118,19 +118,19 @@ defmodule EventBus.Support.Helper do defmodule BadOne do @moduledoc """ - A bad listener implementation with wrong arity + A bad subscriber implementation with wrong arity All events will be marked as skipped """ @doc false def process(_, _) do - # it has wrong arity, can't be a listener + # it has wrong arity, can't be a subscriber end end defmodule AnotherBadOne do @moduledoc """ - A bad listener implementation with error raising + A bad subscriber implementation with error raising If the process raise an error the the event will be marked as skipped """