Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding agent for caching recipient ids and extra tests for these addi… #35

Merged
merged 12 commits into from
Aug 8, 2024
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# is restricted to this project.
import Config

config :turn_junebug_expressway, :agent, ttl: System.get_env("RECIPIENT_ID_TTL", "10000")

# Configures the endpoint
config :turn_junebug_expressway, TurnJunebugExpresswayWeb.Endpoint,
url: [host: "localhost"],
Expand Down
2 changes: 2 additions & 0 deletions lib/turn_junebug_expressway/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule TurnJunebugExpressway.Application do

# Define workers and child supervisors to be supervised
children = [
# Start Agent
{TurnJunebugExpressway.MessageRecipientIdCache, name: :my_cache},
# Start the endpoint when the application starts
TurnJunebugExpresswayWeb.Endpoint,
# Start your own worker by calling: TurnJunebugExpressway.Worker.start_link(arg1, arg2, arg3)
Expand Down
46 changes: 46 additions & 0 deletions lib/turn_junebug_expressway/message_recipient_id_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule TurnJunebugExpressway.MessageRecipientIdCache do
use GenServer

def start_link(options \\ []) do
{name, options} = Keyword.pop(options, :name, __MODULE__)
GenServer.start_link(__MODULE__, options, name: name)
end

def put(
pid,
key,
value,
ttl \\ get_env(:agent, :ttl) |> String.to_integer()
) do
GenServer.call(pid, {:put, key, value, ttl})
end

def get(pid, key) do
GenServer.call(pid, {:get, key})
end

# GenServer callbacks

def init(_) do
state = %{}
{:ok, state}
end

# Setting the same key multiple times, does not extend its lifetime---each call schedules a new expiry
def handle_call({:put, key, value, ttl}, _from, state) do
Process.send_after(self(), {:expire, key}, ttl)
{:reply, :ok, Map.put(state, key, value)}
end

def handle_call({:get, key}, _from, state) do
{:reply, Map.get(state, key), state}
end

def handle_info({:expire, key}, state) do
{:noreply, Map.delete(state, key)}
end

def get_env(section, key) do
Application.get_env(:turn_junebug_expressway, section)[key]
end
end
48 changes: 35 additions & 13 deletions lib/turn_junebug_expressway_web/utils.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule TurnJunebugExpresswayWeb.Utils do
use Tesla

alias TurnJunebugExpressway.MessageRecipientIdCache
@turn_client Application.compile_env(:turn_junebug_expressway, :turn_client)
@rapidpro_client Application.compile_env(:turn_junebug_expressway, :rapidpro_client)

Expand Down Expand Up @@ -61,7 +61,21 @@ defmodule TurnJunebugExpresswayWeb.Utils do
:ok
end

def send_message(message, ttl) do
key = Map.get(message, "user_message_id")
value = Map.get(message, "recipient_id")
# IO.puts("#{message}")
# IO.puts("#{inspect(key)}, #{inspect(value)}")
TurnJunebugExpressway.MessageRecipientIdCache.put(:my_cache, key, value, ttl)
TurnJunebugExpressway.MessageEngine.publish_message(message)
end

def send_message(message) do
key = Map.get(message, "user_message_id")
value = Map.get(message, "recipient_id")
# IO.puts("#{message}")
# IO.puts("#{inspect(key)}, #{inspect(value)}")
TurnJunebugExpressway.MessageRecipientIdCache.put(:my_cache, key, value)
TurnJunebugExpressway.MessageEngine.publish_message(message)
end

Expand Down Expand Up @@ -113,24 +127,32 @@ defmodule TurnJunebugExpresswayWeb.Utils do
end

def forward_event(event) do
IO.puts("FORWARD_EVENT: #{inspect(event)}")
# IO.puts("#{inspect(event)}")
IO.puts(
"#{inspect(MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")))}"
)

case event |> get_event_status do
{:ignore, _} ->
:ok

{:ok, status} ->
@turn_client.client()
|> @turn_client.post_event(%{
"statuses" => [
%{
"id" => Map.get(event, "user_message_id"),
"recipient_id" => nil,
"status" => status,
"timestamp" => get_event_timestamp(event, :second)
}
]
})
if recipient_id =
MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")) != nil do
@turn_client.client()
|> @turn_client.post_event(%{
"statuses" => [
%{
"id" => Map.get(event, "user_message_id"),
# "recipient_id" => nil,
"recipient_id" =>
MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")),
"status" => status,
"timestamp" => get_event_timestamp(event, :second)
}
]
})
end
end
end

Expand Down
81 changes: 79 additions & 2 deletions test/turn_junebug_expressway_web/utils_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
import Mox

alias TurnJunebugExpresswayWeb.Utils
alias TurnJunebugExpressway.MessageRecipientIdCache

describe "format_urn" do
test "format_urn/1 with + for turn" do
Expand Down Expand Up @@ -34,18 +35,58 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
end

describe "handle_incoming_event" do
test "sends event back to turn, recipient_id not found", %{} do
TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
|> expect(:post_event, fn :client, _ -> raise "Shouldnt be called" end)

message = %{
"content" => "something",
"recipient_id" => nil,
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)

event = %{
"transport_name" => "d49d3569-47d5-47a0-8074-5a7ffa684832",
"event_type" => "ack",
"event_id" => "b3db4f670d4c4e2297c58a6dc5b72980",
"sent_message_id" => "f74c4e6108d8418ab53dbcfd628242f3",
"helper_metadata" => %{},
"routing_metadata" => %{},
"message_version" => "20110921",
"timestamp" => "2019-10-31 12:32:24.930687",
"transport_metadata" => %{},
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3",
"message_type" => "event"
}

# Utils.handle_incoming_event(Jason.encode!(event))

assert Utils.handle_incoming_event(Jason.encode!(event)) == nil
end

test "sends event back to turn", %{} do
body = %{
"statuses" => [
%{
"id" => "f74c4e6108d8418ab53dbcfd628242f3",
"recipient_id" => nil,
"recipient_id" => "1234",
"status" => "sent",
"timestamp" => "1572525144"
}
]
}

message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)

TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
|> expect(:post_event, fn :client, ^body -> :ok end)
Expand All @@ -72,13 +113,21 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
"statuses" => [
%{
"id" => "f74c4e6108d8418ab53dbcfd628242f3",
"recipient_id" => nil,
"recipient_id" => 1234,
"status" => "sent",
"timestamp" => "1572525144930"
}
]
}

message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "16e42b66-03b7-4558-8a72-e9db481fdb4c"
}

Utils.send_message(message)

TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
|> expect(:post_event, fn :client, _new_body ->
Expand Down Expand Up @@ -163,6 +212,34 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
end
end

describe "ttl" do
test "checking if key is deleated after 3 seconds" do
message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message, 1000)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == "1234"
:timer.sleep(3_000)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == nil
end

test "checking if key is deleated after default ttl" do
message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == "1234"
:timer.sleep(11_000)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == nil
end
end

describe "queue_stuck?" do
test "true if rate is 0 and there is messages", %{} do
assert Utils.queue_stuck?(0, 1) == true
Expand Down
Loading