Skip to content

Commit

Permalink
Merge pull request #35 from praekeltfoundation/recipient_id_store
Browse files Browse the repository at this point in the history
Adding agent for caching recipient ids and extra tests for these addi…
  • Loading branch information
MatthewWeppenaar authored Aug 8, 2024
2 parents 7f78ff6 + 10fbc02 commit e8d734d
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 15 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# is restricted to this project.
import Config

config :turn_junebug_expressway, :agent, ttl: System.get_env("RECIPIENT_ID_TTL", "10000")

# Configures the endpoint
config :turn_junebug_expressway, TurnJunebugExpresswayWeb.Endpoint,
url: [host: "localhost"],
Expand Down
2 changes: 2 additions & 0 deletions lib/turn_junebug_expressway/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule TurnJunebugExpressway.Application do

# Define workers and child supervisors to be supervised
children = [
# Start Agent
{TurnJunebugExpressway.MessageRecipientIdCache, name: :my_cache},
# Start the endpoint when the application starts
TurnJunebugExpresswayWeb.Endpoint,
# Start your own worker by calling: TurnJunebugExpressway.Worker.start_link(arg1, arg2, arg3)
Expand Down
46 changes: 46 additions & 0 deletions lib/turn_junebug_expressway/message_recipient_id_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule TurnJunebugExpressway.MessageRecipientIdCache do
use GenServer

def start_link(options \\ []) do
{name, options} = Keyword.pop(options, :name, __MODULE__)
GenServer.start_link(__MODULE__, options, name: name)
end

def put(
pid,
key,
value,
ttl \\ get_env(:agent, :ttl) |> String.to_integer()
) do
GenServer.call(pid, {:put, key, value, ttl})
end

def get(pid, key) do
GenServer.call(pid, {:get, key})
end

# GenServer callbacks

def init(_) do
state = %{}
{:ok, state}
end

# Setting the same key multiple times, does not extend its lifetime---each call schedules a new expiry
def handle_call({:put, key, value, ttl}, _from, state) do
Process.send_after(self(), {:expire, key}, ttl)
{:reply, :ok, Map.put(state, key, value)}
end

def handle_call({:get, key}, _from, state) do
{:reply, Map.get(state, key), state}
end

def handle_info({:expire, key}, state) do
{:noreply, Map.delete(state, key)}
end

def get_env(section, key) do
Application.get_env(:turn_junebug_expressway, section)[key]
end
end
48 changes: 35 additions & 13 deletions lib/turn_junebug_expressway_web/utils.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule TurnJunebugExpresswayWeb.Utils do
use Tesla

alias TurnJunebugExpressway.MessageRecipientIdCache
@turn_client Application.compile_env(:turn_junebug_expressway, :turn_client)
@rapidpro_client Application.compile_env(:turn_junebug_expressway, :rapidpro_client)

Expand Down Expand Up @@ -61,7 +61,21 @@ defmodule TurnJunebugExpresswayWeb.Utils do
:ok
end

def send_message(message, ttl) do
key = Map.get(message, "user_message_id")
value = Map.get(message, "recipient_id")
# IO.puts("#{message}")
# IO.puts("#{inspect(key)}, #{inspect(value)}")
TurnJunebugExpressway.MessageRecipientIdCache.put(:my_cache, key, value, ttl)
TurnJunebugExpressway.MessageEngine.publish_message(message)
end

def send_message(message) do
key = Map.get(message, "user_message_id")
value = Map.get(message, "recipient_id")
# IO.puts("#{message}")
# IO.puts("#{inspect(key)}, #{inspect(value)}")
TurnJunebugExpressway.MessageRecipientIdCache.put(:my_cache, key, value)
TurnJunebugExpressway.MessageEngine.publish_message(message)
end

Expand Down Expand Up @@ -113,24 +127,32 @@ defmodule TurnJunebugExpresswayWeb.Utils do
end

def forward_event(event) do
IO.puts("FORWARD_EVENT: #{inspect(event)}")
# IO.puts("#{inspect(event)}")
IO.puts(
"#{inspect(MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")))}"
)

case event |> get_event_status do
{:ignore, _} ->
:ok

{:ok, status} ->
@turn_client.client()
|> @turn_client.post_event(%{
"statuses" => [
%{
"id" => Map.get(event, "user_message_id"),
"recipient_id" => nil,
"status" => status,
"timestamp" => get_event_timestamp(event, :second)
}
]
})
if recipient_id =
MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")) != nil do
@turn_client.client()
|> @turn_client.post_event(%{
"statuses" => [
%{
"id" => Map.get(event, "user_message_id"),
# "recipient_id" => nil,
"recipient_id" =>
MessageRecipientIdCache.get(:my_cache, Map.get(event, "user_message_id")),
"status" => status,
"timestamp" => get_event_timestamp(event, :second)
}
]
})
end
end
end

Expand Down
81 changes: 79 additions & 2 deletions test/turn_junebug_expressway_web/utils_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
import Mox

alias TurnJunebugExpresswayWeb.Utils
alias TurnJunebugExpressway.MessageRecipientIdCache

describe "format_urn" do
test "format_urn/1 with + for turn" do
Expand Down Expand Up @@ -34,18 +35,58 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
end

describe "handle_incoming_event" do
test "sends event back to turn, recipient_id not found", %{} do
TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
|> expect(:post_event, fn :client, _ -> raise "Shouldnt be called" end)

message = %{
"content" => "something",
"recipient_id" => nil,
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)

event = %{
"transport_name" => "d49d3569-47d5-47a0-8074-5a7ffa684832",
"event_type" => "ack",
"event_id" => "b3db4f670d4c4e2297c58a6dc5b72980",
"sent_message_id" => "f74c4e6108d8418ab53dbcfd628242f3",
"helper_metadata" => %{},
"routing_metadata" => %{},
"message_version" => "20110921",
"timestamp" => "2019-10-31 12:32:24.930687",
"transport_metadata" => %{},
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3",
"message_type" => "event"
}

# Utils.handle_incoming_event(Jason.encode!(event))

assert Utils.handle_incoming_event(Jason.encode!(event)) == nil
end

test "sends event back to turn", %{} do
body = %{
"statuses" => [
%{
"id" => "f74c4e6108d8418ab53dbcfd628242f3",
"recipient_id" => nil,
"recipient_id" => "1234",
"status" => "sent",
"timestamp" => "1572525144"
}
]
}

message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)

TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
|> expect(:post_event, fn :client, ^body -> :ok end)
Expand All @@ -72,13 +113,21 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
"statuses" => [
%{
"id" => "f74c4e6108d8418ab53dbcfd628242f3",
"recipient_id" => nil,
"recipient_id" => 1234,
"status" => "sent",
"timestamp" => "1572525144930"
}
]
}

message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "16e42b66-03b7-4558-8a72-e9db481fdb4c"
}

Utils.send_message(message)

TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
|> expect(:post_event, fn :client, _new_body ->
Expand Down Expand Up @@ -163,6 +212,34 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
end
end

describe "ttl" do
test "checking if key is deleated after 3 seconds" do
message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message, 1000)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == "1234"
:timer.sleep(3_000)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == nil
end

test "checking if key is deleated after default ttl" do
message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == "1234"
:timer.sleep(11_000)
assert MessageRecipientIdCache.get(:my_cache, Map.get(message, "user_message_id")) == nil
end
end

describe "queue_stuck?" do
test "true if rate is 0 and there is messages", %{} do
assert Utils.queue_stuck?(0, 1) == true
Expand Down

0 comments on commit e8d734d

Please sign in to comment.