Skip to content

Commit

Permalink
logging supervisors starting and replication slot status
Browse files Browse the repository at this point in the history
  • Loading branch information
DohanKim committed Feb 15, 2024
1 parent 37963e0 commit 0271cbe
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/walex/config/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ defmodule WalEx.Config.Registry do

@walex_registry :walex_registry

require Logger

def start_registry do
Logger.info("Starting Registry")

case Process.whereis(@walex_registry) do
nil ->
Logger.info("No registry found, starting new one")
Registry.start_link(keys: :unique, name: @walex_registry)

pid ->
Logger.info("Registry already running with pid: #{inspect(pid)}")
{:ok, pid}
end
end
Expand Down
15 changes: 15 additions & 0 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ defmodule WalEx.Replication.Server do
alias WalEx.Decoder
alias WalEx.Replication.Publisher

require Logger

def start_link(opts) do
app_name = Keyword.get(opts, :app_name)
opts = set_pgx_replication_conn_opts(app_name)

Logger.info("Starting WalEx Replication Server for #{app_name}")

Postgrex.ReplicationConnection.start_link(__MODULE__, [app_name: app_name], opts)
end

Expand Down Expand Up @@ -43,6 +47,8 @@ defmodule WalEx.Replication.Server do

query = "CREATE_REPLICATION_SLOT #{temp_slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"

Logger.info("Creating replication slot: #{temp_slot}")

{:query, query, %{state | step: :create_slot}}
end

Expand All @@ -58,9 +64,18 @@ defmodule WalEx.Replication.Server do
query =
"START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')"

Logger.info("Starting replication slot: #{slot_name}, publication: #{publication}")

{:stream, query, [], %{state | step: :streaming}}
end

@impl true
def handle_disconnect(state) do
Logger.info("Disconnected from replication server, state: #{inspect(state, pretty: true)}")

{:noreply, state}
end

@impl true
# https://www.postgresql.org/docs/14/protocol-replication.html
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
Expand Down
4 changes: 4 additions & 0 deletions lib/walex/replication/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ defmodule WalEx.Replication.Supervisor do

alias WalEx.Replication.{Publisher, Server}

require Logger

def start_link(opts) do
app_name = Keyword.get(opts, :app_name)
name = WalEx.Config.Registry.set_name(:set_supervisor, __MODULE__, app_name)

Logger.info("Starting WalEx Replication Supervisor for #{app_name}")

Supervisor.start_link(__MODULE__, opts, name: name)
end

Expand Down
6 changes: 6 additions & 0 deletions lib/walex/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ defmodule WalEx.Supervisor do
alias WalEx.Replication.Supervisor, as: ReplicationSupervisor
alias WalExConfig.Registry, as: WalExRegistry

require Logger

def start_link(opts) do
app_name = Keyword.get(opts, :name)
module_names = build_module_names(app_name, opts)
supervisor_opts = Keyword.put(opts, :modules, module_names)

Logger.info(
"Starting WalEx Supervisor for #{app_name}, with opts: #{inspect(supervisor_opts, pretty: true)}"
)

validate_opts(supervisor_opts)

{:ok, _pid} = WalExRegistry.start_registry()
Expand Down

0 comments on commit 0271cbe

Please sign in to comment.