Skip to content

Commit

Permalink
Merge pull request #17 from otobus/sys_events
Browse files Browse the repository at this point in the history
Add optional system events
  • Loading branch information
Mustafa TURAN authored Feb 18, 2018
2 parents 6bbf0cd + 17b66bc commit 5dc63ee
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 29 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
language: elixir
elixir:
- 1.5
- 1.6
otp_release:
- 20.0
14 changes: 10 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.0.0]
## Current

### Added

- Optional system events which notify the `eb_action_called` topic for the actions: `notify`, `register_topic`, `unregister_topic`, `subscribe`, `unsubscribe`, `mark_as_completed`, `mark_as_skipped`

## [1.0.0] - 2018.01.23

### Added

Expand All @@ -17,7 +23,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Error topic introduced for dynamic event builder/notifier with `EventSource`. Now you can pass `:error_topic` key, EvetSource automatically check the result of execution block for `{:error, _}` tuple and create an event structure for the given `:error_topic`.
- Add elixir formatter config to format code

## [0.9.0] - 2018-01-06
## [0.9.0] - 2018.01.06

### Added

Expand All @@ -26,7 +32,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### TODO

## [0.8.0] - 2018-01-06
## [0.8.0] - 2018.01.06

### Added

Expand All @@ -35,7 +41,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Add block/yield notifier for delivering/notifying events creation with same benefits of build block
- Add changelog file

## [0.7.0] - 2018-01-06
## [0.7.0] - 2018.01.06

### Added

Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ Traceable, extendable and minimalist event bus implementation for Elixir with bu

[Traceability](#traceability)

- [System Events](#system-events)

[Documentation](#documentation)

[Addons](#addons)
Expand Down Expand Up @@ -432,6 +434,22 @@ end

EventBus comes with a good enough data structure to track the event life cycle with its optional parameters. For a traceable system, it is highly recommend to fill optional fields on event data. It is also encouraged to use `Event.nofify` block/yield to automatically set the `initialized_at` and `occurred_at` values.

EvenBus version > 1.1 comes with optional system events which allows to track its action calls.

### System Events

EventBus optionally allows you to track `:register_topic`, `:unregister_topic`, `:subscribe` and `:unsubscribe`, `:notify`, `:mark_as_completed` and `mark_as_skipped` action calls by the configuration. To track these events you need to enable them by configuration and then subscribe to `:eb_action_called` topic.

Note: Enabling optional system events decreases the EventBus performance because it at least doubles the operation calls. It is not recommended to enable these events unless you certainly require to track these events espcially `notify`, `mark_as_completed` and `mark_as_skipped`.

Enabling observable events only can be done on compile time. Thus, you need to add it to your app configuration. Here is sample configuration to subscribe optional system events:

```elixir
config :event_bus,
observables: ~w(register_topic unregister_topic subscribe unsubscribe notify mark_as_completed mark_as_skipped)a,
...
```

## Documentation

Module docs can be found at [https://hexdocs.pm/event_bus](https://hexdocs.pm/event_bus).
Expand Down
13 changes: 12 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
use Mix.Config

config :event_bus, topics: [:metrics_received, :metrics_summed]
config :event_bus,
topics: [:metrics_received, :metrics_summed],
observables: [
:notify,
:register_topic,
:unregister_topic,
:subscribe,
:unsubscribe,
:mark_as_completed,
:mark_as_skipped
],
id_generator: fn -> :base64.encode(:crypto.strong_rand_bytes(8)) end
3 changes: 2 additions & 1 deletion coveralls.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"skip_files": [
"test/support"
"test/support",
"lib/event_bus.ex"
],
"custom_stop_words": [
"defdelegate"
Expand Down
118 changes: 106 additions & 12 deletions lib/event_bus.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
defmodule EventBus do
@moduledoc """
Traceable, extendable and minimalist event bus implementation for Elixir with
built-in event store and event watcher based on ETS
"""
@moduledoc false

use EventBus.EventSource
alias EventBus.{Notifier, Store, Watcher, Subscription, Topic}

@app :event_bus
@source "eb"
@sys_topic :eb_action_called
@observables Application.get_env(@app, :observables, [])

defmacrop is_observable(action) do
quote do
unquote(action) in unquote(@observables)
end
end

alias EventBus.{Notifier, Store, Watcher, Subscription, Topic, Model.Event}
defmacrop is_observable(action, topic) do
quote do
unquote(action) in unquote(@observables) and
unquote(topic) != unquote(@sys_topic)
end
end

@doc """
Send event to all subscribers(listeners).
Expand All @@ -18,7 +34,17 @@ defmodule EventBus do
"""
@spec notify(Event.t()) :: :ok
defdelegate notify(event),
def notify(%Event{id: id, topic: topic} = event)
when is_observable(:notify, topic) do
EventSource.notify sys_params() do
Notifier.notify(event)
%{action: :notify, id: id, subscribers: subscribers(topic), topic: topic}
end

:ok
end

defdelegate notify(topic),
to: Notifier,
as: :notify

Expand Down Expand Up @@ -58,7 +84,18 @@ defmodule EventBus do
:ok
"""
@spec register_topic(String.t() | atom()) :: boolean()
@spec register_topic(String.t() | atom()) :: :ok
def register_topic(topic) when is_observable(:register_topic, topic) do
unless topic_exist?(topic) do
EventSource.notify sys_params() do
Topic.register(topic)
%{action: :register_topic, topic: topic}
end
end

:ok
end

defdelegate register_topic(topic),
to: Topic,
as: :register
Expand All @@ -72,7 +109,18 @@ defmodule EventBus do
:ok
"""
@spec unregister_topic(String.t() | atom()) :: boolean()
@spec unregister_topic(String.t() | atom()) :: :ok
def unregister_topic(topic) when is_observable(:unregister_topic, topic) do
if topic_exist?(topic) do
EventSource.notify sys_params() do
Topic.unregister(topic)
%{action: :unregister_topic, topic: topic}
end
end

:ok
end

defdelegate unregister_topic(topic),
to: Topic,
as: :unregister
Expand All @@ -92,6 +140,17 @@ defmodule EventBus do
"""
@spec subscribe(tuple()) :: :ok
def subscribe({listener, topics}) when is_observable(:subscribe) do
unless Enum.member?(subscribers(), {listener, topics}) do
EventSource.notify sys_params() do
Subscription.subscribe({listener, topics})
%{action: :subscribe, listener: listener, topics: topics}
end
end

:ok
end

defdelegate subscribe(listener_with_topics),
to: Subscription,
as: :subscribe
Expand All @@ -110,7 +169,18 @@ defmodule EventBus do
:ok
"""
@spec unsubscribe({tuple() | module()}) :: :ok
@spec unsubscribe(tuple()) :: :ok
def unsubscribe({listener}) when is_observable(:unsubscribe) do
if Enum.member?(subscribers(), {listener}) do
EventSource.notify sys_params() do
Subscription.unsubscribe({listener})
%{action: :unsubscribe, listener: listener}
end
end

:ok
end

defdelegate unsubscribe(listener),
to: Subscription,
as: :unsubscribe
Expand Down Expand Up @@ -146,7 +216,6 @@ defmodule EventBus do
[MyEventListener, {OtherListener, %{}}]
"""
@spec subscribers(atom() | String.t()) :: list(any())
defdelegate subscribers(topic),
to: Subscription,
as: :subscribers
Expand Down Expand Up @@ -179,7 +248,17 @@ defmodule EventBus do
"""
@spec mark_as_completed({tuple() | module(), atom(), String.t() | integer()})
:: no_return()
:: no_return()
def mark_as_completed({listener, topic, id})
when is_observable(:mark_as_completed, topic) do
EventSource.notify sys_params() do
Watcher.mark_as_completed({listener, topic, id})
%{action: :mark_as_completed, id: id, listener: listener, topic: topic}
end

:ok
end

defdelegate mark_as_completed(listener_with_event_shadow),
to: Watcher,
as: :mark_as_completed
Expand All @@ -199,8 +278,23 @@ defmodule EventBus do
"""
@spec mark_as_skipped({tuple() | module(), atom(), String.t() | integer()})
:: no_return()
:: no_return()
def mark_as_skipped({listener, topic, id})
when is_observable(:mark_as_skipped, topic) do
EventSource.notify sys_params() do
Watcher.mark_as_skipped({listener, topic, id})
%{action: :mark_as_skipped, id: id, listener: listener, topic: topic}
end

:ok
end

defdelegate mark_as_skipped(listener_with_event_shadow),
to: Watcher,
as: :mark_as_skipped

defp sys_params do
id = Application.get_env(@app, :id_generator).()
%{id: id, transaction_id: id, topic: @sys_topic, source: @source}
end
end
9 changes: 8 additions & 1 deletion lib/event_bus/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule EventBus.Application do
use Application
alias EventBus.{Notifier, Store, Watcher, Subscription, Topic}

@sys_topic :eb_action_called

def start(_type, _args) do
import Supervisor.Spec, warn: false

Expand All @@ -17,7 +19,12 @@ defmodule EventBus.Application do

opts = [strategy: :one_for_one, name: EventBus.Supervisor]
link = Supervisor.start_link(children, opts)
Topic.register_from_config()
register_topics()
link
end

defp register_topics do
Topic.register(@sys_topic)
Topic.register_from_config()
end
end
2 changes: 1 addition & 1 deletion lib/event_bus/services/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule EventBus.Service.Topic do
@doc false
@spec exist?(String.t() | atom()) :: boolean()
def exist?(topic),
do: Enum.any?(all(), fn event_topic -> event_topic == :"#{topic}" end)
do: Enum.member?(all(), :"#{topic}")

@doc false
@spec register_from_config() :: no_return()
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule EventBus.Mixfile do
defp deps do
[
{:credo, "~> 0.8.10", only: [:dev]},
{:dialyxir, "~> 0.5.1", only: [:dev], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
{:excoveralls, "~> 0.8", only: [:test]},
{:ex_doc, "~> 0.18.1", only: [:dev]}
]
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"credo": {:hex, :credo, "0.8.10", "261862bb7363247762e1063713bb85df2bbd84af8d8610d1272cd9c1943bba63", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.1", "37c69d2ef62f24928c1f4fdc7c724ea04aecfdf500c4329185f8e3649c915baf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.8.0", "99d2691d3edf8612f128be3f9869c4d44b91c67cec92186ce49470ae7a7404cf", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.8.1", "0bbf67f22c7dbf7503981d21a5eef5db8bbc3cb86e70d3798e8c802c74fa5e27", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
"hackney": {:hex, :hackney, "1.10.1", "c38d0ca52ea80254936a32c45bb7eb414e7a96a521b4ce76d00a69753b157f21", [:rebar3], [{:certifi, "2.0.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"hackney": {:hex, :hackney, "1.11.0", "4951ee019df102492dabba66a09e305f61919a8a183a7860236c0fde586134b6", [:rebar3], [{:certifi, "2.0.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
Expand Down
8 changes: 7 additions & 1 deletion test/event_bus/services/notifier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule EventBus.Service.NotifierTest do
Calculator,
AnotherCalculator,
MemoryLeakerOne,
BadOne
BadOne,
AnotherBadOne
}

doctest Notifier
Expand Down Expand Up @@ -49,6 +50,7 @@ defmodule EventBus.Service.NotifierTest do
)

Subscription.subscribe({{BadOne, %{}}, [".*"]})
Subscription.subscribe({AnotherBadOne, [".*"]})
Subscription.subscribe({{Calculator, %{}}, ["metrics_received$"]})
Subscription.subscribe({{MemoryLeakerOne, %{}}, [".*"]})

Expand All @@ -66,6 +68,10 @@ defmodule EventBus.Service.NotifierTest do

assert String.contains?(logs, "BadOne.process/1 raised an error!")

assert String.contains?(logs, "AnotherBadOne.process/1 raised an error!")

assert String.contains?(logs, "I don't want to handle your event")

assert String.contains?(
logs,
"Event log for %EventBus.Model.Event{data: [1, 2], id: \"E1\", initialized_at: nil, occurred_at: nil, source: \"NotifierTest\", topic: :metrics_received, transaction_id: \"T1\", ttl: nil}"
Expand Down
3 changes: 2 additions & 1 deletion test/event_bus/services/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ defmodule EventBus.Service.SubscriptionTest do
%{
metrics_received: [AnotherCalculator, {InputLogger, %{}}],
metrics_summed: [{InputLogger, %{}}],
auto_subscribed: []
auto_subscribed: [],
eb_action_called: []
}
}

Expand Down
4 changes: 3 additions & 1 deletion test/event_bus/services/topic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule EventBus.Service.TopicTest do
use ExUnit.Case, async: false
alias EventBus.Service.Topic

@sys_topic :eb_action_called

doctest Topic

setup do
Expand Down Expand Up @@ -53,7 +55,7 @@ defmodule EventBus.Service.TopicTest do
topic = :t3
Topic.register(topic)
Process.sleep(10)
assert [:t3, :metrics_received, :metrics_summed] == Topic.all()
assert [:t3, @sys_topic, :metrics_received, :metrics_summed] == Topic.all()
end

test "exist? with an existent topic" do
Expand Down
Loading

0 comments on commit 5dc63ee

Please sign in to comment.