Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config: Move modules under destinations. #40

Merged
merged 3 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions lib/walex/config/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ defmodule WalEx.Config do
|> Keyword.take(keys)
end

def get_database(app_name), do: get_configs(app_name, :database)

def get_destination(app_name, destination) do
case get_configs(app_name, :destinations) do
destinations when is_list(destinations) and destinations != [] ->
destinations
|> Keyword.get(destination, nil)

_ ->
nil
end
end

def get_event_modules(app_name), do: get_destination(app_name, :modules)

def get_webhooks(app_name), do: get_destination(app_name, :webhooks)

def get_event_relay_topic(app_name), do: get_destination(app_name, :event_relay_topic)

def add_config(app_name, key, new_values)
when is_list(new_values) and key in @allowed_config_values do
Agent.update(set_agent(app_name), fn config ->
Expand Down Expand Up @@ -86,23 +105,24 @@ defmodule WalEx.Config do

name = Keyword.get(configs, :name)
subscriptions = Keyword.get(configs, :subscriptions, [])
modules = Keyword.get(configs, :modules, [])
destinations = Keyword.get(configs, :destinations, [])
modules = Keyword.get(destinations, :modules, [])
module_names = build_module_names(name, modules, subscriptions)

[
name: name,
publication: Keyword.get(configs, :publication),
subscriptions: subscriptions,
modules: build_module_names(name, modules, subscriptions),
destinations: Keyword.get(configs, :destinations),
webhook_signing_secret: Keyword.get(configs, :webhook_signing_secret),
event_relay: Keyword.get(configs, :event_relay),
hostname: Keyword.get(configs, :hostname, db_configs_from_url[:hostname]),
username: Keyword.get(configs, :username, db_configs_from_url[:username]),
password: Keyword.get(configs, :password, db_configs_from_url[:password]),
port: Keyword.get(configs, :port, db_configs_from_url[:port]),
database: Keyword.get(configs, :database, db_configs_from_url[:database]),
ssl: Keyword.get(configs, :ssl, false),
ssl_opts: Keyword.get(configs, :ssl_opts, verify: :verify_none)
ssl_opts: Keyword.get(configs, :ssl_opts, verify: :verify_none),
subscriptions: subscriptions,
publication: Keyword.get(configs, :publication),
destinations: Keyword.put(destinations, :modules, module_names),
webhook_signing_secret: Keyword.get(configs, :webhook_signing_secret),
event_relay: Keyword.get(configs, :event_relay)
]
end

Expand All @@ -112,6 +132,7 @@ defmodule WalEx.Config do
|> map_subscriptions_to_modules(name)
|> Enum.concat(modules)
|> Enum.uniq()
|> map_existing_modules()
|> Enum.sort()
end

Expand All @@ -127,7 +148,6 @@ defmodule WalEx.Config do
def to_module_name(module_name) when is_atom(module_name) or is_binary(module_name) do
module_name
|> to_string()
|> String.replace("Elixir.", "")
|> String.split(["_"])
|> Enum.map_join(&capitalize/1)
end
Expand All @@ -140,6 +160,18 @@ defmodule WalEx.Config do
end
end

defp module_exists?(module_name) do
case Code.ensure_compiled(module_name) do
{:module, _module} ->
true

{:error, _error} ->
false
end
end

defp map_existing_modules(modules), do: Enum.filter(modules, &module_exists?/1)

defp parse_url(""), do: []

defp parse_url(url) when is_binary(url) do
Expand Down
6 changes: 3 additions & 3 deletions lib/walex/destinations/destinations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule WalEx.Destinations do

use GenServer

alias WalEx.{Destinations, Config, Event, Helpers, TransactionFilter}
alias WalEx.{Destinations, Config, Event, TransactionFilter}
alias Config.Registry
alias Destinations.{EventModules, EventRelay, Webhooks}

Expand Down Expand Up @@ -65,7 +65,7 @@ defmodule WalEx.Destinations do
defp process_event_relay([], _app_name), do: :ok

defp process_event_relay(filtered_events, app_name) do
event_relay_topic = Helpers.get_event_relay_topic(app_name)
event_relay_topic = Config.get_event_relay_topic(app_name)

if is_binary(event_relay_topic) and event_relay_topic != "" do
EventRelay.process(filtered_events, app_name)
Expand All @@ -75,7 +75,7 @@ defmodule WalEx.Destinations do
defp process_webhooks([], _app_name), do: :ok

defp process_webhooks(filtered_events, app_name) do
webhooks = Helpers.get_webhooks(app_name)
webhooks = Config.get_webhooks(app_name)

if is_list(webhooks) and webhooks != [] do
Webhooks.process(filtered_events, app_name)
Expand Down
10 changes: 2 additions & 8 deletions lib/walex/destinations/event_modules.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule WalEx.Destinations.EventModules do
@impl true
def handle_call({:process, txn, server}, _from, state) do
server
|> WalEx.Config.get_configs([:modules])
|> WalEx.Config.get_configs(:destinations)
|> process_events(txn)

{:reply, :ok, state}
Expand All @@ -52,13 +52,7 @@ defmodule WalEx.Destinations.EventModules do
end

defp process_module(module_name, functions, txn) do
case Code.ensure_compiled(module_name) do
{:module, module} ->
Enum.each(functions, &apply_process_macro(&1, module, txn))

{:error, _error} ->
:ok
end
Enum.each(functions, &apply_process_macro(&1, module_name, txn))
end

defp apply_process_macro(function, module, txn) do
Expand Down
2 changes: 1 addition & 1 deletion lib/walex/destinations/event_relay.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule WalEx.Destinations.EventRelay do
def process_events(changes, app_name) do
case connect(app_name) do
{:ok, channel} ->
topic = Helpers.get_event_relay_topic(app_name)
topic = Config.get_event_relay_topic(app_name)
events = build_events(changes)

request =
Expand Down
40 changes: 22 additions & 18 deletions lib/walex/destinations/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,44 @@ defmodule WalEx.Destinations.Supervisor do

def start_link(opts) do
app_name = Keyword.get(opts, :app_name)
name = WalEx.Config.Registry.set_name(:set_supervisor, __MODULE__, app_name)
name = Config.Registry.set_name(:set_supervisor, __MODULE__, app_name)

Supervisor.start_link(__MODULE__, configs: opts, name: name)
end

@impl true
def init(opts) do
configs = Keyword.get(opts, :configs)
app_name = Keyword.get(configs, :name)
app_name =
opts
|> Keyword.get(:configs)
|> Keyword.get(:name)

children =
[
{Destinations, app_name: app_name},
# TODO: EventModules should be dynamic (only if modules exist)
{EventModules, app_name: app_name}
]
|> maybe_webhooks(configs)
|> maybe_event_relay(configs)
[{Destinations, app_name: app_name}]
|> maybe_event_modules(app_name)
|> maybe_webhooks(app_name)
|> maybe_event_relay(app_name)

Supervisor.init(children, strategy: :one_for_all)
end

defp maybe_webhooks(children, configs) do
app_name = Keyword.get(configs, :name)
destinations = Keyword.get(configs, :destinations)
has_webhook_config = Config.has_config?(destinations, :webhooks)
defp maybe_event_modules(children, app_name) do
modules = Config.get_event_modules(app_name)
has_module_config = is_list(modules) and modules != []

maybe_set_child(children, has_module_config, {EventModules, app_name: app_name})
end

defp maybe_webhooks(children, app_name) do
webhooks = Config.get_webhooks(app_name)
has_webhook_config = is_list(webhooks) and webhooks != []

maybe_set_child(children, has_webhook_config, {Webhooks, app_name: app_name})
end

defp maybe_event_relay(children, configs) do
app_name = Keyword.get(configs, :name)
destinations = Keyword.get(configs, :destinations)
has_event_relay_config = Config.has_config?(destinations, :event_relay_topic)
defp maybe_event_relay(children, app_name) do
event_relay = Config.get_event_relay_topic(app_name)
has_event_relay_config = is_binary(event_relay) and event_relay != ""

maybe_set_child(children, has_event_relay_config, {EventRelay, app_name: app_name})
end
Expand Down
2 changes: 1 addition & 1 deletion lib/walex/destinations/webhooks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule WalEx.Destinations.Webhooks do
defp process_events(changes, app_name), do: Enum.map(changes, &process_event(&1, app_name))

defp process_event(change, app_name) do
webhooks = Helpers.get_webhooks(app_name)
webhooks = Config.get_webhooks(app_name)
signing_secret = get_signing_secret(app_name)

send_webhooks(webhooks, signing_secret, change)
Expand Down
5 changes: 2 additions & 3 deletions lib/walex/event/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule WalEx.Event do
@moduledoc """
Event DSL and casting
"""

@derive Jason.Encoder
defstruct([:name, :type, :source, :new_record, :old_record, :changes, :timestamp])

Expand All @@ -19,7 +18,7 @@ defmodule WalEx.Event do
require Logger
import WalEx.TransactionFilter

alias WalEx.{Changes, Event, Helpers}
alias WalEx.{Changes, Config, Event, Helpers}

@doc """
Macros for processing events
Expand Down Expand Up @@ -108,7 +107,7 @@ defmodule WalEx.Event do
%WalEx.Event.Source{
name: Helpers.get_source_name(),
version: Helpers.get_source_version(),
db: Helpers.get_database(app_name),
db: Config.get_database(app_name),
schema: schema,
table: table,
columns: map_columns(columns)
Expand Down
20 changes: 0 additions & 20 deletions lib/walex/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ defmodule WalEx.Helpers do
@moduledoc """
Helper functions for WalEx
"""

alias WalEx.Config

def set_type(table, :insert), do: to_string(table) <> ".insert"
def set_type(table, :update), do: to_string(table) <> ".update"
def set_type(table, :delete), do: to_string(table) <> ".delete"
Expand All @@ -14,21 +11,4 @@ defmodule WalEx.Helpers do
def get_source_name, do: "WalEx"

def get_source_version, do: Application.spec(:walex)[:vsn] |> to_string()

def get_database(app_name), do: Config.get_configs(app_name, :database)

def get_destination(app_name, destination) do
case Config.get_configs(app_name, :destinations) do
destinations when is_list(destinations) and destinations != [] ->
destinations
|> Keyword.get(destination, nil)

_ ->
nil
end
end

def get_webhooks(app_name), do: get_destination(app_name, :webhooks)

def get_event_relay_topic(app_name), do: get_destination(app_name, :event_relay_topic)
end
Loading
Loading