Skip to content

Commit

Permalink
Merge pull request #63 from otobus/mustafaturan/monotonic_time
Browse files Browse the repository at this point in the history
Enhancements
  • Loading branch information
Mustafa TURAN authored Sep 15, 2018
2 parents 78661fd + 0ba166d commit fab2567
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 35 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.5.X]
- Fix Elixir 1.7 warnings for string to atom conversions
- Remove deprecated EventBus.Util.String module
- Fix Elixir `v1.7.x` warnings for string to atom conversions
- Remove deprecated `EventBus.Util.String` module
- Move the time calculation logic into the new `MonotonicTime` utility module
- Set `initialized_at` value on `EventSource` helper to a monotonically increasing time
- Enhance tests for the `:second` time unit
- Enhance tests for the `unique_id` generator

## [1.4.X] 2018.09.07

Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,15 @@ EventBus has some addons to extend its optional functionalities. One of them is

## Addons

EventBus allows building generic and specific addons for your stack. Here are a few generic addons which I'm currently working on:
A few sample addons listed below. Please do not hesitate to add your own addon to the list.

- `event_bus_logger` allows you to log event bus events to your console with a generic configuration: https://github.com/mustafaturan/event_bus_logger
| Addon Name | Description | Link | Docs |
| -------------------- | ------------- | ------------- | ------------- |
| `event_bus_postgres` | Fast event consumer to persist `event_bus` events to Postgres using GenStage | [Github](https://github.com/otobus/event_bus_postgres) | [HexDocs](https://hexdocs.pm/event_bus_postgres) |
| `event_bus_logger` | Deadly simple log listener implementation | [Github](https://github.com/otobus/event_bus_logger) | [HexDocs](https://hexdocs.pm/event_bus_logger) |
| `event_bus_metrics` | Metrics UI and metrics API endpoints for EventBus events for debugging and monitoring | [Hex](https://hex.pm/packages/event_bus_metrics) | [HexDocs](https://hexdocs.pm/event_bus_metrics) |

- `event_bus_postgres` allows you to save event bus events to Postgres DB with a generic configuration: https://github.com/mustafaturan/event_bus_postgres

- `event_bus_zipkin` allows you to trace event bus events via Zipkin

- `event_bus_ddtrace` allows you to trace event bus events via Datadog APM
Note: The addons under [https://github.com/otobus](https://github.com/otobus) organization implemented as a sample, but feel free to use them in your project with respecting their licenses.

## License

Expand Down
8 changes: 3 additions & 5 deletions lib/event_bus/event_source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule EventBus.EventSource do
"""

alias EventBus.Model.Event
alias EventBus.Util.MonotonicTime
alias __MODULE__

defmacro __using__(_) do
Expand All @@ -17,7 +18,6 @@ defmodule EventBus.EventSource do
@eb_app :event_bus
@eb_id_gen Application.get_env(@eb_app, :id_generator, Base62)
@eb_source String.replace("#{__MODULE__}", "Elixir.", "")
@eb_time_unit Application.get_env(@eb_app, :time_unit, :microsecond)
@eb_ttl Application.get_env(@eb_app, :ttl)
end
end
Expand All @@ -28,8 +28,7 @@ defmodule EventBus.EventSource do
"""
defmacro build(params, do: yield) do
quote do
started_at = System.monotonic_time(@eb_time_unit)
initialized_at = System.os_time(@eb_time_unit)
initialized_at = MonotonicTime.now()
params = unquote(params)

{topic, data} =
Expand All @@ -42,15 +41,14 @@ defmodule EventBus.EventSource do
end

id = Map.get(params, :id, @eb_id_gen.unique_id())
time_spent = System.monotonic_time(@eb_time_unit) - started_at

%Event{
id: id,
topic: topic,
transaction_id: Map.get(params, :transaction_id, id),
data: data,
initialized_at: initialized_at,
occurred_at: initialized_at + time_spent,
occurred_at: MonotonicTime.now(),
source: Map.get(params, :source, @eb_source),
ttl: Map.get(params, :ttl, @eb_ttl)
}
Expand Down
38 changes: 38 additions & 0 deletions lib/event_bus/utils/monotonic_time.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule EventBus.Util.MonotonicTime do
@moduledoc false

@eb_app :event_bus
@eb_time_unit Application.get_env(@eb_app, :time_unit, :microsecond)

@doc """
Calculates monotonically increasing current time
"""
@spec now() :: integer()
def now do
init_time() + monotonic_time()
end

defp init_time do
case Application.get_env(@eb_app, :init_time) do
nil ->
time = os_time() - monotonic_time()
save_init_time(time)

time ->
time
end
end

defp save_init_time(time) do
Application.put_env(:event_bus, :init_time, time, persistent: true)
time
end

defp os_time do
System.os_time(@eb_time_unit)
end

defp monotonic_time do
System.monotonic_time(@eb_time_unit)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule EventBus.Mixfile do
# Type "mix help deps" for more examples and options
defp deps do
[
{:credo, "~> 0.10", only: [:dev]},
{:credo, "~> 0.10", only: [:dev, :test]},
{:dialyxir, "~> 1.0.0-rc.3", only: [:dev], runtime: false},
{:excoveralls, "~> 0.10", only: [:test]},
{:ex_doc, "~> 0.19", only: [:dev]}
Expand Down
16 changes: 9 additions & 7 deletions test/event_bus/event_source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ defmodule EventBus.EventSourceTest do
data = %{id: 1, name: "me", email: "[email protected]"}
transaction_id = "t1"
ttl = 100
params = %{
id: id,
topic: topic,
transaction_id: transaction_id,
ttl: ttl,
source: "me"
}

event =
EventSource.build %{
id: id,
topic: topic,
transaction_id: transaction_id,
ttl: ttl,
source: "me"
} do
EventSource.build params do
Process.sleep(1_000)
data
end

Expand Down
8 changes: 4 additions & 4 deletions test/event_bus/managers/observation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ defmodule EventBus.Manager.ObservationTest do
listener = {InputLogger, %{}}
another_listener = {Calculator, %{}}

# with event_shadow tuple
# With an event_shadow tuple
assert :ok === Observation.mark_as_completed({listener, {topic, id}})

# with open tuple
# With an open tuple
assert :ok === Observation.mark_as_completed({another_listener, topic, id})
end

Expand All @@ -90,10 +90,10 @@ defmodule EventBus.Manager.ObservationTest do
listener = {InputLogger, %{}}
another_listener = {Calculator, %{}}

# with event_shadow tuple
# With an event_shadow tuple
assert :ok == Observation.mark_as_skipped({listener, {topic, id}})

# with open tuple
# With an open tuple
assert :ok == Observation.mark_as_skipped({another_listener, topic, id})
end
end
10 changes: 6 additions & 4 deletions test/event_bus/models/event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule EventBus.Model.EventTest do
use ExUnit.Case
require EventBus.Model.Event
alias EventBus.Model.Event
alias EventBus.Util.MonotonicTime

doctest Event

Expand All @@ -11,16 +12,17 @@ defmodule EventBus.Model.EventTest do
end

test "duration" do
initialized_at = System.os_time(:microsecond)
# do sth in this frame
Process.sleep(1)
initialized_at = MonotonicTime.now()
# Do sth in this frame
# For example; sleep 1 second
Process.sleep(1_000)

event = %Event{
id: 1,
topic: "user_created",
data: %{id: 1, name: "me", email: "[email protected]"},
initialized_at: initialized_at,
occurred_at: System.os_time(:microsecond)
occurred_at: MonotonicTime.now()
}

assert Event.duration(event) > 0
Expand Down
16 changes: 13 additions & 3 deletions test/event_bus/utils/base62_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@ defmodule EventBus.Util.Base62Test do

alias EventBus.Util.Base62

test ".encode" do
test "encode" do
assert "0" == Base62.encode(0)
assert "z" == Base62.encode(61)
assert "10" == Base62.encode(62)
assert "1p0uwg6tOzJ" == Base62.encode(1_529_891_323_138_833_953)
end

test ".unique_id" do
test "unique_id" do
refute Base62.unique_id() == Base62.unique_id()
end

test ".unique_id length" do
test "unique_id length must match" do
assert 16 == String.length(Base62.unique_id())
end

test "unique_id does not change node_id configuration on multiple calls" do
# First call
Base62.unique_id()
node_id = Application.get_env(:event_bus, :node_id)

# Second call
Base62.unique_id()
assert node_id == Application.get_env(:event_bus, :node_id)
end
end
23 changes: 23 additions & 0 deletions test/event_bus/utils/monotonic_time_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule EventBus.Util.MonotonicTimeTest do
use ExUnit.Case

alias EventBus.Util.MonotonicTime

test "now should return int" do
assert is_integer(MonotonicTime.now())
end

test "now should increase on every call" do
assert MonotonicTime.now() <= MonotonicTime.now()
end

test "now does not change init_time configuration on multiple calls" do
# First call
MonotonicTime.now()
init_time = Application.get_env(:event_bus, :init_time) # init_time

# Second call
MonotonicTime.now()
assert init_time == Application.get_env(:event_bus, :init_time)
end
end
4 changes: 2 additions & 2 deletions test/event_bus_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ defmodule EventBusTest do
EventBus.subscribe({{BadOne, %{}}, [".*"]})
EventBus.subscribe({{Calculator, %{}}, ["metrics_received"]})
EventBus.subscribe({{MemoryLeakerOne, %{}}, [".*"]})
# Wait until listeners subscribed to
# Wait until the listeners subscribe to the topics
Process.sleep(100)

logs =
capture_log(fn ->
EventBus.notify(@event)
# Wait until listeners process events
# Wait until the listeners process the event
Process.sleep(300)
end)

Expand Down

0 comments on commit fab2567

Please sign in to comment.