diff --git a/config/config.exs b/config/config.exs index a7ff884..bbb3935 100644 --- a/config/config.exs +++ b/config/config.exs @@ -5,6 +5,8 @@ # is restricted to this project. import Config +config :turn_junebug_expressway, :agent, ttl: System.get_env("RECIPIENT_ID_TTL", "10000") + # Configures the endpoint config :turn_junebug_expressway, TurnJunebugExpresswayWeb.Endpoint, url: [host: "localhost"], diff --git a/lib/turn_junebug_expressway/application.ex b/lib/turn_junebug_expressway/application.ex index 8edb90b..b9f80c3 100644 --- a/lib/turn_junebug_expressway/application.ex +++ b/lib/turn_junebug_expressway/application.ex @@ -11,6 +11,8 @@ defmodule TurnJunebugExpressway.Application do # Define workers and child supervisors to be supervised children = [ + # Start Agent + {TurnJunebugExpressway.MessageRecipientIdCache, name: :my_cache}, # Start the endpoint when the application starts TurnJunebugExpresswayWeb.Endpoint, # Start your own worker by calling: TurnJunebugExpressway.Worker.start_link(arg1, arg2, arg3) diff --git a/lib/turn_junebug_expressway/message_recipient_id_cache.ex b/lib/turn_junebug_expressway/message_recipient_id_cache.ex new file mode 100644 index 0000000..fabda7e --- /dev/null +++ b/lib/turn_junebug_expressway/message_recipient_id_cache.ex @@ -0,0 +1,46 @@ +defmodule TurnJunebugExpressway.MessageRecipientIdCache do + use GenServer + + def start_link(options \\ []) do + {name, options} = Keyword.pop(options, :name, __MODULE__) + GenServer.start_link(__MODULE__, options, name: name) + end + + def put( + pid, + key, + value, + ttl \\ get_env(:agent, :ttl) |> String.to_integer() + ) do + GenServer.call(pid, {:put, key, value, ttl}) + end + + def get(pid, key) do + GenServer.call(pid, {:get, key}) + end + + # GenServer callbacks + + def init(_) do + state = %{} + {:ok, state} + end + + # Setting the same key multiple times, does not extend its lifetime---each call schedules a new expiry + def handle_call({:put, key, value, ttl}, _from, state) do + Process.send_after(self(), {:expire, key}, ttl) + {:reply, :ok, Map.put(state, key, value)} + end + + def handle_call({:get, key}, _from, state) do + {:reply, Map.get(state, key), state} + end + + def handle_info({:expire, key}, state) do + {:noreply, Map.delete(state, key)} + end + + def get_env(section, key) do + Application.get_env(:turn_junebug_expressway, section)[key] + end +end diff --git a/lib/turn_junebug_expressway_web/utils.ex b/lib/turn_junebug_expressway_web/utils.ex index 94c8f1c..dca22a4 100644 --- a/lib/turn_junebug_expressway_web/utils.ex +++ b/lib/turn_junebug_expressway_web/utils.ex @@ -1,6 +1,6 @@ defmodule TurnJunebugExpresswayWeb.Utils do use Tesla - + alias TurnJunebugExpressway.MessageRecipientIdCache @turn_client Application.compile_env(:turn_junebug_expressway, :turn_client) @rapidpro_client Application.compile_env(:turn_junebug_expressway, :rapidpro_client) @@ -61,7 +61,21 @@ defmodule TurnJunebugExpresswayWeb.Utils do :ok end + def send_message(message, ttl) do + key = Map.get(message, "user_message_id") + value = Map.get(message, "recipient_id") + # IO.puts("#{message}") + # IO.puts("#{inspect(key)}, #{inspect(value)}") + TurnJunebugExpressway.MessageRecipientIdCache.put(:my_cache, key, value, ttl) + TurnJunebugExpressway.MessageEngine.publish_message(message) + end + def send_message(message) do + key = Map.get(message, "user_message_id") + value = Map.get(message, "recipient_id") + # IO.puts("#{message}") + # IO.puts("#{inspect(key)}, #{inspect(value)}") + TurnJunebugExpressway.MessageRecipientIdCache.put(:my_cache, key, value) TurnJunebugExpressway.MessageEngine.publish_message(message) end @@ -113,24 +127,32 @@ defmodule TurnJunebugExpresswayWeb.Utils do end def forward_event(event) do - IO.puts("FORWARD_EVENT: #{inspect(event)}") + # IO.puts("#{inspect(event)}") + IO.puts( + "#{inspect(MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")))}" + ) case event |> get_event_status do {:ignore, _} -> :ok {:ok, status} -> - @turn_client.client() - |> @turn_client.post_event(%{ - "statuses" => [ - %{ - "id" => Map.get(event, "user_message_id"), - "recipient_id" => nil, - "status" => status, - "timestamp" => get_event_timestamp(event, :second) - } - ] - }) + if recipient_id = + MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")) != nil do + @turn_client.client() + |> @turn_client.post_event(%{ + "statuses" => [ + %{ + "id" => Map.get(event, "user_message_id"), + # "recipient_id" => nil, + "recipient_id" => + MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")), + "status" => status, + "timestamp" => get_event_timestamp(event, :second) + } + ] + }) + end end end diff --git a/test/turn_junebug_expressway_web/utils_test.exs b/test/turn_junebug_expressway_web/utils_test.exs index 82ca5f1..ba791e2 100644 --- a/test/turn_junebug_expressway_web/utils_test.exs +++ b/test/turn_junebug_expressway_web/utils_test.exs @@ -4,6 +4,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do import Mox alias TurnJunebugExpresswayWeb.Utils + alias TurnJunebugExpressway.MessageRecipientIdCache describe "format_urn" do test "format_urn/1 with + for turn" do @@ -34,18 +35,58 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do end describe "handle_incoming_event" do + test "sends event back to turn, recipient_id not found", %{} do + TurnJunebugExpressway.Backends.ClientMock + |> expect(:client, fn -> :client end) + |> expect(:post_event, fn :client, _ -> raise "Shouldnt be called" end) + + message = %{ + "content" => "something", + "recipient_id" => nil, + "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3" + } + + Utils.send_message(message) + + event = %{ + "transport_name" => "d49d3569-47d5-47a0-8074-5a7ffa684832", + "event_type" => "ack", + "event_id" => "b3db4f670d4c4e2297c58a6dc5b72980", + "sent_message_id" => "f74c4e6108d8418ab53dbcfd628242f3", + "helper_metadata" => %{}, + "routing_metadata" => %{}, + "message_version" => "20110921", + "timestamp" => "2019-10-31 12:32:24.930687", + "transport_metadata" => %{}, + "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3", + "message_type" => "event" + } + + # Utils.handle_incoming_event(Jason.encode!(event)) + + assert Utils.handle_incoming_event(Jason.encode!(event)) == nil + end + test "sends event back to turn", %{} do body = %{ "statuses" => [ %{ "id" => "f74c4e6108d8418ab53dbcfd628242f3", - "recipient_id" => nil, + "recipient_id" => "1234", "status" => "sent", "timestamp" => "1572525144" } ] } + message = %{ + "content" => "something", + "recipient_id" => "1234", + "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3" + } + + Utils.send_message(message) + TurnJunebugExpressway.Backends.ClientMock |> expect(:client, fn -> :client end) |> expect(:post_event, fn :client, ^body -> :ok end) @@ -72,13 +113,21 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do "statuses" => [ %{ "id" => "f74c4e6108d8418ab53dbcfd628242f3", - "recipient_id" => nil, + "recipient_id" => 1234, "status" => "sent", "timestamp" => "1572525144930" } ] } + message = %{ + "content" => "something", + "recipient_id" => "1234", + "user_message_id" => "16e42b66-03b7-4558-8a72-e9db481fdb4c" + } + + Utils.send_message(message) + TurnJunebugExpressway.Backends.ClientMock |> expect(:client, fn -> :client end) |> expect(:post_event, fn :client, _new_body -> @@ -163,6 +212,34 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do end end + describe "ttl" do + test "checking if key is deleated after 3 seconds" do + message = %{ + "content" => "something", + "recipient_id" => "1234", + "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3" + } + + Utils.send_message(message, 1000) + assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == "1234" + :timer.sleep(3_000) + assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == nil + end + + test "checking if key is deleated after default ttl" do + message = %{ + "content" => "something", + "recipient_id" => "1234", + "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3" + } + + Utils.send_message(message) + assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == "1234" + :timer.sleep(11_000) + assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == nil + end + end + describe "queue_stuck?" do test "true if rate is 0 and there is messages", %{} do assert Utils.queue_stuck?(0, 1) == true