diff --git a/.tool-versions b/.tool-versions index 989b51f..4730f72 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,3 @@ erlang 26.1.2 -elixir 1.15.7-otp-26 \ No newline at end of file +elixir 1.15.7-otp-26 +rust 1.63.0 \ No newline at end of file diff --git a/lib/mix/tasks/walex.drop.ex b/lib/mix/tasks/walex.drop.ex index cbde96d..d18b312 100644 --- a/lib/mix/tasks/walex.drop.ex +++ b/lib/mix/tasks/walex.drop.ex @@ -2,9 +2,7 @@ defmodule Mix.Tasks.Walex.Drop do @moduledoc """ Drops the database """ - use Mix.Task - alias Mix.Tasks.Walex.Helpers @test_database "todos_test" diff --git a/lib/mix/tasks/walex.setup.ex b/lib/mix/tasks/walex.setup.ex index 7ceea92..3b0b2f0 100644 --- a/lib/mix/tasks/walex.setup.ex +++ b/lib/mix/tasks/walex.setup.ex @@ -2,12 +2,16 @@ defmodule Mix.Tasks.Walex.Setup do @moduledoc """ Creates, migrates and seeds the database """ - use Mix.Task - alias Mix.Tasks.Walex.Helpers @test_database "todos_test" + @base_configs [ + hostname: "localhost", + username: "postgres", + password: "postgres", + database: @test_database + ] @shortdoc "Set up test database and tables" def run(_) do @@ -17,14 +21,7 @@ defmodule Mix.Tasks.Walex.Setup do defp setup_test_database do Helpers.create_database(@test_database) - - {:ok, pid} = - Postgrex.start_link( - hostname: "localhost", - username: "postgres", - password: "postgres", - database: @test_database - ) + {:ok, pid} = Postgrex.start_link(@base_configs) create_database_logic(pid) create_database_tables(pid) diff --git a/lib/walex/config/config.ex b/lib/walex/config/config.ex index 6b3e6e9..3cb3ba3 100644 --- a/lib/walex/config/config.ex +++ b/lib/walex/config/config.ex @@ -2,7 +2,6 @@ defmodule WalEx.Config do @moduledoc """ Configuration """ - use Agent alias WalEx.Config.Registry, as: WalExRegistry diff --git a/lib/walex/replication/server.ex b/lib/walex/replication/server.ex index cb8cf69..1a13f04 100644 --- a/lib/walex/replication/server.ex +++ b/lib/walex/replication/server.ex @@ -39,24 +39,21 @@ defmodule WalEx.Replication.Server do @impl true def handle_connect(state) do - temp_slot = "walex_temp_slot_" <> Integer.to_string(:rand.uniform(9_999)) - - query = "CREATE_REPLICATION_SLOT #{temp_slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;" + query = + "CREATE_REPLICATION_SLOT #{slot_name(state.app_name)} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;" {:query, query, %{state | step: :create_slot}} end @impl true - def handle_result([%Postgrex.Result{rows: rows} | _results], state = %{step: :create_slot}) do - slot_name = rows |> hd |> hd - + def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do publication = state.app_name |> WalEx.Config.get_configs([:publication]) |> Keyword.get(:publication) query = - "START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')" + "START_REPLICATION SLOT #{slot_name(state.app_name)} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')" {:stream, query, [], %{state | step: :streaming}} end @@ -83,4 +80,6 @@ defmodule WalEx.Replication.Server do @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) defp current_time, do: System.os_time(:microsecond) - @epoch + + defp slot_name(app_name), do: to_string(app_name) <> "_walex" end diff --git a/mix.exs b/mix.exs index 23b36ae..82b98b1 100644 --- a/mix.exs +++ b/mix.exs @@ -15,7 +15,8 @@ defmodule WalEx.MixProject do name: "WalEx", source_url: "https://github.com/cpursley/walex", test_coverage: [tool: ExCoveralls], - elixirc_paths: elixirc_paths(Mix.env()) + elixirc_paths: elixirc_paths(Mix.env()), + compilers: compilers() ] end @@ -43,7 +44,8 @@ defmodule WalEx.MixProject do {:ex_doc, "~> 0.31.1", only: :dev, runtime: false}, {:sobelow, "~> 0.12", only: [:dev, :test], runtime: false}, {:credo, "~> 1.7.3", only: [:dev, :test], runtime: false}, - {:excoveralls, "~> 0.10", only: [:dev, :test], runtime: false} + {:excoveralls, "~> 0.10", only: [:dev, :test], runtime: false}, + {:rambo, "~> 0.3.4", only: [:dev, :test], runtime: false} ] end @@ -76,4 +78,12 @@ defmodule WalEx.MixProject do defp elixirc_paths(:test), do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] + + defp compilers do + unless Mix.env() == :prod do + Mix.compilers() ++ [:rambo] + else + Mix.compilers() + end + end end diff --git a/mix.lock b/mix.lock index 2262a16..75f8659 100644 --- a/mix.lock +++ b/mix.lock @@ -37,6 +37,7 @@ "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, + "rambo": {:hex, :rambo, "0.3.4", "8962ac3bd1a633ee9d0e8b44373c7913e3ce3d875b4151dcd060886092d2dce7", [:mix], [], "hexpm", "0cc54ed089fbbc84b65f4b8a774224ebfe60e5c80186fafc7910b3e379ad58f1"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "req": {:hex, :req, "0.4.8", "2b754a3925ddbf4ad78c56f30208ced6aefe111a7ea07fb56c23dccc13eb87ae", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "7146e51d52593bb7f20d00b5308a5d7d17d663d6e85cd071452b613a8277100c"}, "retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"}, diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex index 3ac6c8d..9e8a275 100644 --- a/test/support/test_helpers.ex +++ b/test/support/test_helpers.ex @@ -1,5 +1,11 @@ defmodule WalEx.Support.TestHelpers do - def find_worker_pid(supervisor_pid, child_module) do + require Logger + + def tap_debug(to_tap, label) do + to_tap |> tap(&Logger.debug(label <> inspect(&1))) + end + + def find_child_pid(supervisor_pid, child_module) do supervisor_pid |> Supervisor.which_children() |> find_pid(child_module) @@ -10,6 +16,59 @@ defmodule WalEx.Support.TestHelpers do pid end + def get_database_pid(supervisor_pid) do + find_child_pid(supervisor_pid, DBConnection.ConnectionPool) + end + + def start_database(configs) do + case Postgrex.start_link(configs) do + {:ok, conn} -> + Logger.info("Database is running.") + + {:ok, conn} + + {:error, {:already_started, conn}} -> + Logger.info("Database is already running.") + + {:ok, conn} + + {:error, reason} -> + Logger.error("Error connecting to the database. Reason: #{inspect(reason)}") + + {:error, reason} + end + end + + def terminate_database_connection(database_pid, username) do + query = + "SELECT pg_terminate_backend(pg_backend_pid()) FROM pg_stat_activity WHERE usename = $1" + + Postgrex.query(database_pid, query, [username]) + end + + def wait_for_restart do + Logger.debug("waiting") + :timer.sleep(3000) + Logger.debug("done waiting") + end + + def query(pid, query) do + pid + |> Postgrex.query!(query, []) + |> map_rows_to_columns() + end + + defp map_rows_to_columns(%Postgrex.Result{columns: columns, rows: rows}) do + Enum.map(rows, fn row -> Enum.zip(columns, row) |> Map.new() end) + end + + def pg_replication_slots(database_pid) do + pg_replication_slots_query = + "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";" + + query(database_pid, pg_replication_slots_query) + end + def update_user(database_pid) do update_user = """ UPDATE \"user\" SET age = 30 WHERE id = 1 diff --git a/test/walex/database_test.exs b/test/walex/database_test.exs index 4879063..e1f1dba 100644 --- a/test/walex/database_test.exs +++ b/test/walex/database_test.exs @@ -1,8 +1,10 @@ defmodule WalEx.DatabaseTest do use ExUnit.Case, async: false - + import WalEx.Support.TestHelpers alias WalEx.Supervisor, as: WalExSupervisor + require Logger + @hostname "localhost" @username "postgres" @password "postgres" @@ -16,9 +18,12 @@ defmodule WalEx.DatabaseTest do database: @database, port: 5432, subscriptions: ["user", "todo"], - publication: "events" + publication: "events", + destinations: [modules: [TestModule]] ] + @replication_slot %{"active" => true, "slot_name" => "todos_walex", "slot_type" => "logical"} + describe "logical replication" do setup do {:ok, database_pid} = start_database() @@ -27,28 +32,297 @@ defmodule WalEx.DatabaseTest do end test "should have logical replication set up", %{database_pid: pid} do - show_wall_level = "SHOW wal_level;" - assert is_pid(pid) - assert [%{"wal_level" => "logical"}] == query(pid, show_wall_level) + assert [%{"wal_level" => "logical"}] == query(pid, "SHOW wal_level;") end test "should start replication slot", %{database_pid: database_pid} do assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs) assert is_pid(replication_pid) + assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid) + end + + test "should re-initiate after forcing database process termination" do + assert {:ok, supervisor_pid} = TestSupervisor.start_link(@base_configs) + database_pid = get_database_pid(supervisor_pid) + + assert is_pid(database_pid) + assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid) - pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";" + assert Process.exit(database_pid, :kill) + |> tap_debug("Forcefully killed database connection: ") - assert [ - %{"active" => true, "slot_name" => slot_name, "slot_type" => "logical"} - | _replication_slots - ] = query(database_pid, pg_replication_slots) + refute Process.info(database_pid) - assert String.contains?(slot_name, "walex_temp_slot") + new_database_pid = get_database_pid(supervisor_pid) + + assert is_pid(new_database_pid) + refute database_pid == new_database_pid + assert_update_user(new_database_pid) + end + + test "should re-initiate after database connection restarted by supervisor" do + assert {:ok, supervisor_pid} = TestSupervisor.start_link(@base_configs) + database_pid = get_database_pid(supervisor_pid) + + Supervisor.terminate_child(supervisor_pid, DBConnection.ConnectionPool) + |> tap_debug("Supervisor terminated database connection: ") + + assert :undefined == get_database_pid(supervisor_pid) + + refute Process.info(database_pid) + + Supervisor.restart_child(supervisor_pid, DBConnection.ConnectionPool) + |> tap_debug("Supervisor restarted database connection: ") + + wait_for_restart() + + restarted_database_pid = get_database_pid(supervisor_pid) + + assert is_pid(restarted_database_pid) + refute database_pid == restarted_database_pid + assert_update_user(restarted_database_pid) + + assert [@replication_slot | _replication_slots] = + pg_replication_slots(restarted_database_pid) + end + + test "should re-initiate after database connection terminated" do + assert {:ok, supervisor_pid} = TestSupervisor.start_link(@base_configs) + database_pid = get_database_pid(supervisor_pid) + + assert {:error, + %DBConnection.ConnectionError{ + message: "tcp recv: closed", + severity: :error, + reason: :error + }} == terminate_database_connection(database_pid, @username) + + assert_update_user(database_pid) + + assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid) + end + + test "should re-initiate after database restart" do + assert {:ok, supervisor_pid} = TestSupervisor.start_link(@base_configs) + database_pid = get_database_pid(supervisor_pid) + + assert is_pid(database_pid) + assert [@replication_slot | _replication_slots] = pg_replication_slots(database_pid) + + assert Process.exit(database_pid, :kill) + |> tap_debug("Forcefully killed database connection: ") + + assert :ok == pg_restart() + + new_database_pid = get_database_pid(supervisor_pid) + + assert is_pid(new_database_pid) + refute database_pid == new_database_pid + assert_update_user(new_database_pid) + end + end + + @linux_path "/usr/lib/postgresql" + @mac_homebrew_path "/usr/local/Cellar/postgresql" + @mac_apple_silicon_homebrew_path "/opt/homebrew/Cellar/postgresql" + @mac_app_path "/Applications/Postgres.app/Contents/Versions" + + def pg_restart do + case :os.type() do + {:unix, :darwin} -> + Logger.debug("MacOS detected.") + + restart_postgres() + + {:unix, :linux} -> + Logger.debug("Linux detected.") + + restart_postgres() + + other -> + Logger.debug("Unsupported operating system: #{inspect(other)}") + :ok + end + end + + defp pg_installation_type do + cond do + File.exists?(@linux_path) -> + :linux + + File.exists?(@mac_homebrew_path) -> + :mac_homebrew + + File.exists?(@mac_apple_silicon_homebrew_path) -> + :mac_apple_silicon_homebrew + + File.exists?(@mac_app_path) -> + :mac_app end end - def start_database do + defp pg_ctl_path do + case pg_installation_type() do + :linux -> + Logger.debug("PostgreSQL installed via Linux.") + @linux_path + + :mac_homebrew -> + Logger.debug("PostgreSQL installed via homebrew.") + @mac_homebrew_path + + :mac_apple_silicon_homebrew -> + Logger.debug("PostgreSQL installed via apple silicon homebrew.") + @mac_apple_silicon_homebrew_path + + :mac_app -> + Logger.debug("PostgreSQL installed via Postgres.app.") + @mac_app_path + + true -> + raise "PostgreSQL not installed via Postgres.app or homebrew." + end + end + + defp pg_data_directory(version) do + postgres_data_directory = + case pg_installation_type() do + :linux -> + "/var/lib/postgresql/#{version}/main/" + + :mac_homebrew -> + "/usr/local/var/postgres-#{version}" + + :mac_apple_silicon_homebrew -> + "/opt/homebrew/var/postgresql" + + :mac_app -> + System.user_home!() <> "/Library/Application\ Support/Postgres/var-#{version}" + end + + if File.exists?(postgres_data_directory) do + postgres_data_directory + else + raise "pg data directory not found. Make sure PostgreSQL is installed correctly." + end + end + + defp pg_bin_path(postgres_path, version) do + postgres_bin_path = Path.join([postgres_path, version, "bin", "pg_ctl"]) + + if File.exists?(postgres_bin_path) do + postgres_bin_path + else + raise "pg_ctl binary not found. Make sure PostgreSQL is installed correctly." + end + end + + defp restart_postgres do + postgres_path = pg_ctl_path() + version = pg_version(postgres_path) + postgres_bin_path = pg_bin_path(postgres_path, version) + data_directory = pg_data_directory(version) + + case pg_stop(postgres_bin_path, data_directory) do + {:ok, + %Rambo{ + status: 0, + out: _message, + err: "" + }} -> + unless pg_isready?() do + pg_start(postgres_bin_path, data_directory) + Logger.debug("Waiting after pg_start") + :timer.sleep(4000) + pg_isready?() + end + + {:error, + %Rambo{ + status: 1, + out: "", + err: error + }} -> + Logger.debug("PostgreSQL not stopped: " <> inspect(error)) + + _ -> + Logger.debug("PostgreSQL not stopped.") + end + + :ok + end + + defp pg_stop(postgres_bin_path, data_directory) do + Logger.debug("PostgreSQL stopping.") + + case pg_installation_type() do + :mac_apple_silicon_homebrew -> + Rambo.run("brew", ["services", "stop", "postgresql"]) + + _ -> + Rambo.run(postgres_bin_path, [ + "stop", + "-m", + "immediate", + "-D", + "#{data_directory}" + ]) + end + end + + defp pg_start(postgres_bin_path, data_directory) do + Logger.debug("PostgreSQL starting.") + + case pg_installation_type() do + :mac_apple_silicon_homebrew -> + Rambo.run("brew", ["services", "start", "postgresql"]) + + _ -> + # For some reason starting pg hangs so we run in async as not to block... + Task.async(fn -> + Rambo.run( + postgres_bin_path, + [ + "start", + "-D", + "#{data_directory}" + ] + ) + end) + end + end + + defp pg_isready? do + case Rambo.run("pg_isready", ["-h", "localhost", "-p", "5432"]) do + {:ok, %Rambo{status: 0, out: "localhost:5432 - accepting connections\n", err: ""}} -> + Logger.debug("PostgreSQL is running.") + true + + {:error, %Rambo{status: 2, out: "localhost:5432 - no response\n", err: ""}} -> + Logger.debug("PostgreSQL is not running.") + false + + error -> + Logger.debug("PostgreSQL is not running: " <> inspect(error)) + false + end + end + + defp pg_version(postgres_path) do + case Rambo.run("ls", [postgres_path]) do + {:ok, %Rambo{status: 0, out: versions, err: ""}} when is_binary(versions) -> + versions + |> String.split("\n") + |> Enum.filter(&String.match?(&1, ~r/^[0-9._]+$/)) + |> Enum.max() + + _error -> + raise "PostgreSQL version not found." + end + end + + defp start_database do Postgrex.start_link( hostname: @hostname, username: @username, @@ -57,15 +331,43 @@ defmodule WalEx.DatabaseTest do ) end - def query(pid, query) do - pid - |> Postgrex.query!(query, []) - |> map_rows_to_columns() + defp assert_update_user(database_pid) do + capture_log = + ExUnit.CaptureLog.capture_log(fn -> + update_user(database_pid) + + :timer.sleep(1000) + end) + + assert capture_log =~ "on_update event occurred" + assert capture_log =~ "%WalEx.Event" + end +end + +defmodule TestSupervisor do + use Supervisor + + def start_link(configs) do + Supervisor.start_link(__MODULE__, configs, name: __MODULE__) end - def map_rows_to_columns(%Postgrex.Result{columns: columns, rows: rows}) do - Enum.map(rows, fn row -> Enum.zip(columns, row) |> Map.new() end) + def init(configs) do + children = [ + {Postgrex, configs}, + {WalEx.Supervisor, configs} + ] + + Supervisor.init(children, strategy: :one_for_one) end +end + +defmodule TestModule do + require Logger + use WalEx.Event, name: :todos - def map_rows_to_columns(_result), do: [] + on_update( + :user, + [], + fn events -> Logger.info("on_update event occurred: #{inspect(events, pretty: true)}") end + ) end diff --git a/test/walex/event/event_dsl_test.exs b/test/walex/event/event_dsl_test.exs index 5b185f0..417e0ab 100644 --- a/test/walex/event/event_dsl_test.exs +++ b/test/walex/event/event_dsl_test.exs @@ -26,7 +26,7 @@ defmodule WalEx.EventDslTest do describe "on_event/2" do setup do - {:ok, database_pid} = start_database() + {:ok, database_pid} = start_database(@dsl_base_configs) {:ok, supervisor_pid} = WalExSupervisor.start_link(@dsl_base_configs) %{database_pid: database_pid, supervisor_pid: supervisor_pid} @@ -36,12 +36,12 @@ defmodule WalEx.EventDslTest do supervisor_pid: supervisor_pid, database_pid: database_pid } do - destinations_supervisor_pid = find_worker_pid(supervisor_pid, DestinationsSupervisor) + destinations_supervisor_pid = find_child_pid(supervisor_pid, DestinationsSupervisor) assert is_pid(destinations_supervisor_pid) events_pid = - find_worker_pid(destinations_supervisor_pid, DestinationsEventModules) + find_child_pid(destinations_supervisor_pid, DestinationsEventModules) assert is_pid(events_pid) @@ -59,7 +59,7 @@ defmodule WalEx.EventDslTest do describe "on_update/4" do setup do - {:ok, database_pid} = start_database() + {:ok, database_pid} = start_database(@dsl_base_configs) {:ok, supervisor_pid} = WalExSupervisor.start_link(@dsl_base_configs) %{database_pid: database_pid, supervisor_pid: supervisor_pid} @@ -69,12 +69,12 @@ defmodule WalEx.EventDslTest do supervisor_pid: supervisor_pid, database_pid: database_pid } do - destinations_supervisor_pid = find_worker_pid(supervisor_pid, DestinationsSupervisor) + destinations_supervisor_pid = find_child_pid(supervisor_pid, DestinationsSupervisor) assert is_pid(destinations_supervisor_pid) events_pid = - find_worker_pid(destinations_supervisor_pid, DestinationsEventModules) + find_child_pid(destinations_supervisor_pid, DestinationsEventModules) assert is_pid(events_pid) @@ -89,15 +89,6 @@ defmodule WalEx.EventDslTest do assert capture_log =~ "%WalEx.Event" end end - - defp start_database do - Postgrex.start_link( - hostname: @hostname, - username: @username, - password: @password, - database: @database - ) - end end defmodule TestApp.DslTestModule do diff --git a/test/walex/event/event_test.exs b/test/walex/event/event_test.exs index ad24187..e4a1dd4 100644 --- a/test/walex/event/event_test.exs +++ b/test/walex/event/event_test.exs @@ -28,7 +28,7 @@ defmodule WalEx.EventTest do describe "process_all/1" do setup do - {:ok, database_pid} = start_database() + {:ok, database_pid} = start_database(@base_configs) {:ok, supervisor_pid} = WalExSupervisor.start_link(@base_configs) %{database_pid: database_pid, supervisor_pid: supervisor_pid} @@ -38,12 +38,12 @@ defmodule WalEx.EventTest do database_pid: database_pid, supervisor_pid: supervisor_pid } do - destinations_supervisor_pid = find_worker_pid(supervisor_pid, DestinationsSupervisor) + destinations_supervisor_pid = find_child_pid(supervisor_pid, DestinationsSupervisor) assert is_pid(destinations_supervisor_pid) events_pid = - find_worker_pid(destinations_supervisor_pid, DestinationsEventModules) + find_child_pid(destinations_supervisor_pid, DestinationsEventModules) assert is_pid(events_pid) @@ -89,20 +89,20 @@ defmodule WalEx.EventTest do database_pid: database_pid, supervisor_pid: supervisor_pid } do - destinations_supervisor_pid = find_worker_pid(supervisor_pid, DestinationsSupervisor) + destinations_supervisor_pid = find_child_pid(supervisor_pid, DestinationsSupervisor) assert is_pid(destinations_supervisor_pid) - events_pid = find_worker_pid(destinations_supervisor_pid, DestinationsEventModules) + events_pid = find_child_pid(destinations_supervisor_pid, DestinationsEventModules) assert is_pid(events_pid) - replication_supervisor_pid = find_worker_pid(supervisor_pid, ReplicationSupervisor) + replication_supervisor_pid = find_child_pid(supervisor_pid, ReplicationSupervisor) assert is_pid(replication_supervisor_pid) replication_publisher_pid = - find_worker_pid(replication_supervisor_pid, ReplicationPublisher) + find_child_pid(replication_supervisor_pid, ReplicationPublisher) assert is_pid(replication_publisher_pid) @@ -123,27 +123,18 @@ defmodule WalEx.EventTest do :timer.sleep(1000) new_events_pid = - find_worker_pid(destinations_supervisor_pid, DestinationsEventModules) + find_child_pid(destinations_supervisor_pid, DestinationsEventModules) assert is_pid(new_events_pid) refute events_pid == new_events_pid new_replication_publisher_pid = - find_worker_pid(replication_supervisor_pid, ReplicationPublisher) + find_child_pid(replication_supervisor_pid, ReplicationPublisher) assert is_pid(new_replication_publisher_pid) refute replication_publisher_pid == new_replication_publisher_pid end end - - defp start_database do - Postgrex.start_link( - hostname: @hostname, - username: @username, - password: @password, - database: @database - ) - end end defmodule TestApp.TestModule do diff --git a/test/walex/supervisor_test.exs b/test/walex/supervisor_test.exs index c280b5c..762c425 100644 --- a/test/walex/supervisor_test.exs +++ b/test/walex/supervisor_test.exs @@ -25,17 +25,17 @@ defmodule WalEx.SupervisorTest do Supervisor.count_children(walex_supervisor_pid) replication_supervisor_pid = - find_worker_pid(walex_supervisor_pid, Replication.Supervisor) + find_child_pid(walex_supervisor_pid, Replication.Supervisor) assert is_pid(replication_supervisor_pid) replication_publisher_pid = - find_worker_pid(replication_supervisor_pid, Replication.Publisher) + find_child_pid(replication_supervisor_pid, Replication.Publisher) assert is_pid(replication_publisher_pid) replication_server_pid = - find_worker_pid(replication_supervisor_pid, Replication.Server) + find_child_pid(replication_supervisor_pid, Replication.Server) assert is_pid(replication_server_pid) end