diff --git a/CHANGELOG.md b/CHANGELOG.md index f859635..c95b3be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.4.X] + +- Add public types to main module to increase type safety and readability +- Remove allowence of passing string on topic registration/deregistration +- Allow passing `event_shadow` to `mark_as_completed/1` and `mark_as_skipped/1` + ## [1.3.X] - Set default transaction to the id diff --git a/README.md b/README.md index 22ca082..7f4a2e7 100644 --- a/README.md +++ b/README.md @@ -237,7 +237,7 @@ EventBus.fetch_event_data({topic, id}) listener = MyEventListener # If your listener has config then pass tuple listener = {MyEventListener, config} -EventBus.mark_as_completed({listener, :bye_received, id}) +EventBus.mark_as_completed({listener, {:bye_received, id}}) > :ok ``` @@ -246,7 +246,7 @@ EventBus.mark_as_completed({listener, :bye_received, id}) listener = MyEventListener # If your listener has config then pass tuple listener = {MyEventListener, config} -EventBus.mark_as_skipped({listener, :bye_received, id}) +EventBus.mark_as_skipped({listener, {:bye_received, id}}) > :ok ``` @@ -362,27 +362,37 @@ defmodule MyEventListener do # if your listener does not have a config - def handle_cast({:bye_received, id}, state) do - event = EventBus.fetch_event({:bye_received, id}) + def handle_cast({:bye_received, id} = event_shadow, state) do + event = EventBus.fetch_event(event_shadow) # do sth with event # update the watcher! + # version >= 1.4.0 + EventBus.mark_as_completed({__MODULE__, event_shadow}) + # all versions EventBus.mark_as_completed({__MODULE__, :bye_received, id}) ... {:noreply, state} end - def handle_cast({:hello_received, id}, state) do + def handle_cast({:hello_received, id} = event_shadow, state) do event = EventBus.fetch_event({:hello_received, id}) # do sth with EventBus.Model.Event # update the watcher! + # version >= 1.4.0 + EventBus.mark_as_completed({__MODULE__, event_shadow}) + # all versions EventBus.mark_as_completed({__MODULE__, :hello_received, id}) ... {:noreply, state} end - def handle_cast({topic, id}, state) do + def handle_cast({topic, id} = event_shadow, state) do + # version >= 1.4.0 + EventBus.mark_as_skipped({__MODULE__, event_shadow}) + + # all versions EventBus.mark_as_skipped({__MODULE__, topic, id}) {:noreply, state} end @@ -464,7 +474,7 @@ defmodule MyDataStore do event = EventBus.fetch_event({topic, id}) # write your logic to save event_data to a persistant store - EventBus.mark_as_completed({__MODULE__, topic, id}) + EventBus.mark_as_completed({__MODULE__, {topic, id}}) {:noreply, state} end end diff --git a/lib/event_bus.ex b/lib/event_bus.ex index 1efb411..661b304 100644 --- a/lib/event_bus.ex +++ b/lib/event_bus.ex @@ -14,6 +14,55 @@ defmodule EventBus do alias EventBus.Model.Event + @typedoc "EventBus.Model.Event struct" + @type event :: Event.t() + + @typedoc "Event id" + @type event_id :: String.t() | integer() + + @typedoc "Tuple of topic name and event id" + @type event_shadow :: {topic(), event_id()} + + @typedoc "Event listener/subscriber/consumer" + @type listener :: {listener_without_config() | listener_with_config()} + + @typedoc "Listener configuration" + @type listener_config :: any() + + @typedoc "List of event listeners/subscribers/consumers" + @type listener_list :: list(listener()) + + @typedoc "Event listener/subscriber/consumer with config" + @type listener_with_config :: {module(), listener_config()} + + @typedoc "Tuple of listener and event reference" + @type listener_with_event_ref :: + listener_with_event_shadow() | listener_with_topic_and_event_id() + + @typedoc "Tuple of listener and event shadow" + @type listener_with_event_shadow :: {listener(), event_shadow()} + + @typedoc "Tuple of listener, topic and event id" + @type listener_with_topic_and_event_id :: {listener(), topic(), event_id()} + + @typedoc "Tuple of listener and list of topic patterns" + @type listener_with_topic_patterns :: {listener(), topic_pattern_list()} + + @typedoc "Event listener/subscriber/consumer without config" + @type listener_without_config :: module() + + @typedoc "Topic name" + @type topic :: atom() + + @typedoc "List of topic names" + @type topic_list :: list(topic()) + + @typedoc "Regex pattern to match topic name" + @type topic_pattern :: String.t() + + @typedoc "List of topic patterns" + @type topic_pattern_list :: list(topic_pattern()) + @doc """ Send event to all subscribers(listeners). @@ -25,8 +74,8 @@ defmodule EventBus do :ok """ - @spec notify(Event.t()) :: :ok - defdelegate notify(topic), + @spec notify(event()) :: :ok + defdelegate notify(event), to: Notification, as: :notify @@ -39,7 +88,7 @@ defmodule EventBus do true """ - @spec topic_exist?(String.t() | atom()) :: boolean() + @spec topic_exist?(topic()) :: boolean() defdelegate topic_exist?(topic), to: Topic, as: :exist? @@ -52,7 +101,7 @@ defmodule EventBus do EventBus.topics() [:metrics_summed] """ - @spec topics() :: list(atom()) + @spec topics() :: topic_list() defdelegate topics, to: Topic, as: :all @@ -66,7 +115,7 @@ defmodule EventBus do :ok """ - @spec register_topic(String.t() | atom()) :: :ok + @spec register_topic(topic()) :: :ok defdelegate register_topic(topic), to: Topic, as: :register @@ -80,7 +129,7 @@ defmodule EventBus do :ok """ - @spec unregister_topic(String.t() | atom()) :: :ok + @spec unregister_topic(topic()) :: :ok defdelegate unregister_topic(topic), to: Topic, as: :unregister @@ -99,8 +148,8 @@ defmodule EventBus do :ok """ - @spec subscribe(tuple()) :: :ok - defdelegate subscribe(listener_with_topics), + @spec subscribe(listener_with_topic_patterns()) :: :ok + defdelegate subscribe(listener_with_topic_patterns), to: Subscription, as: :subscribe @@ -114,17 +163,17 @@ defmodule EventBus do # For configurable listeners you must pass tuple of listener and config my_config = %{} - EventBus.unsubscribe({{OtherListener, my_config}}) + EventBus.unsubscribe({OtherListener, my_config}) :ok """ - @spec unsubscribe({tuple() | module()}) :: :ok + @spec unsubscribe(listener()) :: :ok defdelegate unsubscribe(listener), to: Subscription, as: :unsubscribe @doc """ - Is given listener subscribed to the bus for the given topics? + Is given listener subscribed to the bus for the given topic patterns? ## Examples @@ -141,8 +190,8 @@ defmodule EventBus do false """ - @spec subscribed?(tuple()) :: boolean() - defdelegate subscribed?(listener_with_topics), + @spec subscribed?(listener_with_topic_patterns()) :: boolean() + defdelegate subscribed?(listener_with_topic_patterns), to: Subscription, as: :subscribed? @@ -159,7 +208,7 @@ defmodule EventBus do [MyEventListener, {OtherListener, %{}}] """ - @spec subscribers() :: list(any()) + @spec subscribers() :: listener_list() defdelegate subscribers, to: Subscription, as: :subscribers @@ -190,7 +239,7 @@ defmodule EventBus do %EventBus.Model.Model{} """ - @spec fetch_event({atom(), String.t() | integer()}) :: Event.t() + @spec fetch_event(event_shadow()) :: event() defdelegate fetch_event(event_shadow), to: Store, as: :fetch @@ -203,7 +252,7 @@ defmodule EventBus do EventBus.fetch_event_data({:hello_received, "123"}) """ - @spec fetch_event_data({atom(), String.t() | integer()}) :: any() + @spec fetch_event_data(event_shadow()) :: any() defdelegate fetch_event_data(event_shadow), to: Store, as: :fetch_data @@ -212,19 +261,23 @@ defmodule EventBus do Send the event processing completed to the Observation Manager ## Examples + topic = :hello_received + event_id = "124" + event_shadow = {topic, event_id} - EventBus.mark_as_completed({MyEventListener, :hello_received, "123"}) + # For regular listeners + EventBus.mark_as_completed({MyEventListener, event_shadow}) # For configurable listeners you must pass tuple of listener and config my_config = %{} - listener = {OtherListener, my_config} - EventBus.mark_as_completed({listener, :hello_received, "124"}) + listener = {OtherListener, my_config} + + EventBus.mark_as_completed({listener, event_shadow}) :ok """ - @spec mark_as_completed({tuple() | module(), atom(), String.t() | integer()}) :: - no_return() - defdelegate mark_as_completed(listener_with_event_shadow), + @spec mark_as_completed(listener_with_event_ref()) :: no_return() + defdelegate mark_as_completed(listener_with_event_ref), to: Observation, as: :mark_as_completed @@ -233,18 +286,17 @@ defmodule EventBus do ## Examples - EventBus.mark_as_skipped({MyEventListener, :unmatched_occurred, "124"}) + EventBus.mark_as_skipped({MyEventListener, {:unmatched_occurred, "124"}}) # For configurable listeners you must pass tuple of listener and config my_config = %{} - listener = {OtherListener, my_config} - EventBus.mark_as_skipped({listener, :unmatched_occurred, "124"}) + listener = {OtherListener, my_config} + EventBus.mark_as_skipped({listener, {:unmatched_occurred, "124"}}) :ok """ - @spec mark_as_skipped({tuple() | module(), atom(), String.t() | integer()}) :: - no_return() - defdelegate mark_as_skipped(listener_with_event_shadow), + @spec mark_as_skipped(listener_with_event_ref()) :: no_return() + defdelegate mark_as_skipped(listener_with_event_ref), to: Observation, as: :mark_as_skipped end diff --git a/lib/event_bus/managers/observation.ex b/lib/event_bus/managers/observation.ex index 278dd45..220a7a6 100644 --- a/lib/event_bus/managers/observation.ex +++ b/lib/event_bus/managers/observation.ex @@ -55,7 +55,11 @@ defmodule EventBus.Manager.Observation do """ @spec mark_as_completed(tuple()) :: no_return() def mark_as_completed({listener, topic, id}) do - GenServer.cast(__MODULE__, {:mark_as_completed, {listener, topic, id}}) + GenServer.cast(__MODULE__, {:mark_as_completed, {listener, {topic, id}}}) + end + + def mark_as_completed({listener, {topic, id}}) do + GenServer.cast(__MODULE__, {:mark_as_completed, {listener, {topic, id}}}) end @doc """ @@ -63,14 +67,18 @@ defmodule EventBus.Manager.Observation do """ @spec mark_as_skipped(tuple()) :: no_return() def mark_as_skipped({listener, topic, id}) do - GenServer.cast(__MODULE__, {:mark_as_skipped, {listener, topic, id}}) + GenServer.cast(__MODULE__, {:mark_as_skipped, {listener, {topic, id}}}) + end + + def mark_as_skipped({listener, {topic, id}}) do + GenServer.cast(__MODULE__, {:mark_as_skipped, {listener, {topic, id}}}) end @doc """ Create an watcher """ @spec create(tuple()) :: no_return() - def create({listeners, topic, id}) do + def create({listeners, {topic, id}}) do GenServer.call(__MODULE__, {:save, {topic, id}, {listeners, [], []}}) end @@ -122,15 +130,15 @@ defmodule EventBus.Manager.Observation do @doc false @spec handle_cast({:mark_as_completed, tuple()}, term()) :: no_return() - def handle_cast({:mark_as_completed, {listener, topic, id}}, state) do - @backend.mark_as_completed({listener, topic, id}) + def handle_cast({:mark_as_completed, {listener, {topic, id}}}, state) do + @backend.mark_as_completed({listener, {topic, id}}) {:noreply, state} end @doc false @spec handle_cast({:mark_as_skipped, tuple()}, term()) :: no_return() - def handle_cast({:mark_as_skipped, {listener, topic, id}}, state) do - @backend.mark_as_skipped({listener, topic, id}) + def handle_cast({:mark_as_skipped, {listener, {topic, id}}}, state) do + @backend.mark_as_skipped({listener, {topic, id}}) {:noreply, state} end end diff --git a/lib/event_bus/managers/subscription.ex b/lib/event_bus/managers/subscription.ex index 130def4..efeea90 100644 --- a/lib/event_bus/managers/subscription.ex +++ b/lib/event_bus/managers/subscription.ex @@ -22,19 +22,19 @@ defmodule EventBus.Manager.Subscription do end @doc """ - Subscribe the listener to topics + Does the listener subscribe to topic_patterns? """ - @spec subscribed?({tuple() | module(), list()}) :: no_return() - def subscribed?({_listener, _topics} = subscriber) do + @spec subscribed?({tuple() | module(), list()}) :: boolean() + def subscribed?({_listener, _topic_patterns} = subscriber) do GenServer.call(__MODULE__, {:subscribed?, subscriber}) end @doc """ - Subscribe the listener to topics + Subscribe the listener to topic_patterns """ @spec subscribe({tuple() | module(), list()}) :: no_return() - def subscribe({listener, topics}) do - GenServer.cast(__MODULE__, {:subscribe, {listener, topics}}) + def subscribe({listener, topic_patterns}) do + GenServer.cast(__MODULE__, {:subscribe, {listener, topic_patterns}}) end @doc """ @@ -94,8 +94,8 @@ defmodule EventBus.Manager.Subscription do @doc false @spec handle_cast({:subscribe, tuple()}, term()) :: no_return() - def handle_cast({:subscribe, {listener, topics}}, state) do - @backend.subscribe({listener, topics}) + def handle_cast({:subscribe, {listener, topic_patterns}}, state) do + @backend.subscribe({listener, topic_patterns}) {:noreply, state} end diff --git a/lib/event_bus/managers/topic.ex b/lib/event_bus/managers/topic.ex index a6d8d7f..0731909 100644 --- a/lib/event_bus/managers/topic.ex +++ b/lib/event_bus/managers/topic.ex @@ -26,7 +26,7 @@ defmodule EventBus.Manager.Topic do It's important to keep this in blocking manner to prevent double creations in sub modules """ - @spec exist?(String.t() | atom()) :: boolean() + @spec exist?(atom()) :: boolean() def exist?(topic) do GenServer.call(__MODULE__, {:exist?, topic}) end @@ -34,7 +34,7 @@ defmodule EventBus.Manager.Topic do @doc """ Register a topic """ - @spec register(String.t() | atom()) :: :ok + @spec register(atom()) :: :ok def register(topic) do GenServer.call(__MODULE__, {:register, topic}) end @@ -42,7 +42,7 @@ defmodule EventBus.Manager.Topic do @doc """ Unregister a topic """ - @spec unregister(String.t() | atom()) :: :ok + @spec unregister(atom()) :: :ok def unregister(topic) do GenServer.call(__MODULE__, {:unregister, topic}) end @@ -72,25 +72,24 @@ defmodule EventBus.Manager.Topic do ########################################################################### @doc false - @spec handle_call({:exist?, String.t() | atom()}, any(), term()) + @spec handle_call({:exist?, atom()}, any(), term()) :: {:reply, boolean(), term()} def handle_call({:exist?, topic}, _from, state) do - {:reply, @backend.exist?(:"#{topic}"), state} + {:reply, @backend.exist?(topic), state} end @doc false - @spec handle_call({:register, String.t() | atom()}, any(), term()) - :: {:reply, :ok, term()} + @spec handle_call({:register, atom()}, any(), term()) :: {:reply, :ok, term()} def handle_call({:register, topic}, _from, state) do - @backend.register(:"#{topic}") + @backend.register(topic) {:reply, :ok, state} end @doc false - @spec handle_call({:unregister, String.t() | atom()}, any(), term()) + @spec handle_call({:unregister, atom()}, any(), term()) :: {:reply, :ok, term()} def handle_call({:unregister, topic}, _from, state) do - @backend.unregister(:"#{topic}") + @backend.unregister(topic) {:reply, :ok, state} end end diff --git a/lib/event_bus/services/notification.ex b/lib/event_bus/services/notification.ex index a663a78..0bd828e 100644 --- a/lib/event_bus/services/notification.ex +++ b/lib/event_bus/services/notification.ex @@ -17,7 +17,7 @@ defmodule EventBus.Service.Notification do warn_missing_topic_subscription(topic) else :ok = StoreManager.create(event) - :ok = ObservationManager.create({listeners, topic, id}) + :ok = ObservationManager.create({listeners, {topic, id}}) notify_listeners(listeners, {topic, id}) end @@ -34,7 +34,7 @@ defmodule EventBus.Service.Notification do rescue error -> log_error(listener, error) - ObservationManager.mark_as_skipped({{listener, config}, topic, id}) + ObservationManager.mark_as_skipped({{listener, config}, {topic, id}}) end @spec notify_listener(module(), tuple()) :: no_return() @@ -43,7 +43,7 @@ defmodule EventBus.Service.Notification do rescue error -> log_error(listener, error) - ObservationManager.mark_as_skipped({listener, topic, id}) + ObservationManager.mark_as_skipped({listener, {topic, id}}) end @spec registration_status(atom()) :: String.t() diff --git a/lib/event_bus/services/observation.ex b/lib/event_bus/services/observation.ex index 8e7c8e7..d88919e 100644 --- a/lib/event_bus/services/observation.ex +++ b/lib/event_bus/services/observation.ex @@ -35,16 +35,16 @@ defmodule EventBus.Service.Observation do @doc false @spec mark_as_completed(tuple()) :: no_return() - def mark_as_completed({listener, topic, id}) do - {listeners, completers, skippers} = fetch({topic, id}) - save_or_delete({topic, id}, {listeners, [listener | completers], skippers}) + def mark_as_completed({listener, event_shadow}) do + {listeners, completers, skippers} = fetch(event_shadow) + save_or_delete(event_shadow, {listeners, [listener | completers], skippers}) end @doc false @spec mark_as_skipped(tuple()) :: no_return() - def mark_as_skipped({listener, topic, id}) do - {listeners, completers, skippers} = fetch({topic, id}) - save_or_delete({topic, id}, {listeners, completers, [listener | skippers]}) + def mark_as_skipped({listener, event_shadow}) do + {listeners, completers, skippers} = fetch(event_shadow) + save_or_delete(event_shadow, {listeners, completers, [listener | skippers]}) end @doc false diff --git a/test/event_bus/managers/observation_test.exs b/test/event_bus/managers/observation_test.exs index e98f984..4dab3c3 100644 --- a/test/event_bus/managers/observation_test.exs +++ b/test/event_bus/managers/observation_test.exs @@ -46,7 +46,7 @@ defmodule EventBus.Manager.ObservationTest do Observation.register_topic(topic) - assert :ok == Observation.create({listeners, topic, id}) + assert :ok == Observation.create({listeners, {topic, id}}) end test "complete" do @@ -61,11 +61,16 @@ defmodule EventBus.Manager.ObservationTest do ] Observation.register_topic(topic) - Observation.create({listeners, topic, id}) + Observation.create({listeners, {topic, id}}) listener = {InputLogger, %{}} + another_listener = {Calculator, %{}} - assert :ok === Observation.mark_as_completed({listener, topic, id}) + # with event_shadow tuple + assert :ok === Observation.mark_as_completed({listener, {topic, id}}) + + # with open tuple + assert :ok === Observation.mark_as_completed({another_listener, topic, id}) end test "skip" do @@ -80,7 +85,15 @@ defmodule EventBus.Manager.ObservationTest do ] Observation.register_topic(topic) - Observation.create({listeners, topic, id}) - assert :ok == Observation.mark_as_skipped({{InputLogger, %{}}, topic, id}) + Observation.create({listeners, {topic, id}}) + + listener = {InputLogger, %{}} + another_listener = {Calculator, %{}} + + # with event_shadow tuple + assert :ok == Observation.mark_as_skipped({listener, {topic, id}}) + + # with open tuple + assert :ok == Observation.mark_as_skipped({another_listener, topic, id}) end end diff --git a/test/event_bus/services/observation_test.exs b/test/event_bus/services/observation_test.exs index 2a9da47..0cbfb00 100644 --- a/test/event_bus/services/observation_test.exs +++ b/test/event_bus/services/observation_test.exs @@ -74,7 +74,7 @@ defmodule EventBus.Service.ObservationTest do Observation.register_topic(topic) Observation.save({topic, id}, {listeners, [], []}) - Observation.mark_as_completed({{InputLogger, %{}}, topic, id}) + Observation.mark_as_completed({{InputLogger, %{}}, {topic, id}}) assert {listeners, [{InputLogger, %{}}], []} == Observation.fetch({topic, id}) end @@ -92,7 +92,7 @@ defmodule EventBus.Service.ObservationTest do Observation.register_topic(topic) Observation.save({topic, id}, {listeners, [], []}) - Observation.mark_as_skipped({{InputLogger, %{}}, topic, id}) + Observation.mark_as_skipped({{InputLogger, %{}}, {topic, id}}) assert {listeners, [], [{InputLogger, %{}}]} == Observation.fetch({topic, id}) end